-
[카프카 핵심 가이드] 3.6 파티셔너 (카프카 3.9 버전 반영)DevBook 2025. 2. 6. 01:03
✅ 카프카 버전 3.3 이전에는?
- 카프카 3.3 버전부터 파티셔너 관련 default configuration이 변경되었고, 새로운 configuration이 추가되었음
- 아래 내용은 카프카 버전 3.3 이전의 파티셔너 내용임
ProducerRecord의 키값 역할 2가지
- 메시지에 value와 함께 저장되는 추가적인 정보
- 토픽에 속한 여러 개의 파티션 중 해당 메시지가 저장될 파티션을 결정짓는 기준점
- 기본 파티셔너 사용할 경우 같은 키 값을 가진 모든 메시지는 같은 파티션에 저장됨
파티셔너 설정
- partitioner.class 설정 사용 (https://kafka.apache.org/32/documentation.html#producerconfigs_partitioner.class)
- DefaultPartitioner
- 메시지 배치가 다 차거나 linger.ms에 도달할 때까지 특정 파티션에 스티킹함 (해당 파티션에만 메시지를 할당함)
- 메시지의 키값이 존재할 때 : 키 해시 값 기반 파티션 선정
- 메시지의 키값이 존재하지 않을 때 : 스티키 파티션 선정
- RoundRobinPartitioner
- 메시지 키값 존재 여부 상관없이 라운드 로빈 알고리즘을 적용해 파티션 선정
- 참고) 새로운 메시지 배치 생성 시 고르지 않은 분산 이슈 : https://issues.apache.org/jira/browse/KAFKA-9965
- UniformStickyPartitioner
- 메시지 키값 존재 여부 상관없이 메시지 배치가 다 차거나 linger.ms에 도달할 때까지 특정 파티션에 스티킹함
- DefaultPartitioner
참고) Sticky Partitioner : KIP-480
- 파티셔너가 파티션을 선정할 때(메시지 배치를 채울 때) 다음 메시지 배치로 넘어가기 전 이전 메시지 배치를 먼저 채움
- 더 적은 요청 횟수로 같은 수의 메시지를 전송하여 지연 시간과 브로커의 CPU 사용량을 줄임
- 카프카 브로커로 보내야 하는 요청 수 5개 -> 3개로 감소
- 한 번에 브로커로 보내는 메시지 한도를 최대한 활용 (batch.size)
✅ 카프카 버전 3.3 부터는?
- 처리가 느린 카프카 브로커 내 파티션으로의 스티킹 파티셔닝으로 인한 성능 이슈(KIP-794)로 아래 내용으로 수정됨
파티셔너 설정
1) Default Configuration
- partitioner.class 설정 (default 값 : null)
- custom partitioner 지정했을 때 : 구현 방식 따라감
- custom partitioner 지정하지 않았을 때 (null일 때)
- 레코드 키값이 없거나 키값 상관없이 파티셔닝할 때
- KafkaProducer의 built-in 파티셔닝 로직 적용
- 참고) 키값 상관없이 파티셔닝하려면 paritioner.class=null, partitioner.ignore.keys=true 설정 권장
- DefaultPartitioner, UniformStickyPartitioner는 deprecated됨
- 레코드 키값에 따라 파티셔닝할 때
- 키 해시 값 기반 파티션 선정
- 레코드 키값이 없거나 키값 상관없이 파티셔닝할 때
2) New Configuration
- 링크 참고
- 해당 설정들은 partition.class=null인 경우에만 적용됨
추가) KafkaProducer의 built-in 파티셔닝 로직은 어떻게 동작할까?
public class KafkaProducer { ... private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { if (record.partition() != null) return record.partition(); if (partitioner != null) { // 커스텀 파티셔너 지정했을 때 int customPartition = partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); if (customPartition < 0) { throw new IllegalArgumentException(String.format( "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition)); } return customPartition; } if (serializedKey != null && !partitionerIgnoreKeys) { // 레코드 키값 존재하고 파티셔닝에 키값 사용할 때 // hash the keyBytes to choose a partition return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size()); } else { // 레코드 키값 존재하지 않거나 파티셔닝에 키값 사용하지 않을 때 return RecordMetadata.UNKNOWN_PARTITION; } } }
- 레코드 키값이 존재하지 않거나 파티셔닝에 키값 사용하지 않을 때 RecordMetadata.UNKNOWN_PARTITION 반환함
- UNKNOWN_PARTITION = int 타입 -1 값으로 선언되어 있음
public class RecordAccumulator { public RecordAppendResult append(...) { ... if (partition == RecordMetadata.UNKNOWN_PARTITION) { partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster); effectivePartition = partitionInfo.partition(); } } }
- RecordAccumulator 클래스의 append() 메서드 내부에서 분기 처리됨
public class BuiltInPartitioner { ... StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) { StickyPartitionInfo partitionInfo = stickyPartitionInfo.get(); if (partitionInfo != null) return partitionInfo; // We're the first to create it. partitionInfo = new StickyPartitionInfo(nextPartition(cluster)); if (stickyPartitionInfo.compareAndSet(null, partitionInfo)) return partitionInfo; // Someone has raced us. return stickyPartitionInfo.get(); } ... private int nextPartition(Cluster cluster) { ... PartitionLoadStats partitionLoadStats = this.partitionLoadStats; if (partitionLoadStats == null) { // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next // partition based on uniform distribution. } else { // PartitionLoadStats 기반 adaptive partitioning 제공 } ... } }
- PartitionLoadStats 정보가 존재할 경우 해당 정보를 기반으로 Adaptive Partitioning을 수행함
- 카프카 브로커의 성능을 모니터링하여 성능에 따라 메시지를 동적으로 분배함
- 처리가 빠른 브로커 내 파티션을 선택해 더 많은 메시지를 전송할 수 있게 함
- partitioner.adaptive.partitioning.enable 설정을 통해 제어되며, 기본값은 true
'DevBook' 카테고리의 다른 글
[카프카 핵심 가이드] CH4. 카프카 컨슈머: 카프카에서 데이터 읽기 (1) (0) 2025.02.11 [카프카 핵심 가이드] 추가 - 카프카 컨슈머 리밸런스 (0) 2025.02.11 [카프카 핵심 가이드] CH3. 카프카 프로듀서: 카프카에 메시지 쓰기 (0) 2025.02.04 [카프카 핵심 가이드] CH1. 카프카 시작하기 (0) 2025.02.04