-
[카프카 핵심 가이드] CH3. 카프카 프로듀서: 카프카에 메시지 쓰기DevBook 2025. 2. 4. 01:34
배울 내용
- 프로듀서의 디자인과 주요 요소
- KafkaProducer와 ProducerRecord 객체를 어떻게 생성하는지, 어떻게 카프카에 레코드를 전송하는지, 카프카가 리턴할 수 있는 에러를 어떻게 처리하는지
- 프로듀서의 작동을 제어하기 위해 사용되는 가장 중요한 설정 옵션들
- 파티션 할당 방식을 정의하는 파티셔너와 객체의 직렬화 방식을 정의하는 시리얼라이저에는 어떠한 것들이 있는지, 이들을 작성하기 위해 어떻게 해야 하는지
3.1 프로듀서 개요
- 프로듀서 API는 매우 단순하지만, 내부적으로는 많은 작업들이 이루어진다.
카프카 프로듀서 요소 개괄 1. ProducerRecord 객체 생성
- topic, value는 필수 / partition, key는 선택
2. 객체 직렬화
- ProducerRecord 전송 API 호출 시 프로듀서는 키와 값 객체가 네트워크 상에서 전송될 수 있도록 객체를 직렬화해서 바이트 배열로 변환
3. 파티셔닝
- 파티션을 명시적으로 지정하지 않았을 때 해당 데이터를 파티셔너에게 보냄
- 파티셔너는 ProducerRecord 객체의 키 값 기준으로 파티션을 결정함
4. 카프카 브로커에 전송
- 메시지가 전송될 토픽과 파티션이 확정되면 프로듀서는 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치에 추가함
- 별도의 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송함
5. 응답 처리
- 메시지 저장 성공 : 브로커는 토픽, 파티션, 해당 파티션 안에서의 레코드의 오프셋을 담은 RecordMetadata 객체 리턴
- 메시지 저장 실패 : 에러 리턴
- 프로듀서가 에러 수신 시 재전송 시도 가능
3.2 카프카 프로듀서 생성하기
메시지 전송 방법 3가지
- Fire and forget
- 메시지를 서버에 전송만 하고 성공 혹은 실패 여부를 신경쓰지 않는다.
- 대부분의 경우 메시지는 성공적으로 전달된다.
- 카프카가 가용성이 높고 프로듀서는 자동으로 전송 실패한 메시지를 재전송 시도하기 때문.
- 부득이하게 재시도를 할 수 없는 에러가 발생하거나 타임아웃이 발생했을 경우 메시지는 유실되고, 어플리케이션은 이에 대해 아무런 정보나 예외를 전달받지 않게 된다.
- 동기적 전송 (Synchronous send)
- 기술적으로 카프카 프로듀서는 언제나 비동기적으로 작동한다. 즉, 메시지를 보내면 send() 메서드는 Future 객체를 리턴한다.
- 하지만 다음 메시지를 전송하기 전 get() 메서드를 호출해 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인해야 한다.
- 비동기적 전송 (Asynchronous send)
- 콜백 함수와 함께 send() 메서드를 호출하면 카프카 브로커로부터 응답 받는 시점에 자동으로 콜백 함수가 호출된다.
3.3 카프카로 메시지 전달하기
3.3.1 동기적으로 메시지 전송하기
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { producer.send(record).get(); } catch (Exception e) { e.printStackTrace(); }
- 동기적으로 메시지를 전송할 경우 전송 요청 스레드는 블로킹되어 다른 메시지를 전송할 수 없다.
3.3.2 비동기적으로 메시지 전송하기
- 비동기적으로 메시지를 전송하고 결과에 대한 에러를 처리해야 하는 경우를 위해 프로듀서는 레코드를 전송할 때 콜백을 지정할 수 있도록 한다.
private class DemoProducerCallback implements Callback { // 1) @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace(); } } } ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); producer.send(record, new DemoProducerCallback()); // 2)
1) 콜백을 사용하려면 org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스가 필요함
2) 레코드 전송할 때 Callback 객체를 함께 매개변수로 전달함
주의할 점
- 콜백은 프로듀서의 메인 스레드에서 실행된다.
- 만약 2개의 메시지를 동일한 파티션에 전송한다면, 콜백도 보낸 순서대로 실행됨
- 메인 스레드는 메시지 전송을 담당하므로 콜백 안에서 블로킹 작업을 수행하는 것은 권장되지 않는다. (콜백 처리로 메인 스레드가 블로킹되어 메시지 전송을 못하게 되니까)
3.4 프로듀서 설정하기
- 메모리 사용량이나 성능, 신뢰성 등에 영향을 미칠 수 있는 몇몇 설정값을 알아본다.
- 프로듀서가 카프카 브로커로 메시지를 전송하는 과정은 몇가지 단계로 구성되므로 각 단계별 설정이 존재한다.
3.4.1 client.id
- 프로듀서를 구분하기 위한 논리적 식별자
- 용도 : 브로커가 로그 메시지를 출력하거나 성능 메트릭 값 집계할 때, 클라이언트별로 사용량을 할당할 때 사용
- 중요한 이유 : 식별자를 명확히 설정하면 문제가 발생했을 때 트러블슈팅을 쉽게 해줌
3.4.2 acks
- 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정하는 값
- 설정 가능한 3가지 값
- acks=0
- 프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 브로커의 응답을 기다리지 않음
- 메시지 유실 가능성 : 있음
- 브로커가 메시지를 받지 못했을 경우 프로듀서는 이를 알 수 없고 메시지 유실됨
- 프로듀서가 서버로부터 응답을 기다리지 않는 만큼 네트워크가 허용하는 한 빠르게 메시지를 보낼 수 있음
- 따라서 매우 높은 처리량이 필요할 때 사용될 수 있음
- acks=1
- 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공 응답 수신
- 리더에 메시지를 쓸 수 없다면 프로듀서는 에러 응답 수신
- ex) 리더에 크래시 발생했지만 새 리더가 아직 선출되지 않은 상태
- 데이터 유실을 피하기 위해 메시지 재전송 시도함
- 메시지 유실 가능성 : 있음
- 리더에 메시지 쓰기는 성공했지만 해당 메시지가 나머지 레플리카에 복제가 안 된 상태에서 리더에 크래시가 나서 새 리더가 선출될 경우 메시지 유실될 수 있음
- acks=all
- 카프카 버전 3.0부터 기본값
- 메시지가 모든 인-싱크 레플리카(in-sync replica)에 전달된 뒤 브로커로부터 성공 응답 수신
- 메시지 유실 가능성 : 없음
- 가장 안전한 형태
- 최소 2개 이상의 브로커가 해당 메시지를 가지고 있어 크래시가 났을 경우에도 유실되지 않음
- acks=0
- 참고
- 프로듀서의 acks 설정을 내려잡아 신뢰성을 낮추면 레코드 전송을 빠르게 할 수 있음
- 즉, 신뢰성과 프로듀서 지연 사이에는 트레이드오프 존재
- 하지만, 레코드가 생성되어 컨슈머가 읽을 수 있을 때까지의 시간을 의미하는 종단 지연(end to end latency)은 세 값이 모두 동일함
- 카프카는 일관성을 유지하기 위해 모든 인-싱크 레플리카에 복제 완료한 뒤 컨슈머가 레코드를 읽어갈 수 있게 하기 때문
- 따라서 종단 지연을 고려해야하는 상황에서는 해당 설정을 고려하지 않아도 됨 (프로듀서의 설정이 컨슈머에 영향을 주지 않음)
- 프로듀서의 acks 설정을 내려잡아 신뢰성을 낮추면 레코드 전송을 빠르게 할 수 있음
3.4.3 메시지 전달 시간
- 카프카가 성공적으로 응답을 내려보내 줄 때까지 사용자가 기다릴 수 있는 시간이자 요청 실패를 인정하고 포기할 때까지 기다릴 수 있는 시간
- 아파치 카프카 2.1부터 ProducerRecord를 보낼 때 걸리는 시간을 두 구간으로 나눔
- 구간1: send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간
- 이 시간 동안 send()를 호출한 스레드는 블록됨
- 구간2: send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터 (성공했건 실패했건) 콜백이 호출될 때까지 걸리는 시간
- ProducerRecord가 전송을 위해 배치에 추가된 시점에서부터 카프카가 성공 응답을 보내거나, 재시도 불가능한 실패가 일어나거나, 전송을 위해 할당된 시간이 소진될 때까지의 시간과 동일함
- 참고) send()를 동기적으로 호출할 경우, 메시지를 보내는 스레드는 두 구간에 대해 연속적으로 블록되어 각각의 구간이 어느 정도 걸렸는지 알 수 없음
- 구간1: send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간
1) max.block.ms
- 아래의 경우에 프로듀서가 얼마나 오랫동안 블록되는지 결정
- send()를 호출했을 때 : partitionsFor를 호출해 명시적으로 메타데이터를 요청했을 때
- 해당 메서드는 프로듀서의 전송 버퍼가 가득 차거나 메타데이터가 아직 사용 가능하지 않을 때 블록됨
- 이 상태에서 max.block.ms만큼 시간이 흐르면 예외 발생함
2) delivery.timeout.ms
- 레코드 전송 준비가 완료된 시점에서부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간 결정
- 레코드 전송 준비가 완료된 시점 = 비동기 호출인 send()가 문제없이 리턴되고 레코드가 배치에 저장된 시점
- 전송을 포기하게 되는 시점 = 재시도까지 모두 수행한 후 전송 포기하고 에러 발생시키는 시점
- linger.ms와 request.timeout.ms보다 커야 함 (해당 조건 벗어난 설정으로 카프카 프로듀서 생성 시 예외 발생)
- 타임아웃 케이스
- 프로듀서가 재시도 하는 도중에 delivery.timeout.ms 초과했을 때
- 마지막으로 재시도 하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백 호출됨
- 레코드 배치가 전송을 기다리는 와중에 delivery.timeout.ms 초과했을 때
- 타임아웃 예외와 함께 콜백 호출됨
- 프로듀서가 재시도 하는 도중에 delivery.timeout.ms 초과했을 때
- 팁) 재시도 관련 설정 튜닝 시 재시도 횟수와 재시도 사이의 시간 간격을 조정하는 것보다 delivery.timeout.ms를 조정하면 됨
3) request.timeout.ms
- 프로듀서가 데이터를 전송할 때 카프카 브로커로부터 응답을 받기 위해 기다리는 최대 시간 결정
- 각각의 쓰기 요청 후 전송을 포기하기까지 대기하는 시간
- 재시도 시간이나 실제 전송 이전에 소요되는 시간 등을 포함하지 않음!
- 응답 없이 타임아웃 발생할 경우, 프로듀서는 재전송을 시도하거나 TimeoutException과 함께 콜백을 호출함
4) retries, retry.backoff.ms
- retries : 프로듀서가 메시지 전송을 포기하고 에러를 발생시킬 때까지 메시지 재전송하는 횟수 결정
- 기본적으로 재시도 사이에 100ms 동안 대기함
- 해당 간격은 retry.backoff.ms를 통해 조정할 수 있음
- 현재 버전의 카프카(카프카 3.0)는 이 값들을 조정하는 것을 권장하지 않음
- 대신, 크래시 난 브로커가 정상으로 돌아오기까지의 시간을 테스트한 뒤 delivery.timeout.ms 조정을 권장함
- 모든 파티션에 대해 새 리더가 선출되는 데 걸리는 시간
- 즉, 재전송을 시도하는 전체 시간이 카프카 클러스터가 크래시로부터 복구되기까지의 시간보다 더 길게 잡히도록 잡아 주는 것
- 대신, 크래시 난 브로커가 정상으로 돌아오기까지의 시간을 테스트한 뒤 delivery.timeout.ms 조정을 권장함
- 참고)
- 재시도는 재시도 가능한 일시적인 에러(파티션에 리더가 없는 경우)일 경우 적용 가능
- 재시도 불가능한 에러 및 재시도 횟수가 고갈되었을 경우는 별도 처리 필요함
3.4.4 linger.ms
- 현재 배치를 카프카 브로커에 전송하기 전까지 대기하는 시간 결정
- KafkaProducer는 현재 배치가 가득 차거나 linger.ms에 설정된 제한 시간이 되었을 때 메시지 배치를 전송함
- 기본적으로 프로듀서는 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송하도록 되어 있음
- linger.ms를 0보다 큰 값으로 설정하면 프로듀서가 브로커에 메시지 배치를 전송하기 전에 메시지를 추가할 수 있도록 몇 ms가량 더 기다리도록 할 수 있음 (지연이 조금 증가되는 대신 처리율 증대 가능)
3.4.5 buffer.memory
- 프로듀서가 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기(메모리의 양) 결정
- 버퍼 메모리가 가득 찼을 때 추가로 호출되는 send()는 max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생기기를 기다림
- 해당 시간 동안 대기한 후에도 공간이 없으면 예외 발생
- 이 타임아웃은 send() 메서드에서 발생
3.4.6 compression.type
- 기본적으로 메시지는 압축되지 않은 상태로 전송됨
- snappy, gzip, lz4, zstd 중 하나 설정해 압축 알고리즘 적용 가능
- 압축 기능을 활성화하여 카프카로 메시지를 전송할 때 자주 병목이 되곤 하는 네트워크 사용량과 저장 공간을 절약할 수 있음
3.4.7 batch.size
- 각각의 배치에 사용될 메모리의 양 결정
- 같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송함
- 배치가 가득 차면 해당 배치에 들어 있는 모든 메시지가 한꺼번에 전송됨
- 프로듀서가 각각의 배치가 가득 찰 때까지 기다린다는 의미는 아님! (절반만 차거나 하나의 메시지만 들어 있는 배치도 전송함)
- 따라서 이 매개변수를 지나치게 큰 값으로 유지한다고 해서 메시지 전송에 지연이 발생하지는 않음
- 반면, 이 값을 지나치게 작게 설정할 경우 프로듀서가 메시지를 자주 전송해야 해서 약간의 오버헤드가 발생할 수 있음
3.4.8 max.in.flight.requests.per.connection
- 프로듀서가 카프카 브로커로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수 결정
3.4.9 max.request.size
- 프로듀서가 전송하는 쓰기 요청의 크기 결정
- 메시지의 최대 크기와 한 번의 요청에 보낼 수 있는 메시지의 최대 개수 제한
- ex) 설정값 = 1MB
- 전송 가능한 메시지의 최대 크기는 1MB
- 한 번에 보낼 수 있는 1KB 크기의 메시지 개수는 1024개
- 카프카 브로커의 message.max.bytes 설정 고려해야 함
- 브로커가 받아들일 수 있는 최대 메시지 크기 결정
- 두 매개변수를 동일하게 맞춰 프로듀서가 브로커가 받을 수 없는 크기의 메시지를 전송하지 않게 하는 것이 좋음
3.4.10 receive.buffer.bytes, send.buffer.bytes
- 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기 결정
- -1일 경우 운영체제의 기본값 사용함
3.4.11 enable.idempotence
- true 설정 시 '정확히 한 번 전송'을 위한 멱등적 프로듀서 기능 활성화
- 아래 설정 조건도 만족해야 함
- max.in.flight.requests.per.connection <= 5
- retries >= 1
- acks = all
- 아래 설정 조건도 만족해야 함
- 멱등적 프로듀서 기능이 활성화 되면,
- 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여 보냄
- 만약 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하게 되고, 프로듀서는 별다른 문제를 발생시키지 않는 DuplicateSequenceException 수신함
3.6 파티셔너
https://fordevelop.tistory.com/239
3.7 헤더
- 레코드 헤더는 카프카 레코드의 key/value 값을 건드리지 않고 추가 메타데이터를 심을 때 사용함
- 헤더의 주된 용도 중 하나는 메시지의 전달 내역을 기록하는 것
- ex) 데이터가 생성된 곳의 정보 저장하여 헤더 정보만으로 메시지 라우팅 및 출처 추적 가능
'DevBook' 카테고리의 다른 글
[카프카 핵심 가이드] CH4. 카프카 컨슈머: 카프카에서 데이터 읽기 (2) (1) 2025.02.13 [카프카 핵심 가이드] CH4. 카프카 컨슈머: 카프카에서 데이터 읽기 (1) (0) 2025.02.11 [카프카 핵심 가이드] 추가 - 카프카 컨슈머 리밸런스 (0) 2025.02.11 [카프카 핵심 가이드] 3.6 파티셔너 (카프카 3.9 버전 반영) (0) 2025.02.06 [카프카 핵심 가이드] CH1. 카프카 시작하기 (0) 2025.02.04