-
[카프카 핵심 가이드] CH4. 카프카 컨슈머: 카프카에서 데이터 읽기 (2)DevBook 2025. 2. 13. 20:00
4.6 오프셋과 커밋
- poll()을 호출할 때마다 카프카에 쓰여진 메시지 중 컨슈머 그룹에 속한 컨슈머들이 아직 읽지 않은 레코드가 리턴됨
- 카프카의 고유한 특성 중 하나는 컨슈머로부터의 응답을 받는 방식이 아니라는 점
- 대신, 컨슈머가 카프카를 사용해 각 파티션에서의 위치를 추적할 수 있게 함
- 파티션에서의 현재 위치를 업데이트하는 작업을 '오프셋 커밋'이라고 함
- 카프카는 레코드를 개별적으로 커밋하지 않음
- 컨슈머는 파티션에서 성공적으로 처리해 낸 마지막 메시지를 커밋하여 그 앞의 모든 메시지들 역시 성공적으로 처리되었음을 암묵적으로 나타냄
- poll()이 리턴한 마지막 오프셋 바로 다음 오프셋을 커밋하는 것이 기본적인 작동!
- 카프카의 __consumer_offsets 토픽에 각 파티션별로 커밋된 오프셋을 업데이트하는 메시지를 보냄
- 컨슈머는 각 파티션의 마지막으로 커밋된 메시지를 읽어온 뒤 거기서부터 처리를 재개함
- KafkaConsumer API는 오프셋을 커밋하는 다양한 방법을 지원함
4.6.1 자동 커밋
- 컨슈머가 자동으로 커밋하는 방식
- enable.auto.commit=true로 설정하면, 컨슈머는 5초에 한 번, poll()을 통해 받은 메시지 중 마지막 메시지의 오프셋을 커밋함
- auto.commit.interval.ms로 간격 조정 가능
- 자동 커밋은 폴링 루프에 의해 실행됨
- 다음 번에 호출된 poll()이 이전 poll() 호출에서 리턴된 마지막 오프셋을 커밋함
- 고려사항
- 메시지 중복 처리
- 상황
- 자동 커밋은 5초에 한 번 수행, 마지막으로 커밋한 지 3초 뒤에 컨슈머가 다운됨
- 리밸런싱 완료 후 마지막으로 커밋된 오프셋부터 작업을 시작하는데, 해당 오프셋은 3초 전의 것이기 때문에 다운되기 3초 전까지 읽어서 처리된 이벤트들은 두 번 처리되게 됨
- 커밋 간격을 줄이더라도 중복을 완전히 없애는 것은 불가능
- 상황
- 메시지 중복 처리
4.6.2 현재 오프셋 커밋하기
- 개발자가 원하는 시간에 현재 오프셋을 커밋하는 옵션 제공
- 방법
- enable.auto.commit=false로 설정해 명시적으로 커밋할 때만 오프셋이 커밋되게 함
- commitSync() 사용
- poll()이 리턴한 마지막 오프셋을 커밋한 뒤 커밋이 성공하면 리턴, 어떠한 이유로 실패하면 예외 발생시킴
- 해결할 수 없는 에러가 발생하지 않는 한, commitSync()는 커밋을 재시도함
- 고려사항
- poll()에 의해 리턴된 마지막 오프셋을 커밋한다는 점 유의!
- 메시지 누락
- poll()에서 리턴된 모든 레코드의 처리가 완료되기 전 commitSync()를 호출하면, 서버가 크래시되었을 때 커밋은 되었지만 아직 처리되지 않은 메시지들이 누락될 수 있음
- 메시지 중복 처리
- 서버가 레코드를 처리하는 와중에 크래시가 날 경우, 마지막 메시지 배치의 맨 앞 레코드에서부터 리밸런스 시작 시점까지의 모든 레코드들은 두 번 처리될 것임
코드 예시)
더보기Duration timeout = Duration.ofMillis(100); while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { // 레코드 처리 } try { consumer.commitSync(); } catch (CommitFailedException e) { // 에러 로깅 } }
4.6.3 비동기적 커밋
- 수동 커밋의 단점 중 하나는 브로커가 커밋 요청에 응답할 때까지 서버가 블록된다는 점
- 비동기적 커밋 API를 사용하면, 브로커가 커밋에 응답할 때까지 기다리지 않고 요청만 보내고 처리를 계속함
- 고려사항
- 기본적으로, commitAsync()는 재시도 하지 않음
- commitAsync()가 서버로부터 응답을 받은 시점에는 이미 다른 커밋 시도가 성공했을 수도 있기 때문임
- 실패한 앞의 커밋 요청을 재시도해서 성공한다면, 다음 오프셋이 커밋 완료된 상황에서 이전 오프셋이 커밋되는 상황 발생
- 콜백을 사용해 재시도 구현 시 이와 같은 커밋 순서를 고려해야 함
코드 예시)
더보기Duration timeout = Duration.ofMillis(100); while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { // 레코드 처리 } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { // 콜백 수행 } }); }
Q. 비동기적 커밋 재시도는 어떻게 할 수 있을까?
- 순차적으로 단조증가하는 번호 사용
더보기AtomicLong sequenceNumber = new AtomicLong(0); Duration timeout = Duration.ofMillis(100); while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { // 레코드 처리 } long failedSeq = sequenceNumber.incrementAndGet(); // 번호 증가 consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { long currentSeq = sequenceNumber.get(); if (failedSeq >= currentSeq) { // 다음 커밋 시도가 없었으므로 재시도 } else { // 다음 커밋 시도가 있었으므로 skip } } }); }
4.6.4 동기적 커밋과 비동기적 커밋 함께 사용하기
- 언제 사용?
- 컨슈머를 닫기 전 혹은 리밸런스 전 마지막 커밋일 때, 성공 여부를 추가로 확인할 필요가 있음
- 이때 일반적인 패턴은 종료 직전에 commitAsync()와 commitSync()를 함께 사용하는 것
코드 예시)
더보기Duration timeout = Duration.ofMillis(100); try { while(!closing) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { // 레코드 처리 } consumer.commitAsync(); } consumer.commitSync(); } catch (Exception e) { // 에러 로깅 } finally { consumer.close(); }
- 정상적인 상황에서는 commitAsync() 사용
- 컨슈머를 닫는 상황에서는 다음 커밋이 없으므로 commitSync()를 호출해 커밋이 성공하거나 회복 불가능한 에러가 발생할 때까지 재시도하도록 함
4.6.5 특정 오프셋 커밋하기
- 언제 사용?
- 가장 최근 오프셋만 커밋하는 게 아니라 더 자주 커밋하고 싶을 때
- poll()이 큰 메시지 배치를 리턴해서, 리밸런스가 발생했을 경우 전체 배치를 재처리하는 상황을 피하기 위해 배치를 처리하는 도중에 오프셋을 커밋하고 싶을 때
- 방법
- commitSync()나 commitAsync() 호출 시 커밋하고자 하는 파티션과 오프셋의 맵 전달
코드 예시)
더보기private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); int count = 0; ... Duration timeout = Duration.ofMillis(100); while(true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { // 레코드 처리 currentOffsets.put( new TopicPartition(record.topic(), record.partition()); new OffsetAndMetadata(record.offset()+1, "no metadata"); ); if (count % 1000 == 0) { consumer.commitAsync(currentOffsets, null); } count++; } }
- 커밋될 오프셋은 다음번에 읽어야 할 메시지의 오프셋으로 지정 (record.offset() + 1)
4.7 리밸런스 리스너
- 배경
- 컨슈머는 종료하기 전이나 리밸런싱이 시작되기 전에 정리 작업을 해줘야 함
- 컨슈머에 할당된 파티션이 해제될 것이라는 걸 알게 된다면 해당 파티션에서 마지막으로 처리한 이벤트의 오프셋을 커밋해야 함
- 컨슈머 API는 컨슈머에 파티션이 할당되거나 해제될 때 사용자의 코드가 실행되도록 하는 메커니즘 제공
- subscribe() 호출할 때 ConsumerRebalanceListener를 전달해주면 됨
- ConsumerRebalanceListener 인터페이스에는 다음과 같이 3개의 메서드가 정의되어 있음
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- 파티션이 컨슈머에 재할당된 후, 컨슈머가 메시지를 읽기 시작하기 전에 호출
- 파티션과 함께 사용할 상태 적재, 필요한 오프셋 탐색 등의 준비 작업을 수행함
- 여기서 수행되는 모든 준비 작업은 max.poll.timeout.ms 안에 완료되어야 함
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- 컨슈머가 할당받았던 파티션이 할당 해제될 때 호출
- 리밸런스 방식에 따른 구현 - 참고) 카프카 컨슈머 리밸런스
- 조급한 리밸런스(Eager Rebalance) 방식
- 컨슈머가 메시지 읽기를 멈춘 뒤에, 그리고 리밸런스가 시작되기 전에 호출됨 => 컨슈머가 그룹 코디네이터로 JoinGroup 요청을 보내기 전 할당된 파티션에 대한 소유권을 취소해야 하기 때문
- 협력적 리밸런스(Cooperative Rebalance) 방식
- 1차 리밸런스가 완료될 때, 컨슈머에서 할당 해제되어야 할 파티션들에 대해서만 호출됨
- 조급한 리밸런스(Eager Rebalance) 방식
- public void onPartitionsLost(Collection<TopicPartition> partitions)
- 협력적 리밸런스 알고리즘이 사용되었을 때, 할당된 파티션이 리밸런스 알고리즘에 의해 해제되기 전에 다른 컨슈머에 먼저 할당된 예외 상황에서만 호출됨 (일반적인 상황에서는 onPartitionsRevoked() 호출)
- 해당 메서드를 구현하지 않았을 경우 예외 상황에서 onPartitionsRevoked()가 대신 호출됨
- 주의) 파티션을 새로 할당받은 컨슈머가 이미 상태를 저장했을 수도 있기 때문에 충돌 필해야 함
- 협력적 리밸런스 알고리즘이 사용되었을 때, 할당된 파티션이 리밸런스 알고리즘에 의해 해제되기 전에 다른 컨슈머에 먼저 할당된 예외 상황에서만 호출됨 (일반적인 상황에서는 onPartitionsRevoked() 호출)
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
참고) 협력적 리밸런스 알고리즘 사용 시 알아둘 사항!
- onPartitionsAssigned()
- 리밸런싱이 발생할 때마다 호출 => 즉, 리밸런스가 발생했음을 컨슈머에게 알려주는 역할
- 컨슈머에게 새로 할당된 파티션이 없을 경우 빈 목록과 함께 호출됨
- onPartitionsRevoked()
- 파티션이 특정 컨슈머에서 해제될 때에만 호출 (메서드 호출될 때 빈 목록이 주어지는 경우 없음) => 즉, 재할당 대상 파티션을 할당받은 컨슈머에서만 호출
- onPartitionsLost()
- 예외적인 리밸런스 상황에서 호출
- 메서드 호출될 때 주어지는 파티션들은 이 메서드가 호출되는 시점에 이미 다른 컨슈머들에게 할당되어 있는 상태
코드 예제) 파티션이 해제되기 전 onPartitionsRevoked() 사용해 오프셋 커밋하기
더보기private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); Duration timeout = Duration.ofMillis(100); private class HandleRebalance implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } public void onPartitionsRevoked(Collection<TopicPartition> partitions) { consumer.commitSync(currentOffsets); // 현재 오프셋 커밋 (3) } } ... try { consumer.subscribe(topics, new HandleRebalance()); // 리밸런스 리스너 함께 전달 while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { // 레코드 처리 currentOffsets.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, null) ); consumer.commitAsync(); // 비동기 커밋 } } catch (WakeupException e) { // ignore, we're closing } catch (Exception e) { // 에러 로깅 } finally { try { consumer.commitSync(currentOffsets); } finally { consumer.close(); // 컨슈머 종료 } } }
- (3) 리밸런싱으로 파티션이 해제될 상황일 때 오프셋을 커밋함
4.8 특정 오프셋의 레코드 읽어오기
- 다른 오프셋에서부터 읽기를 시작하고 싶은 경우에 사용
- 상황
- 파티션의 맨 앞에서부터 모든 메시지 읽고자 하거나, 앞의 메시지는 전부 건너뛰고 파티션에 새로 들어온 메시지부터 읽기 시작할 때
- seekToBegining()과 seekToEnd() 사용
- 특정한 오프셋으로 돌아가야할 때
- ex) 시간에 민감한 어플리케이션에서 처리가 늦어져 몇 초간 메시지를 건너뛰어야 하는 경우, 파일에 데이터를 쓰는 컨슈머가 파일이 유실되어 데이터를 복구하기 위해 특정한 과거 시점으로 되돌아가야 할 때
- 파티션의 맨 앞에서부터 모든 메시지 읽고자 하거나, 앞의 메시지는 전부 건너뛰고 파티션에 새로 들어온 메시지부터 읽기 시작할 때
코드 예시) 모든 파티션의 현재 오프셋을 특정한 시각에 생성된 레코드의 오프셋으로 설정하기
더보기Long oneHourEarlier = Instant.now().atZone(ZoneId.systemDefault()).minusHours(1).toEpochSecond(); Map<TopicPartition, Long> partitionTimestampMap = consumer.assignment() .stream() .collect(Collectors.toMap(tp -> tp, tp -> oneHourEarlier); // (1) Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(partitionTimestampMap); // (2) for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetMap.entrySet()) { consumer.seek(entry.getKey(), entry.getValue().offset()); // (3) }
- (1) 해당 컨슈머에 할당된 모든 파티션에 대해 되돌리고자 하는 타임스탬프 값을 담은 Map 생성
- (2) 파티션별 각 타임스탬프에 해당하는 오프셋을 받아옴 (브로커에 요청을 보내 타임스탬프 인덱스에 저장된 오프셋을 리턴하도록 함)
- (3) 각 파티션의 오프셋을 앞 단계에서 리턴된 오프셋으로 재설정
4.9 폴링 루프를 벗어나는 방법 (카프카 컨슈머를 안전하게 종료하는 방법)
- 컨슈머를 종료하고자 할 때, 컨슈머가 poll()을 오랫동안 기다리고 있더라도 즉시 루프를 탈출하고 싶다면 다른 스레드에서 consumer.wakeup()을 호출해주어야 함
- 메인 스레드에서 컨슈머 루프를 돌고 있다면 ShutdownHook을 사용할 수 있음
- consumer.wakeup()은 다른 스레드에서 호출해줄 때만 안전하게 작동하는 유일한 컨슈머 메서드
- wakeup을 호출하면 (대기중이던) poll()이 WakeupException을 발생시키며 중단되거나, 대기중이 아닐 경우에는 다음 번에 처음으로 poll()이 호출될 때 예외 발생함
- WakeupException은 딱히 처리해 줄 필요 없지만, 스레드를 종료하기 전 consumer.close()는 호출해줘야 함
- 컨슈머를 닫으면 오프셋을 커밋하고 그룹 코디네이터에게 컨슈머가 그룹을 떠난다는 메시지를 전송함
- 이때 코디네이터가 즉시 리밸런싱을 실행하게 됨
- 예제) https://github.com/gwenshap/kafka-examples/blob/master/SimpleMovingAvg/src/main/java/com/shapira/examples/newconsumer/simplemovingavg/SimpleMovingAvgNewConsumer.java
요약
- 카프카 컨슈머의 poll()은 블로킹 될 수 있어, 안전한 종료를 위해 consumer.wakeup()을 사용해야 함
- ShutdownHook를 활용하면 어플리케이션 종료 시 자동으로 wakeup()을 호출할 수 있음
- WakeupException을 캐치하고 consumer.close()를 호출해 안전하게 종료해야 함
'DevBook' 카테고리의 다른 글
[카프카 핵심 가이드] CH4. 카프카 컨슈머: 카프카에서 데이터 읽기 (1) (0) 2025.02.11 [카프카 핵심 가이드] 추가 - 카프카 컨슈머 리밸런스 (0) 2025.02.11 [카프카 핵심 가이드] 3.6 파티셔너 (카프카 3.9 버전 반영) (0) 2025.02.06 [카프카 핵심 가이드] CH3. 카프카 프로듀서: 카프카에 메시지 쓰기 (0) 2025.02.04 [카프카 핵심 가이드] CH1. 카프카 시작하기 (0) 2025.02.04 - poll()을 호출할 때마다 카프카에 쓰여진 메시지 중 컨슈머 그룹에 속한 컨슈머들이 아직 읽지 않은 레코드가 리턴됨