kafka 프로듀서 정리

프로듀서에 대해서

Featured image

kafka 프로듀서 간단 정리

프로듀서

프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.

프로듀서 전송 과정

ProducerRecord –(send())–> Partitioner –> Accumulator(batches in Topics) –> Sender –> Kafka Cluster

ProducerRecord

메시지 전송을 위해 ProducerRecord 객체를 이용한다.

ProducerRecord에서는 메시지 키가 무엇인지, 커스텀 파티셔너인지 여부를 따지기 전에 가장 먼저 파티션 번호가 있는지를 우선 순위로 확인하고 처리한다.

프로듀서에서 데이터를 전송할 파티션을 선정하는 방법은 3가지가 있다.

  1. 파티션 번호를 직접 지정하여 지정된 파티션에 바로 데이터를 전송
  2. 가장 우선 순위인 파티션 번호가 지정되어 있지 않는 경우에 메시지 키를 설정하여 메시지 키의 해시 값을 이용해 데이터를 전송할 파티션을 지정한다.
  3. 파티션 번호, 메시지 키 모두 지정돼있지 않는 경우에는 라운드 로빈 방식으로 데이터를 전송할 파티션을 할당한다.

메시지 키가 설정된 메시지는 무조건 동일한 파티션에 지정된다(하나의 파티션을 독점하는 것이 아닌 여러 메시지와 같은 파티션을 사용할 수는 있지만 같은 메시지 키를 가진 메세지는 무조건 같은 파티션에 지정)

Partitioner 종류

파티션 번호 지정 혹은 메시지 키가 설정 돼있을 때는 지정된 파티션 혹은 메시지 키의 해시값과 파티션을 매칭하여 데이터를 전송한다는 공통점이 있다.

파티셔너 특징
UniformStickyPartitioner RoundRobinPartitioner의 단점을 개선, 높은 처리량과 낮은 리소스 사용.
기본 파티셔너
배치가 모두 묶일 때 까지 기다렸다가 한번에 모두 동일한 파티션으로 데이터를 전송하는 방식.
RoundRobinPartitioner ProducerRecord가 들어오는 대로 파티션을 순회하면서 전송하기 때문에 배치로 묶이는 빈도가 적기 때문에 성능이 떨어짐

프로듀서 주요 옵션

설정 값 특징
acks=1(default) 프로듀서가 전송한 데이터가 리더 파티션에 저장되면 성공으로 판단
데이터 복제에 실패하더라도 리더 파티션에만 저장되면 다음 데이터를 계속 전송하기 때문에 일부 데이터가 유실될 수 있다.
acks=0 프로듀서가 전송한 즉시 데이터 저장 여부와 상관없이 성공으로 판단
전송 실패 여부를 알 수 없기 때문에 retries 옵션 설정은 무의미해진다.
네트워크 오류나 브로커의 이슈로 데이터가 유실되더라도 다음 데이터를 계속 전송한다.
전송 속도는 가장 빠르기 때문에 데이터가 유실되더라도 전송 속도가 중요할 경우 사용한다.
acks=-1(all) 프로듀서가 전송한 데이터가 min.insync.replicas 개수에 해당하는 리더/팔로워 파티션에 데이터가 저장되면 성공으로 판단
설정 값 전송 속도 데이터 유실 가능 여부
acks=1(default) 중간 O
acks=0 빠름 O
acks=-1(all) 느림 X

배치가 모두 묶일 때 까지 기다렸다가 한번에 모두 동일한 파티션으로 데이터를 전송하는 방식의 UniformStickyPartitioner의 linger.ms도 0인건가?

프로듀서와 브로커 사이의 네트워크 장애로 한 번 이상 데이터가 전송되더라도 이를 PID(Produder unique ID)외에 시퀀스 넘버가 추가되어 이를 통해 구분하여 정확히 한번만 전달하게 할 수 있는 멱등성 프로듀서 동작 여부 설정

PID는 어떠한 이슈로 인해 애플리케이션이 재시작 됬을 경우, 이전 이슈로 전송되지 못했던 메시지가 같은 데이터라도 PID값은 새로 부여되어 달라진다.

해당 옵션 true 설정 시, retries옵션은 Integer.MAX, acks옵션은 all로 자동 설정된다.

컨슈머는 기본적으로 파티션에 데이터가 쌓이는 족족 가져가지만, 트랜잭션으로 묶이게 되면 브로커에서 가져갈 때 다르게 동작하게 된다.

트랜색션 프로듀서를 사용하려면 enbale.idempotence=true로 설정하고, transaction.id는 임의의 String 값, 컨슈머에서는 isolation.level=read_committed로 설정하면 프로듀서와 컨슈머는 트랜잭션으로 처리 완료된 데이터만 읽고 쓰고 읽게 된다.

트랜잭션의 시작과 끝을 구분하기 위해 프로듀서에서 트랜잭션이 완료되었음만을 알리는 레코드를 마지막에 한번 더 전송하고 컨슈머는 파티션에 저장된 트랜잭션 레코드를 보고 트랜잭션이 완료되었다는 레코드를 확인했을 때 데이터를 전부 가져간다.

커스텀 파티셔너

특정 메시지 키를 가진 레코드는 0번 파티션에만 저장되도록 Partitioner 인터페이스를 직접 구현하여 커스텀하게 동작하는 파티셔너를 구현할 수 있다.

커스텀 파티셔너를 지정한 경우, ProducerConfig.PARTITION_CLASS_CONFIG 옵션을 사용자 생성 파티셔너로 설정하여 KafkaProducer 인스턴스를 생성해야 한다.

프로듀서의 브로커 정상 전송 여부 확인 처리

동기 처리

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
RecordMetadata metadata = producer.send(record).get();

KafkaProducer의 send() 메서드는 Future 객체를 반환하는 데 이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 카프카 브로커에 정상적으로 저장이 되었는지에 대한 데이터가 포함되어 있다.

send()메서드에서 .get() 메서드를 사용하면 동기적으로 전송 결과 데이터를 알 수 있다.

동기로 프로듀서 전송 결과를 얻는 것은 성능에 제약을 준다.

비동기 처리

Callback 인터페이스를 구현하거나 send() 메서드가 반환한 ListenableFuture객체에 ListenableFutureCallback 인터페이스를 추가하고 onSuccess, onFailure 메서드를 오버라이드한 객체를 구현하여 레코드의 비동기 결과를 얻을 수 있다.

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override 
        public void onSuccess(SendResult<String, String> result) {
            log.info("send success");
        }

        public void onFailure(Throwable ex) {
            log.error("send error! topic={}, message={}", topicName, message);
        }
    });

비동기 전송 결과는 처리 속도는 빠르지만 데이터의 순서가 중요한 경우에는 사용하면 안된다.

프로듀서 전송 결과 처리 방식 특징
동기 방식 전송 결과를 기다렸다가 다음 데이터를 전송하기 때문에 처리 속도가 느리다.
비동기 방식 이전 전송 결과를 받기 전에 다음 데이터를 전송하고 이전 데이터의 전송이 실패된 경우에는 데이터 전송 순서가 역전될 수 있다.