Study/카프카 핵심 가이드

[카프카 핵심 가이드] CH3. 카프카 프로듀서: 카프카에 메시지 쓰기

sw_develop 2025. 2. 4. 01:34

배울 내용

  • 프로듀서의 디자인과 주요 요소
  • KafkaProducer와 ProducerRecord 객체를 어떻게 생성하는지, 어떻게 카프카에 레코드를 전송하는지, 카프카가 리턴할 수 있는 에러를 어떻게 처리하는지
  • 프로듀서의 작동을 제어하기 위해 사용되는 가장 중요한 설정 옵션들
  • 파티션 할당 방식을 정의하는 파티셔너와 객체의 직렬화 방식을 정의하는 시리얼라이저에는 어떠한 것들이 있는지, 이들을 작성하기 위해 어떻게 해야 하는지

 

3.1 프로듀서 개요

  • 프로듀서 API는 매우 단순하지만, 내부적으로는 많은 작업들이 이루어진다.

카프카 프로듀서 요소 개괄

1. ProducerRecord 객체 생성

  • topic, value는 필수 / partition, key는 선택

2. 객체 직렬화

  • ProducerRecord 전송 API 호출 시 프로듀서는 키와 값 객체가 네트워크 상에서 전송될 수 있도록 객체를 직렬화해서 바이트 배열로 변환

3. 파티셔닝

  • 파티션을 명시적으로 지정하지 않았을 때 해당 데이터를 파티셔너에게 보냄
  • 파티셔너는 ProducerRecord 객체의 키 값 기준으로 파티션을 결정함

4. 카프카 브로커에 전송

  • 메시지가 전송될 토픽과 파티션이 확정되면 프로듀서는 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치에 추가
  • 별도의 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송함

5. 응답 처리

  • 메시지 저장 성공 : 브로커는 토픽, 파티션, 해당 파티션 안에서의 레코드의 오프셋을 담은 RecordMetadata 객체 리턴
  • 메시지 저장 실패 : 에러 리턴
    • 프로듀서가 에러 수신 시 재전송 시도 가능

 

3.2 카프카 프로듀서 생성하기

메시지 전송 방법 3가지

  • Fire and forget
    • 메시지를 서버에 전송만 하고 성공 혹은 실패 여부를 신경쓰지 않는다.
    • 대부분의 경우 메시지는 성공적으로 전달된다.
      • 카프카가 가용성이 높고 프로듀서는 자동으로 전송 실패한 메시지를 재전송 시도하기 때문.
    • 부득이하게 재시도를 할 수 없는 에러가 발생하거나 타임아웃이 발생했을 경우 메시지는 유실되고, 어플리케이션은 이에 대해 아무런 정보나 예외를 전달받지 않게 된다.
  • 동기적 전송 (Synchronous send)
    • 기술적으로 카프카 프로듀서는 언제나 비동기적으로 작동한다. 즉, 메시지를 보내면 send() 메서드는 Future 객체를 리턴한다.
    • 하지만 다음 메시지를 전송하기 전 get() 메서드를 호출해 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인해야 한다.
  • 비동기적 전송 (Asynchronous send)
    • 콜백 함수와 함께 send() 메서드를 호출하면 카프카 브로커로부터 응답 받는 시점에 자동으로 콜백 함수가 호출된다.

 

3.3 카프카로 메시지 전달하기

3.3.1 동기적으로 메시지 전송하기

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
	producer.send(record).get();
} catch (Exception e) {
	e.printStackTrace();
}
  • 동기적으로 메시지를 전송할 경우 전송 요청 스레드는 블로킹되어 다른 메시지를 전송할 수 없다.

 

3.3.2 비동기적으로 메시지 전송하기

  • 비동기적으로 메시지를 전송하고 결과에 대한 에러를 처리해야 하는 경우를 위해 프로듀서는 레코드를 전송할 때 콜백을 지정할 수 있도록 한다.
private class DemoProducerCallback implements Callback { // 1)
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    	if (e != null) {
        	e.printStackTrace();       
        }
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback()); // 2)

1) 콜백을 사용하려면 org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스가 필요함

2) 레코드 전송할 때 Callback 객체를 함께 매개변수로 전달함

 

주의할 점

  • 콜백은 프로듀서의 메인 스레드에서 실행된다.
    • 만약 2개의 메시지를 동일한 파티션에 전송한다면, 콜백도 보낸 순서대로 실행됨
  • 메인 스레드는 메시지 전송을 담당하므로 콜백 안에서 블로킹 작업을 수행하는 것은 권장되지 않는다. (콜백 처리로 메인 스레드가 블로킹되어 메시지 전송을 못하게 되니까)

 

3.4 프로듀서 설정하기

  • 메모리 사용량이나 성능, 신뢰성 등에 영향을 미칠 수 있는 몇몇 설정값을 알아본다.
  • 프로듀서가 카프카 브로커로 메시지를 전송하는 과정은 몇가지 단계로 구성되므로 각 단계별 설정이 존재한다.

 

3.4.1 client.id

  • 프로듀서를 구분하기 위한 논리적 식별자
  • 용도 : 브로커가 로그 메시지를 출력하거나 성능 메트릭 값 집계할 때, 클라이언트별로 사용량을 할당할 때 사용
  • 중요한 이유 : 식별자를 명확히 설정하면 문제가 발생했을 때 트러블슈팅을 쉽게 해줌

 

3.4.2 acks

  • 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정하는 값
  • 설정 가능한 3가지 값
    • acks=0
      • 프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 브로커의 응답을 기다리지 않음
      • 메시지 유실 가능성 : 있음
        • 브로커가 메시지를 받지 못했을 경우 프로듀서는 이를 알 수 없고 메시지 유실됨
      • 프로듀서가 서버로부터 응답을 기다리지 않는 만큼 네트워크가 허용하는 한 빠르게 메시지를 보낼 수 있음
      • 따라서 매우 높은 처리량이 필요할 때 사용될 수 있음
    • acks=1
      • 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공 응답 수신
      • 리더에 메시지를 쓸 수 없다면 프로듀서는 에러 응답 수신
        • ex) 리더에 크래시 발생했지만 새 리더가 아직 선출되지 않은 상태
        • 데이터 유실을 피하기 위해 메시지 재전송 시도함
      • 메시지 유실 가능성 : 있음
        • 리더에 메시지 쓰기는 성공했지만 해당 메시지가 나머지 레플리카에 복제가 안 된 상태에서 리더에 크래시가 나서 새 리더가 선출될 경우 메시지 유실될 수 있음
    • acks=all
      • 카프카 버전 3.0부터 기본값
      • 메시지가 모든 인-싱크 레플리카(in-sync replica)에 전달된 뒤 브로커로부터 성공 응답 수신
      • 메시지 유실 가능성 : 없음
        • 가장 안전한 형태
        • min.insync.replicas >= 2일 때, 최소 2개 이상의 브로커가 해당 메시지를 가지고 있어 크래시가 났을 경우에도 유실되지 않음
  • 참고
    • 프로듀서의 acks 설정을 내려잡아 신뢰성을 낮추면 레코드 전송을 빠르게 할 수 있음
      • 즉, 신뢰성과 프로듀서 지연 사이에는 트레이드오프 존재
    • 하지만, 레코드가 생성되어 컨슈머가 읽을 수 있을 때까지의 시간을 의미하는 종단 지연(end to end latency)은 세 값이 모두 동일
      • 카프카는 일관성을 유지하기 위해 모든 인-싱크 레플리카에 복제 완료한 뒤 컨슈머가 레코드를 읽어갈 수 있게 하기 때문
    • 따라서 종단 지연을 고려해야하는 상황에서는 해당 설정을 고려하지 않아도 됨 (프로듀서의 설정이 컨슈머에 영향을 주지 않음)

 

3.4.3 메시지 전달 시간

  • 카프카가 성공적으로 응답을 내려보내 줄 때까지 사용자가 기다릴 수 있는 시간이자 요청 실패를 인정하고 포기할 때까지 기다릴 수 있는 시간
  • 아파치 카프카 2.1부터 ProducerRecord를 보낼 때 걸리는 시간을 두 구간으로 나눔
    • 구간1: send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간
      • 이 시간 동안 send()를 호출한 스레드는 블록됨
    • 구간2: send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터 (성공했건 실패했건) 콜백이 호출될 때까지 걸리는 시간
      • ProducerRecord가 전송을 위해 배치에 추가된 시점에서부터 카프카가 성공 응답을 보내거나, 재시도 불가능한 실패가 일어나거나, 전송을 위해 할당된 시간이 소진될 때까지의 시간과 동일함
    • 참고) send()를 동기적으로 호출할 경우, 메시지를 보내는 스레드는 두 구간에 대해 연속적으로 블록되어 각각의 구간이 어느 정도 걸렸는지 알 수 없음

 

 

1) max.block.ms

  • 아래의 경우에 프로듀서가 얼마나 오랫동안 블록되는지 결정
    • send()를 호출했을 때 : partitionsFor를 호출해 명시적으로 메타데이터를 요청했을 때
  • 해당 메서드는 프로듀서의 전송 버퍼가 가득 차거나 메타데이터가 아직 사용 가능하지 않을 때 블록됨
  • 이 상태에서 max.block.ms만큼 시간이 흐르면 예외 발생함

 

2) delivery.timeout.ms

  • 레코드 전송 준비가 완료된 시점에서부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간 결정
    • 레코드 전송 준비가 완료된 시점 = 비동기 호출인 send()가 문제없이 리턴되고 레코드가 배치에 저장된 시점
    • 전송을 포기하게 되는 시점 = 재시도까지 모두 수행한 후 전송 포기하고 에러 발생시키는 시점
  • linger.ms와 request.timeout.ms보다 커야 함 (해당 조건 벗어난 설정으로 카프카 프로듀서 생성 시 예외 발생)
  • 타임아웃 케이스
    • 프로듀서가 재시도 하는 도중에 delivery.timeout.ms 초과했을 때
      • 마지막으로 재시도 하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백 호출됨
    • 레코드 배치가 전송을 기다리는 와중에 delivery.timeout.ms 초과했을 때
      • 타임아웃 예외와 함께 콜백 호출됨
  • 팁) 재시도 관련 설정 튜닝 시 재시도 횟수와 재시도 사이의 시간 간격을 조정하는 것보다 delivery.timeout.ms를 조정하면 됨

 

3) request.timeout.ms

  • 프로듀서가 데이터를 전송할 때 카프카 브로커로부터 응답을 받기 위해 기다리는 최대 시간 결정
    • 각각의 쓰기 요청 후 전송을 포기하기까지 대기하는 시간
    • 재시도 시간이나 실제 전송 이전에 소요되는 시간 등을 포함하지 않음!
  • 응답 없이 타임아웃 발생할 경우, 프로듀서는 재전송을 시도하거나 TimeoutException과 함께 콜백을 호출함

 

4) retries, retry.backoff.ms

  • retries : 프로듀서가 메시지 전송을 포기하고 에러를 발생시킬 때까지 메시지 재전송하는 횟수 결정
    • 기본적으로 재시도 사이에 100ms 동안 대기함
    • 해당 간격은 retry.backoff.ms를 통해 조정할 수 있음
  • 현재 버전의 카프카(카프카 3.0)는 이 값들을 조정하는 것을 권장하지 않음
    • 대신, 크래시 난 브로커가 정상으로 돌아오기까지의 시간을 테스트한 뒤 delivery.timeout.ms 조정을 권장
      • 모든 파티션에 대해 새 리더가 선출되는 데 걸리는 시간
    • 즉, 재전송을 시도하는 전체 시간이 카프카 클러스터가 크래시로부터 복구되기까지의 시간보다 더 길게 잡히도록 잡아 주는 것
  • 참고)
    • 재시도는 재시도 가능한 일시적인 에러(파티션에 리더가 없는 경우)일 경우 적용 가능
    • 재시도 불가능한 에러 및 재시도 횟수가 고갈되었을 경우는 별도 처리 필요함

 

3.4.4 linger.ms

  • 현재 배치를 카프카 브로커에 전송하기 전까지 대기하는 시간 결정
  • KafkaProducer는 현재 배치가 가득 차거나 linger.ms에 설정된 제한 시간이 되었을 때 메시지 배치를 전송
    • 기본적으로 프로듀서는 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송하도록 되어 있음
  • linger.ms를 0보다 큰 값으로 설정하면 프로듀서가 브로커에 메시지 배치를 전송하기 전에 메시지를 추가할 수 있도록 몇 ms가량 더 기다리도록 할 수 있음 (지연이 조금 증가되는 대신 처리율 증대 가능)

 

3.4.5 buffer.memory

  • 프로듀서가 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기(메모리의 양) 결정
  • 버퍼 메모리가 가득 찼을 때 추가로 호출되는 send()는 max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생기기를 기다림
  • 해당 시간 동안 대기한 후에도 공간이 없으면 예외 발생
    • 이 타임아웃은 send() 메서드에서 발생

 

3.4.6 compression.type

  • 기본적으로 메시지는 압축되지 않은 상태로 전송됨
  • snappy, gzip, lz4, zstd 중 하나 설정해 압축 알고리즘 적용 가능
  • 압축 기능을 활성화하여 카프카로 메시지를 전송할 때 자주 병목이 되곤 하는 네트워크 사용량과 저장 공간을 절약할 수 있음

 

3.4.7 batch.size

  • 각각의 배치에 사용될 메모리의 양 결정
    • 같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송
  • 배치가 가득 차면 해당 배치에 들어 있는 모든 메시지가 한꺼번에 전송됨
    • 프로듀서가 각각의 배치가 가득 찰 때까지 기다린다는 의미는 아님! (절반만 차거나 하나의 메시지만 들어 있는 배치도 전송함)
    • 따라서 이 매개변수를 지나치게 큰 값으로 유지한다고 해서 메시지 전송에 지연이 발생하지는 않음
    • 반면, 이 값을 지나치게 작게 설정할 경우 프로듀서가 메시지를 자주 전송해야 해서 약간의 오버헤드가 발생할 수 있음

 

3.4.8 max.in.flight.requests.per.connection

  • 프로듀서가 카프카 브로커로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수 결정

 

3.4.9 max.request.size

  • 프로듀서가 전송하는 쓰기 요청의 크기 결정
    • 메시지의 최대 크기와 한 번의 요청에 보낼 수 있는 메시지의 최대 개수 제한
    • ex) 설정값 = 1MB
      • 전송 가능한 메시지의 최대 크기는 1MB
      • 한 번에 보낼 수 있는 1KB 크기의 메시지 개수는 1024개
  • 카프카 브로커의 message.max.bytes 설정 고려해야 함
    • 브로커가 받아들일 수 있는 최대 메시지 크기 결정
    • 두 매개변수를 동일하게 맞춰 프로듀서가 브로커가 받을 수 없는 크기의 메시지를 전송하지 않게 하는 것이 좋음

 

3.4.10 receive.buffer.bytes, send.buffer.bytes

  • 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기 결정
  • -1일 경우 운영체제의 기본값 사용함

 

3.4.11 enable.idempotence

  • true 설정 시 '정확히 한 번 전송'을 위한 멱등적 프로듀서 기능 활성화
    • 아래 설정 조건도 만족해야 함
      • max.in.flight.requests.per.connection <= 5
      • retries >= 1
      • acks = all
  • 멱등적 프로듀서 기능이 활성화 되면,
    • 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여 보냄
    • 만약 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하게 되고, 프로듀서는 별다른 문제를 발생시키지 않는 DuplicateSequenceException 수신함

 

3.6 파티셔너

https://fordevelop.tistory.com/239

 

3.7 헤더

  • 레코드 헤더는 카프카 레코드의 key/value 값을 건드리지 않고 추가 메타데이터를 심을 때 사용함
  • 헤더의 주된 용도 중 하나는 메시지의 전달 내역을 기록하는 것
    • ex) 데이터가 생성된 곳의 정보 저장하여 헤더 정보만으로 메시지 라우팅 및 출처 추적 가능