-
[번역 서비스] 적용 - Delayed Retry TopicWork 2025. 1. 28. 22:03
개요
- Quarkus Kafka 라이브러리의 Delayed Retry Topic 전략을 적용해 재시도를 구현한 내용을 설명한다.
사전 개념
- Quarkus는 SmallRye Reactive Messaging 프레임워크를 통해 Apache Kafka와의 통신을 지원한다.
- Quarkus에서 Kafka와의 통신을 위한 kafka connector 기능을 제공한다.
- 어플리케이션은 message를 주고받는다.
- message는 payload를 포함하고 있고 metadata를 포함하도록 확장될 수 있다.
- kafka와의 통신에서 하나의 message를 kafka record와 매핑한다.
- message는 channel을 통해 전송된다.
- message를 발행하고 컨슘하기 위해 어플리케이션 컴포넌트가 channel과 연결된다.
- kafka connector는 channel을 kafka topic에 매핑한다.
- channel은 connector를 사용해 메시징 서버(Kafka, AMQP 등)와 연결된다.
- connector는 incoming/outgoing message를 특정 channel에 매핑하도록 구성된다.
- 각 connector는 특정 메시징 기술에 특화된다. Kafka에 특화된 connector는 smallrye-kafka로 명칭한다.
- 어플리케이션은 message를 주고받는다.
- kafka connector는 카프카 브로커로부터 kafka record를 가져와 reactive messaging message로 매핑해준다.
적용
- 메시지를 처리하다 예외가 발생했을 경우에 대한 failure strategy를 적용할 수 있다.
- 여기서 적용한 것은 delayed-retry-topic 전략이다. cf) failure management
- 올바르게 처리되지 못한 record의 offset은 커밋되지만, 해당 record는 kafka retry topic에 발행되어 자동 재시도 되도록 함
- 재시도 토픽들의 record들은 지정된 delay time 이후에 재시도 수행됨
아래는 예시로 작성한 테스트용 코드임
1) kafka connector 구성
mp: messaging: outgoing: movies-out: # channel connector: smallrye-kafka topic: movies # kafka topic key: serializer: org.apache.kafka.common.serialization.StringSerializer value: serializer: org.apache.kafka.common.serialization.StringSerializer incoming: movies-in: # channel connector: smallrye-kafka topic: movies # kafka topic key: deserializer: org.apache.kafka.common.serialization.StringDeserializer value: deserializer: org.apache.kafka.common.serialization.StringDeserializer failure-strategy: delayed-retry-topic delayed-retry-topic: topics: my_retry_topic_2000,my_retry_topic_4000,my_retry_topic_10000 # delay time을 포함한 재시도 토픽명 지정 max-retries: 3 # 최대 재시도 횟수 dead-letter-queue: topic: dead-letter-topic-movies-in # dlq 토픽명 지정 auto: offset: reset: earliest
- 카프카 토픽 발행/컨슘을 위한 outgoing, incoming connector 구성
- incoming connector의 실패 전략으로 delayed-retry-topic 설정
- 재시도 토픽명에 _[DELAY_IN_MILLISECONDS] suffix를 붙여 재시도 delay time 명시
2025-01-28 15:10:21,437 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18277: Delayed retry topics configured for channel `movies-in` with topics `[my_retry_topic_2000, my_retry_topic_4000, my_retry_topic_10000]`, max retries, `3`, timeout `120000`ms, dlq topic `dead-letter-topic-movies-in`
- 설정 값 적용 확인
2) Producer, Consumer 구성
@ApplicationScoped public class MovieProducer { @Inject @Channel("movies-out") Emitter<Record<String, String>> emitter; public void sendMovieToKafka(Movie movie) { emitter.send(Record.of(movie.year, movie.title)); } }
@ApplicationScoped public class MovieConsumer { private final Logger logger = Logger.getLogger(MovieConsumer.class); @Incoming("movies-in") public void receive(Record<String, String> record) { messageOccurredException(); logger.infof("Got a movie: %d - %s", record.key(), record.value()); } public void messageOccurredException() { throw new RuntimeException("MovieConsumer 예외 발생!!"); } }
- 실패 전략 확인을 위해 Consumer 로직 내부에서 의도적으로 예외 발생시킴
3) 동작 확인
구성한 구조는 아래와 같다.
2025-01-28 15:10:58,426 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic my_retry_topic_2000 2025-01-28 15:11:00,453 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-4) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic my_retry_topic_4000 2025-01-28 15:11:04,476 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-4) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic my_retry_topic_10000 2025-01-28 15:11:14,515 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-4) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic dead-letter-topic-movies-in
- 설정한 delay time 이후 재처리가 수행되었고, max retry 3회 이후 dlq로 메시지 발행됨
- 메시지 헤더에는 위와 같은 정보들이 포함되고, 이전 메시지의 헤더 값들을 포함하는 구조
추가)
Q. 재시도할 때 delay time을 어떻게 적용할까?
- 라이브러리 코드를 보면 producer에서 메시지를 바로 발행하고 consumer에서 메시지 조회 후 설정된 delay time 동안 기다렸다가 처리한다.
// smallrye-reactive-messaging-kafka 라이브러리의 KafkaDelayedRetryTopic 클래스 public Multi<? extends IncomingKafkaRecord<?, ?>> retryStream() { // retry topic consumer가 record 조회해오는 메서드 KafkaLatestCommit latestCommit = new KafkaLatestCommit(this.vertx, this.configuration, this.consumer); this.consumer.setRebalanceListener((KafkaConsumerRebalanceListener)null, latestCommit); Multi<ConsumerRecord<?, ?>> subscribe = this.consumer.subscribe(new HashSet(this.retryTopics)); latestCommit.capture(this.getContext()); return subscribe.onItem().transform((record) -> new IncomingKafkaRecord(record, this.channel, -1, latestCommit, this, this.configuration.getCloudEvents(), this.configuration.getTracingEnabled())).onItem().transformToUni((record) -> { incrementRetryHeader(record.getHeaders()); // 헤더에 retry-count 1 증가시킨 값 추가 Duration between = getDelay(record); // 토픽명의 suffix 값을 통해 delay 시간 조회 return between.isNegative() ? Uni.createFrom().item(record) : Uni.createFrom().item(record).onItem().delayIt().by(between); // Uni의 delayIt() 사용 }).concatenate(false); }
- SmallRye Mutiny Uni의 delayIt()을 사용해 처리를 지연시킴 (https://smallrye.io/smallrye-mutiny/latest/guides/delaying-events/)
참고
'Work' 카테고리의 다른 글
[번역 서비스] 로직 구현 시 고민한 부분 (0) 2025.01.30 [번역 서비스] 개선 - 번역 실패 모니터링 및 재처리 (0) 2025.01.09 [번역 서비스] 개선 - 번역 실패 모니터링 아키텍처 (0) 2025.01.08 [번역 서비스] 번역 시스템 설계 (0) 2024.12.29