Study/카프카 핵심 가이드

[카프카 핵심 가이드] 3.6 파티셔너 (카프카 3.9 버전 반영)

sw_develop 2025. 2. 6. 01:03

✅ 카프카 버전 3.3 이전에는?

  • 카프카 3.3 버전부터 파티셔너 관련 default configuration이 변경되었고, 새로운 configuration이 추가되었음
  • 아래 내용은 카프카 버전 3.3 이전의 파티셔너 내용임

 

ProducerRecord의 키값 역할 2가지

  • 메시지에 value와 함께 저장되는 추가적인 정보
  • 토픽에 속한 여러 개의 파티션 중 해당 메시지가 저장될 파티션을 결정짓는 기준점
    • 기본 파티셔너 사용할 경우 같은 키 값을 가진 모든 메시지는 같은 파티션에 저장됨

 

파티셔너 설정

  • partitioner.class 설정 사용 (https://kafka.apache.org/32/documentation.html#producerconfigs_partitioner.class)
    • DefaultPartitioner
      • 메시지 배치가 다 차거나 linger.ms에 도달할 때까지 특정 파티션에 스티킹함 (해당 파티션에만 메시지를 할당함)
      • 메시지의 키값이 존재할 때 : 키 해시 값 기반 파티션 선정
      • 메시지의 키값이 존재하지 않을 때 : 스티키 파티션 선정
    • RoundRobinPartitioner
    • UniformStickyPartitioner
      • 메시지 키값 존재 여부 상관없이 메시지 배치가 다 차거나 linger.ms에 도달할 때까지 특정 파티션에 스티킹함

 

참고) Sticky Partitioner : KIP-480

  • 파티셔너가 파티션을 선정할 때(메시지 배치를 채울 때) 다음 메시지 배치로 넘어가기 전 이전 메시지 배치를 먼저 채움
  • 더 적은 요청 횟수로 같은 수의 메시지를 전송하여 지연 시간과 브로커의 CPU 사용량을 줄임

  • 카프카 브로커로 보내야 하는 요청 수 5개 -> 3개로 감소
  • 한 번에 브로커로 보내는 메시지 한도를 최대한 활용 (batch.size)

 

 카프카 버전 3.3 부터는?

  • 처리가 느린 카프카 브로커 내 파티션으로의 스티킹 파티셔닝으로 인한 성능 이슈(KIP-794)로 아래 내용으로 수정됨

 

파티셔너 설정

1) Default Configuration

  • partitioner.class 설정 (default 값 : null)
    • custom partitioner 지정했을 때 : 구현 방식 따라감
    • custom partitioner 지정하지 않았을 때 (null일 때)
      • 레코드 키값이 없거나 키값 상관없이 파티셔닝할 때
        • KafkaProducer의 built-in 파티셔닝 로직 적용
        • 참고) 키값 상관없이 파티셔닝하려면 paritioner.class=null, partitioner.ignore.keys=true 설정 권장
          • DefaultPartitioner, UniformStickyPartitioner는 deprecated됨
      • 레코드 키값에 따라 파티셔닝할 때
        • 키 해시 값 기반 파티션 선정

 

2) New Configuration

  • 링크 참고
  • 해당 설정들은 partition.class=null인 경우에만 적용됨

 

추가) KafkaProducer의 built-in 파티셔닝 로직은 어떻게 동작할까?

public class KafkaProducer {
	...

	private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        if (record.partition() != null)
            return record.partition();

        if (partitioner != null) { // 커스텀 파티셔너 지정했을 때
            int customPartition = partitioner.partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
            if (customPartition < 0) {
                throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
            }
            return customPartition;
        }

        if (serializedKey != null && !partitionerIgnoreKeys) { // 레코드 키값 존재하고 파티셔닝에 키값 사용할 때
            // hash the keyBytes to choose a partition
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        } else { // 레코드 키값 존재하지 않거나 파티셔닝에 키값 사용하지 않을 때
            return RecordMetadata.UNKNOWN_PARTITION;
        }
    }

}
  • 레코드 키값이 존재하지 않거나 파티셔닝에 키값 사용하지 않을 때 RecordMetadata.UNKNOWN_PARTITION 반환함
    • UNKNOWN_PARTITION = int 타입 -1 값으로 선언되어 있음

 

public class RecordAccumulator {

	public RecordAppendResult append(...) {
    	...
        
        if (partition == RecordMetadata.UNKNOWN_PARTITION) {
            partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
            effectivePartition = partitionInfo.partition();
        }
    }
}
  • RecordAccumulator 클래스의 append() 메서드 내부에서 분기 처리됨

 

public class BuiltInPartitioner {
	...

	StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
        if (partitionInfo != null)
            return partitionInfo;

        // We're the first to create it.
        partitionInfo = new StickyPartitionInfo(nextPartition(cluster));
        if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
            return partitionInfo;

        // Someone has raced us.
        return stickyPartitionInfo.get();
    }
    
    ...
    
    private int nextPartition(Cluster cluster) {
    	...

        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;

        if (partitionLoadStats == null) {
            // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
            // partition based on uniform distribution.
        } else {
        	// PartitionLoadStats 기반 adaptive partitioning 제공
        }
        
        ...
    }
}
  • PartitionLoadStats 정보가 존재할 경우 해당 정보를 기반으로 Adaptive Partitioning을 수행함
    • 카프카 브로커의 성능을 모니터링하여 성능에 따라 메시지를 동적으로 분배함
    • 처리가 빠른 브로커 내 파티션을 선택해 더 많은 메시지를 전송할 수 있게 함
    • partitioner.adaptive.partitioning.enable 설정을 통해 제어되며, 기본값은 true