kafka 컨슈머 정리

컨슈머에 대해서

Featured image

kafka 컨슈머 간단 정리

컨슈머는 브로커에 적재돼있는 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.

컨슈머 주요 옵션

오프셋 옵션 종류 특징
auto.offset.reset=latest 가장 최신 오프셋부터 읽기 시작
auto.offset.reset=earliest 가장 오래전에 넣은 오프셋부터 읽기 시작
auto.offset.reset=none 컨슈머 그룹이 커밋한 기록이 있는지 찾아서 없으면 오류를, 있다면 기족 커밋 기록 이후 오프셋부터 읽기 시작
트랜잭션 옵션 특징
isolation.level=read_uncommitted 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다.
isolation.level=read_committed 커밋이 완료된 레코드만 읽는다.

컨슈머 운영 방법

1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영

각각의 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있는 방식

컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 한다.

컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징이 있기 때문에 특정 그룹의 장애 발생 시, 서로의 그룹 간 장애가 격리되는 구조이므로 유연하게 대처가 가능하다.

컨슈머 그룹의 장애 발생

컨슈머 그룹으로 이루어진 컨슈머 중 일부 컨슈머에 장애가 발생할 경우 리밸런싱이 일어나게 된다.

리밸런싱이란, 특정 컨슈머에 장애가 발생했을 경우 해당 컨슈머에 할당된 파티션이 다른 컨슈머로 소유권이 넘어가는 과정을 말한다.

리밸런싱은 언제든 발생할 수 있는 프로세스이기 때문에 이를 대응하는 코드를 미리 작성해두어야 한다. 때문에 이처럼 가용성을 높여주고 안정적인 운영을 도와주는 리밸런싱은 유용하지만 자주 일어나서는 안된다. 리밸런싱이 발생하는 과정에서 파티션의 소유권을 재할당할 때, 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없게 되기 때문이다.

카프카 프로커 중 한 대가 그룹 조정자의 역할로써, 컨슈머 그룹의 컨슈머 추가/삭제 시점을 감지하여 리밸런싱을 발생시킨다.

토픽의 특정 파티션만 구독하는 컨슈머를 운영

보통 consumer 객체의 subscibe() 메서드를 사용하여 구독 형태로 컨슈머를 사용하지만, 직접 특정 파티션을 할당 받아 컨슈머를 사용하기 위한 방법이 있다.

컨슈머가 특정 토픽명과 해당 토픽 내 할당할 파티션 번호를 assign()메서드 파라미터를 통해 명시하여 사용하면 된다.

subscribe() 방식과 달리 직접 컨슈머가 특정 토픽과 파티션을 할당 받아 사용하게 되므로 리밸런싱하는 과정이 없다.

오프셋 커밋

명시적/비명시적 오프셋 커밋 방법이 있다.

비명시 오프셋 커밋(자동 커밋) 방식은 enable.auto.commit=trueauto.commitl.inverval.ms옵션을 통해 일정 시간마다 자동으로 오프셋 커밋이 발생되도록 설정할 수 있다.

오프셋 커밋 방식 구현 방법 특징 데이터 유실 가능 여부
비명시적 커밋(자동 커밋) `enable.auto.commit=true` 와 `auto.commitl.inverval.ms`옵션을 통해 설정 로그성 데이터 처리 같이 사용자가 오프셋, 파티션 관리를 하지 않아도 되는 경우 사용됨.
구현하기는 쉽지만 리밸런싱이나 컨슈머 강제 동료과 같은 상황이 발생했을 경우, 데이터 유실/중복이 발생할 수 있다.
O
명시적 커밋(수동 커밋) (poll() 메서드 호출 후)commitSync() 호출 메시지 처리 완료 때 까지 메시지를 가져온 것으로 간주되면 안되는 경우에 사용
이전 커밋이 실패하더라도 다음 커밋이 처리되긴 하지만 수동 커밋임으로 이러한 상황이 발생할 경우에 대해 대응하는 코드를 짜둠으로써(ex. consumerRecord.seek() or retry 설정) 처리가 가능한 커밋 방식이다(이전 커밋에 실패했을 때 해당 커밋을 재시도하기 위한 설정이 아님)
브로커에 커밋 요청 후 응답을 받을 때 까지 기다렸다가 다음 커밋을 진행하기 때문에 속도가 느려 처리량이 줄어든다.
X
(poll() 메서드 호출 후)commitAsync() 호출 커밋 요청 후 응답 받기 전에 비동기적으로 다음 커밋을 요청한다. 때문에 이전 커밋 요청이 실패했더라도 다음 커밋을 처리하는 과정에서 데이터의 순서 보장이 안되거나 중복 처리가 발생할 수 있다. O

ack.acknowledge() vs consumer.commitSync()

리밸런싱으로 인한 중복 처리 대응 방법

poll() 메서드를 통해 컨슈머로 반환받은 데이터를 모두 처리하기 전에(처리 완료 후 오프셋 커밋이 진행되므로) 리밸런스가 발생할 경우 데이터가 중복 처리될 수 있다.

리밸런스 발생 시 데이터를 중복 처리하지 않기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 하고 이때 리밸런스 발생 감지를 위해 ConsumerRebalanceListener 인터페이스가 지원된다.

ConsumerRebalanceListener 인터페이스는 onPartitionAssigned() 메서드와 onPartitionRevoked() 메서드로 이뤄져 있다.

ConsumerRebalanceListener 인터페이스 메서드 종류 특징
onPartitionAssigned() 리밸런스가 끝난 뒤 파티션 할당이 완료되었을 때 호출되는 메서드
onPartitionRevoked() 리밸런스가 시작되기 직전에 호출되는 메서드
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
    consumer.subscribe(Arrays.asList(TOPIC), new RebalanceListener());
    
    private static class RebalanceListener implements ConsumerRebalanceListener {
        
        @Override
        public void onPartitionAssigned(Collection<TopicPartition> partitions) {
            log.info("Partition assigned");
        }
        
        @Override
        public void onPartitionRevoked(Collection<TopicPartition> partitions) {
            log.info("Partition revoked");
            consumer.commitSync();      // 리밸런스 발생 직전, 현재 가장 마지막으로 처리 완료된 레코드를 기준으로 커밋을 실시하게 함으로써 데이터 중복을 방지할 수 있음.
        }
        
    }

컨슈머 배포 프로세스

중단 배포

짧은 시간 동안 중단이 되어 지연이 발생하더라도 서비스 운영이 큰 문제가 없을 경우 사용하는 방식

서비스가 완전히 중단된 후 배포가 되기 때문에 컨슈머 랙이 발생하게 되고 이는 즉, 지연이 발생한다는 의미이다.

무중단 배포

서비스 중단으로 지연이 발생할 시 서비스에 큰 문제가 발생할 경우 사용하는 방식

블루/그린 배포

이전 버전과 신규 버전 애플리케이션을 동시에 띄워놓고 트래픽을 전환하는 방식

파티션과 컨슈머 개수가 동일할 때 신규 버전 애플리케이션을 배포해두고 동일 컨슈머 그룹으로 파티션을 굳고하도록 실행하면 신규 버전 애플리케이션의 컨슈머들은 파티션을 할당받지 못하고 유휴 상태에 빠지는 상황을 활용하여 배포하는 방식이다.

파티션과 컨슈머 개수가 동일하지 않다면 파티션들이 기존 애플리케이션과 신규 애플리케이션에 섞여 할당받을 수 있기 때문이다.

신규 애플리케이션들이 모두 준비된 후 기존 애플리케이션을 중단하면 리밸런싱이 발생하면서 파티션들은 모두 신규 애플리케이션에 할당되게 된다.

리밸런스가 한번만 수행되기 때문에 짧은 리밸런스 시간으로 배포 수행이 가능하다.

롤링 배포

구 애플리케이션에서 신규 애플리케이션으로 트래픽을 점진적으로 전환하는 배포 방식으로 파티션 개수가 인스턴스 개수와 같거나 그 이상이어야 한다.

파티션의 개수가 많아질 수록 리밸런스가 많아지는 방식이므로 파티션 개수가 적을 수록 좋다.

카나리 배포

100개의 파티션을 운영하는 경우, 1개 파티션에 먼저 컨슈머를 따로 배정하여 여기에 신규 애플리케이션을 배포한 후 사전 테스트를 진행하고 문제가 없을 경우 나머지 99개의 파티션에는 롤링 이나 블루/그린 배포 방식으로 전부 배포하는 방식이다.