ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [카프카 핵심 가이드] CH4. 카프카 컨슈머: 카프카에서 데이터 읽기 (2)
    DevBook 2025. 2. 13. 20:00

    4.6 오프셋과 커밋

    • poll()을 호출할 때마다 카프카에 쓰여진 메시지 중 컨슈머 그룹에 속한 컨슈머들이 아직 읽지 않은 레코드가 리턴됨
      • 카프카의 고유한 특성 중 하나는 컨슈머로부터의 응답을 받는 방식이 아니라는 점
      • 대신, 컨슈머가 카프카를 사용해 각 파티션에서의 위치를 추적할 수 있게 함
    • 파티션에서의 현재 위치를 업데이트하는 작업을 '오프셋 커밋'이라고 함
      • 카프카는 레코드를 개별적으로 커밋하지 않음
      • 컨슈머는 파티션에서 성공적으로 처리해 낸 마지막 메시지를 커밋하여 그 앞의 모든 메시지들 역시 성공적으로 처리되었음을 암묵적으로 나타냄
      • poll()이 리턴한 마지막 오프셋 바로 다음 오프셋을 커밋하는 것이 기본적인 작동!
    • 카프카의 __consumer_offsets 토픽에 각 파티션별로 커밋된 오프셋을 업데이트하는 메시지를 보냄
      • 컨슈머는 각 파티션의 마지막으로 커밋된 메시지를 읽어온 뒤 거기서부터 처리를 재개함
    • KafkaConsumer API는 오프셋을 커밋하는 다양한 방법을 지원함

     

    4.6.1 자동 커밋

    • 컨슈머가 자동으로 커밋하는 방식
    • enable.auto.commit=true로 설정하면, 컨슈머는 5초에 한 번, poll()을 통해 받은 메시지 중 마지막 메시지의 오프셋을 커밋함
      • auto.commit.interval.ms로 간격 조정 가능
      • 자동 커밋은 폴링 루프에 의해 실행됨
      • 다음 번에 호출된 poll()이 이전 poll() 호출에서 리턴된 마지막 오프셋을 커밋
    • 고려사항
      • 메시지 중복 처리
        • 상황
          • 자동 커밋은 5초에 한 번 수행, 마지막으로 커밋한 지 3초 뒤에 컨슈머가 다운됨
          • 리밸런싱 완료 후 마지막으로 커밋된 오프셋부터 작업을 시작하는데, 해당 오프셋은 3초 전의 것이기 때문에 다운되기 3초 전까지 읽어서 처리된 이벤트들은 두 번 처리되게 됨
        • 커밋 간격을 줄이더라도 중복을 완전히 없애는 것은 불가능

     

    4.6.2 현재 오프셋 커밋하기

    • 개발자가 원하는 시간에 현재 오프셋을 커밋하는 옵션 제공
    • 방법
      • enable.auto.commit=false로 설정해 명시적으로 커밋할 때만 오프셋이 커밋되게 함
      • commitSync() 사용
        • poll()이 리턴한 마지막 오프셋을 커밋한 뒤 커밋이 성공하면 리턴, 어떠한 이유로 실패하면 예외 발생시킴
        • 해결할 수 없는 에러가 발생하지 않는 한, commitSync()는 커밋을 재시도함
    • 고려사항
      • poll()에 의해 리턴된 마지막 오프셋을 커밋한다는 점 유의!
      • 메시지 누락
        • poll()에서 리턴된 모든 레코드의 처리가 완료되기 전 commitSync()를 호출하면, 서버가 크래시되었을 때 커밋은 되었지만 아직 처리되지 않은 메시지들이 누락될 수 있음
      • 메시지 중복 처리
        • 서버가 레코드를 처리하는 와중에 크래시가 날 경우, 마지막 메시지 배치의 맨 앞 레코드에서부터 리밸런스 시작 시점까지의 모든 레코드들은 두 번 처리될 것임

     

    코드 예시)

    더보기
    Duration timeout = Duration.ofMillis(100);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
        	// 레코드 처리
        }
        try {
        	consumer.commitSync();
        } catch (CommitFailedException e) {
        	// 에러 로깅
        }
    }

     

    4.6.3 비동기적 커밋

    • 수동 커밋의 단점 중 하나는 브로커가 커밋 요청에 응답할 때까지 서버가 블록된다는 점
    • 비동기적 커밋 API를 사용하면, 브로커가 커밋에 응답할 때까지 기다리지 않고 요청만 보내고 처리를 계속
    • 고려사항
      • 기본적으로, commitAsync()는 재시도 하지 않음
      • commitAsync()가 서버로부터 응답을 받은 시점에는 이미 다른 커밋 시도가 성공했을 수도 있기 때문
        • 실패한 앞의 커밋 요청을 재시도해서 성공한다면, 다음 오프셋이 커밋 완료된 상황에서 이전 오프셋이 커밋되는 상황 발생
      • 콜백을 사용해 재시도 구현 시 이와 같은 커밋 순서를 고려해야 함

     

    코드 예시)

    더보기
    Duration timeout = Duration.ofMillis(100);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
        	// 레코드 처리
        }
        consumer.commitAsync(new OffsetCommitCallback() {
        	public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
            	// 콜백 수행
            }
        });
    }

     

    Q. 비동기적 커밋 재시도는 어떻게 할 수 있을까?

    • 순차적으로 단조증가하는 번호 사용
    더보기
    AtomicLong sequenceNumber = new AtomicLong(0);
    
    Duration timeout = Duration.ofMillis(100);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
        	// 레코드 처리
        }
        
        long failedSeq = sequenceNumber.incrementAndGet(); // 번호 증가
        
        consumer.commitAsync(new OffsetCommitCallback() {
        	public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                long currentSeq = sequenceNumber.get();
                
                if (failedSeq >= currentSeq) {
                	// 다음 커밋 시도가 없었으므로 재시도
                } else {
                	// 다음 커밋 시도가 있었으므로 skip
                }
            }
        });
    }

     

    4.6.4 동기적 커밋과 비동기적 커밋 함께 사용하기

    • 언제 사용?
      • 컨슈머를 닫기 전 혹은 리밸런스 전 마지막 커밋일 때, 성공 여부를 추가로 확인할 필요가 있음
      • 이때 일반적인 패턴은 종료 직전에 commitAsync()와 commitSync()를 함께 사용하는 것

     

    코드 예시)

    더보기
    Duration timeout = Duration.ofMillis(100);
    
    try {
        while(!closing) {
        	ConsumerRecords<String, String> records = consumer.poll(timeout);
            for (ConsumerRecord<String, String> record : records) {
            	// 레코드 처리
            }
            consumer.commitAsync();
        }
        consumer.commitSync();
    } catch (Exception e) {
    	// 에러 로깅
    } finally {
    	consumer.close();
    }
    • 정상적인 상황에서는 commitAsync() 사용
    • 컨슈머를 닫는 상황에서는 다음 커밋이 없으므로 commitSync()를 호출해 커밋이 성공하거나 회복 불가능한 에러가 발생할 때까지 재시도하도록

     

    4.6.5 특정 오프셋 커밋하기

    • 언제 사용?
      • 가장 최근 오프셋만 커밋하는 게 아니라 더 자주 커밋하고 싶을 때
      • poll()이 큰 메시지 배치를 리턴해서, 리밸런스가 발생했을 경우 전체 배치를 재처리하는 상황을 피하기 위해 배치를 처리하는 도중에 오프셋을 커밋하고 싶을 때
    • 방법
      • commitSync()나 commitAsync() 호출 시 커밋하고자 하는 파티션과 오프셋의 맵 전달

     

    코드 예시)

    더보기
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    int count = 0;
    
    ...
    
    Duration timeout = Duration.ofMillis(100);
    
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
        	// 레코드 처리
            
            currentOffsets.put(
                new TopicPartition(record.topic(), record.partition());
                new OffsetAndMetadata(record.offset()+1, "no metadata");
            );
            
            if (count % 1000 == 0) {
            	consumer.commitAsync(currentOffsets, null);
            }
            
            count++;
        }
    }
    • 커밋될 오프셋은 다음번에 읽어야 할 메시지의 오프셋으로 지정 (record.offset() + 1)

     

    4.7 리밸런스 리스너

    • 배경
      • 컨슈머는 종료하기 전이나 리밸런싱이 시작되기 전에 정리 작업을 해줘야
      • 컨슈머에 할당된 파티션이 해제될 것이라는 걸 알게 된다면 해당 파티션에서 마지막으로 처리한 이벤트의 오프셋을 커밋해야 함
      • 컨슈머 API는 컨슈머에 파티션이 할당되거나 해제될 때 사용자의 코드가 실행되도록 하는 메커니즘 제공
        • subscribe() 호출할 때 ConsumerRebalanceListener를 전달해주면 됨
    • ConsumerRebalanceListener 인터페이스에는 다음과 같이 3개의 메서드가 정의되어 있음
      • public void onPartitionsAssigned(Collection<TopicPartition> partitions)
        • 파티션이 컨슈머에 재할당된 후, 컨슈머가 메시지를 읽기 시작하기 전에 호출
        • 파티션과 함께 사용할 상태 적재, 필요한 오프셋 탐색 등의 준비 작업을 수행함
        • 여기서 수행되는 모든 준비 작업은 max.poll.timeout.ms 안에 완료되어야 함
      • public void onPartitionsRevoked(Collection<TopicPartition> partitions)
        • 컨슈머가 할당받았던 파티션이 할당 해제될 때 호출
        • 리밸런스 방식에 따른 구현 - 참고) 카프카 컨슈머 리밸런스
          • 조급한 리밸런스(Eager Rebalance) 방식
            • 컨슈머가 메시지 읽기를 멈춘 뒤에, 그리고 리밸런스가 시작되기 전에 호출됨 => 컨슈머가 그룹 코디네이터로 JoinGroup 요청을 보내기 전 할당된 파티션에 대한 소유권을 취소해야 하기 때문
          • 협력적 리밸런스(Cooperative Rebalance) 방식
            • 1차 리밸런스가 완료될 때, 컨슈머에서 할당 해제되어야 할 파티션들에 대해서만 호출됨
      • public void onPartitionsLost(Collection<TopicPartition> partitions)
        • 협력적 리밸런스 알고리즘이 사용되었을 때, 할당된 파티션이 리밸런스 알고리즘에 의해 해제되기 전에 다른 컨슈머에 먼저 할당된 예외 상황에서만 호출됨 (일반적인 상황에서는 onPartitionsRevoked() 호출)
          • 해당 메서드를 구현하지 않았을 경우 예외 상황에서 onPartitionsRevoked()가 대신 호출됨
        • 주의) 파티션을 새로 할당받은 컨슈머가 이미 상태를 저장했을 수도 있기 때문에 충돌 필해야 함

     

    참고) 협력적 리밸런스 알고리즘 사용 시 알아둘 사항!

    • onPartitionsAssigned()
      • 리밸런싱이 발생할 때마다 호출 => 즉, 리밸런스가 발생했음을 컨슈머에게 알려주는 역할
      • 컨슈머에게 새로 할당된 파티션이 없을 경우 빈 목록과 함께 호출됨
    • onPartitionsRevoked()
      • 파티션이 특정 컨슈머에서 해제될 때에만 호출 (메서드 호출될 때 빈 목록이 주어지는 경우 없음) => 즉, 재할당 대상 파티션을 할당받은 컨슈머에서만 호출
    • onPartitionsLost()
      • 예외적인 리밸런스 상황에서 호출
      • 메서드 호출될 때 주어지는 파티션들은 이 메서드가 호출되는 시점에 이미 다른 컨슈머들에게 할당되어 있는 상태

     

    코드 예제) 파티션이 해제되기 전 onPartitionsRevoked() 사용해 오프셋 커밋하기

    더보기
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    Duration timeout = Duration.ofMillis(100);
    
    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        	consumer.commitSync(currentOffsets); // 현재 오프셋 커밋 (3)
        }
    }
    
    ...
    
    try {
        consumer.subscribe(topics, new HandleRebalance()); // 리밸런스 리스너 함께 전달
        
        while (true) {
        	ConsumerRecords<String, String> records = consumer.poll(timeout);
            for (ConsumerRecord<String, String> record : records) {
            	// 레코드 처리
                
                currentOffsets.put(
                	new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset()+1, null)
                );
                
                consumer.commitAsync(); // 비동기 커밋
            }
        } catch (WakeupException e) {
        	// ignore, we're closing
        } catch (Exception e) {
        	// 에러 로깅
        } finally {
        	try {
            	consumer.commitSync(currentOffsets);
            } finally {
            	consumer.close(); // 컨슈머 종료
            }
        }
    }
    • (3) 리밸런싱으로 파티션이 해제될 상황일 때 오프셋을 커밋함

     

    4.8 특정 오프셋의 레코드 읽어오기

    • 다른 오프셋에서부터 읽기를 시작하고 싶은 경우에 사용
    • 상황
      • 파티션의 맨 앞에서부터 모든 메시지 읽고자 하거나, 앞의 메시지는 전부 건너뛰고 파티션에 새로 들어온 메시지부터 읽기 시작할 때
        • seekToBegining()과 seekToEnd() 사용
      • 특정한 오프셋으로 돌아가야할 때
        • ex) 시간에 민감한 어플리케이션에서 처리가 늦어져 몇 초간 메시지를 건너뛰어야 하는 경우, 파일에 데이터를 쓰는 컨슈머가 파일이 유실되어 데이터를 복구하기 위해 특정한 과거 시점으로 되돌아가야 할 때

     

    코드 예시) 모든 파티션의 현재 오프셋을 특정한 시각에 생성된 레코드의 오프셋으로 설정하기

    더보기
    Long oneHourEarlier = Instant.now().atZone(ZoneId.systemDefault()).minusHours(1).toEpochSecond();
    Map<TopicPartition, Long> partitionTimestampMap = consumer.assignment()
                                                            .stream()
                                                            .collect(Collectors.toMap(tp -> tp, tp -> oneHourEarlier); // (1)
    Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(partitionTimestampMap); // (2)
    
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetMap.entrySet()) {
    	consumer.seek(entry.getKey(), entry.getValue().offset()); // (3)
    }
    • (1) 해당 컨슈머에 할당된 모든 파티션에 대해 되돌리고자 하는 타임스탬프 값을 담은 Map 생성
    • (2) 파티션별 각 타임스탬프에 해당하는 오프셋을 받아옴 (브로커에 요청을 보내 타임스탬프 인덱스에 저장된 오프셋을 리턴하도록 함)
    • (3) 각 파티션의 오프셋을 앞 단계에서 리턴된 오프셋으로 재설정

     

    4.9 폴링 루프를 벗어나는 방법 (카프카 컨슈머를 안전하게 종료하는 방법)

    • 컨슈머를 종료하고자 할 때, 컨슈머가 poll()을 오랫동안 기다리고 있더라도 즉시 루프를 탈출하고 싶다면 다른 스레드에서 consumer.wakeup()을 호출해주어야
      • 메인 스레드에서 컨슈머 루프를 돌고 있다면 ShutdownHook을 사용할 수 있음 
      • consumer.wakeup()은 다른 스레드에서 호출해줄 때만 안전하게 작동하는 유일한 컨슈머 메서드
      • wakeup을 호출하면 (대기중이던) poll()이 WakeupException을 발생시키며 중단되거나, 대기중이 아닐 경우에는 다음 번에 처음으로 poll()이 호출될 때 예외 발생함
    • WakeupException은 딱히 처리해 줄 필요 없지만, 스레드를 종료하기 전 consumer.close()는 호출해줘야
      • 컨슈머를 닫으면 오프셋을 커밋하고 그룹 코디네이터에게 컨슈머가 그룹을 떠난다는 메시지를 전송함
      • 이때 코디네이터가 즉시 리밸런싱을 실행하게 됨
    • 예제) https://github.com/gwenshap/kafka-examples/blob/master/SimpleMovingAvg/src/main/java/com/shapira/examples/newconsumer/simplemovingavg/SimpleMovingAvgNewConsumer.java

     

    요약

    • 카프카 컨슈머의 poll()은 블로킹 될 수 있어, 안전한 종료를 위해 consumer.wakeup()을 사용해야 함
    • ShutdownHook를 활용하면 어플리케이션 종료 시 자동으로 wakeup()을 호출할 수 있음
    • WakeupException을 캐치하고 consumer.close()를 호출해 안전하게 종료해야 함

    댓글

Designed by Tistory.