ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [카프카 핵심 가이드] 3.6 파티셔너 (카프카 3.9 버전 반영)
    DevBook 2025. 2. 6. 01:03

    ✅ 카프카 버전 3.3 이전에는?

    • 카프카 3.3 버전부터 파티셔너 관련 default configuration이 변경되었고, 새로운 configuration이 추가되었음
    • 아래 내용은 카프카 버전 3.3 이전의 파티셔너 내용임

     

    ProducerRecord의 키값 역할 2가지

    • 메시지에 value와 함께 저장되는 추가적인 정보
    • 토픽에 속한 여러 개의 파티션 중 해당 메시지가 저장될 파티션을 결정짓는 기준점
      • 기본 파티셔너 사용할 경우 같은 키 값을 가진 모든 메시지는 같은 파티션에 저장됨

     

    파티셔너 설정

    • partitioner.class 설정 사용 (https://kafka.apache.org/32/documentation.html#producerconfigs_partitioner.class)
      • DefaultPartitioner
        • 메시지 배치가 다 차거나 linger.ms에 도달할 때까지 특정 파티션에 스티킹함 (해당 파티션에만 메시지를 할당함)
        • 메시지의 키값이 존재할 때 : 키 해시 값 기반 파티션 선정
        • 메시지의 키값이 존재하지 않을 때 : 스티키 파티션 선정
      • RoundRobinPartitioner
      • UniformStickyPartitioner
        • 메시지 키값 존재 여부 상관없이 메시지 배치가 다 차거나 linger.ms에 도달할 때까지 특정 파티션에 스티킹함

     

    참고) Sticky Partitioner : KIP-480

    • 파티셔너가 파티션을 선정할 때(메시지 배치를 채울 때) 다음 메시지 배치로 넘어가기 전 이전 메시지 배치를 먼저 채움
    • 더 적은 요청 횟수로 같은 수의 메시지를 전송하여 지연 시간과 브로커의 CPU 사용량을 줄임

    • 카프카 브로커로 보내야 하는 요청 수 5개 -> 3개로 감소
    • 한 번에 브로커로 보내는 메시지 한도를 최대한 활용 (batch.size)

     

     카프카 버전 3.3 부터는?

    • 처리가 느린 카프카 브로커 내 파티션으로의 스티킹 파티셔닝으로 인한 성능 이슈(KIP-794)로 아래 내용으로 수정됨

     

    파티셔너 설정

    1) Default Configuration

    • partitioner.class 설정 (default 값 : null)
      • custom partitioner 지정했을 때 : 구현 방식 따라감
      • custom partitioner 지정하지 않았을 때 (null일 때)
        • 레코드 키값이 없거나 키값 상관없이 파티셔닝할 때
          • KafkaProducer의 built-in 파티셔닝 로직 적용
          • 참고) 키값 상관없이 파티셔닝하려면 paritioner.class=null, partitioner.ignore.keys=true 설정 권장
            • DefaultPartitioner, UniformStickyPartitioner는 deprecated됨
        • 레코드 키값에 따라 파티셔닝할 때
          • 키 해시 값 기반 파티션 선정

     

    2) New Configuration

    • 링크 참고
    • 해당 설정들은 partition.class=null인 경우에만 적용됨

     

    추가) KafkaProducer의 built-in 파티셔닝 로직은 어떻게 동작할까?

    public class KafkaProducer {
    	...
    
    	private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
            if (record.partition() != null)
                return record.partition();
    
            if (partitioner != null) { // 커스텀 파티셔너 지정했을 때
                int customPartition = partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
                if (customPartition < 0) {
                    throw new IllegalArgumentException(String.format(
                        "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
                }
                return customPartition;
            }
    
            if (serializedKey != null && !partitionerIgnoreKeys) { // 레코드 키값 존재하고 파티셔닝에 키값 사용할 때
                // hash the keyBytes to choose a partition
                return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
            } else { // 레코드 키값 존재하지 않거나 파티셔닝에 키값 사용하지 않을 때
                return RecordMetadata.UNKNOWN_PARTITION;
            }
        }
    
    }
    • 레코드 키값이 존재하지 않거나 파티셔닝에 키값 사용하지 않을 때 RecordMetadata.UNKNOWN_PARTITION 반환함
      • UNKNOWN_PARTITION = int 타입 -1 값으로 선언되어 있음

     

    public class RecordAccumulator {
    
    	public RecordAppendResult append(...) {
        	...
            
            if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
                effectivePartition = partitionInfo.partition();
            }
        }
    }
    • RecordAccumulator 클래스의 append() 메서드 내부에서 분기 처리됨

     

    public class BuiltInPartitioner {
    	...
    
    	StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
            StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
            if (partitionInfo != null)
                return partitionInfo;
    
            // We're the first to create it.
            partitionInfo = new StickyPartitionInfo(nextPartition(cluster));
            if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
                return partitionInfo;
    
            // Someone has raced us.
            return stickyPartitionInfo.get();
        }
        
        ...
        
        private int nextPartition(Cluster cluster) {
        	...
    
            PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
    
            if (partitionLoadStats == null) {
                // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
                // partition based on uniform distribution.
            } else {
            	// PartitionLoadStats 기반 adaptive partitioning 제공
            }
            
            ...
        }
    }
    • PartitionLoadStats 정보가 존재할 경우 해당 정보를 기반으로 Adaptive Partitioning을 수행함
      • 카프카 브로커의 성능을 모니터링하여 성능에 따라 메시지를 동적으로 분배함
      • 처리가 빠른 브로커 내 파티션을 선택해 더 많은 메시지를 전송할 수 있게 함
      • partitioner.adaptive.partitioning.enable 설정을 통해 제어되며, 기본값은 true

    댓글

Designed by Tistory.