ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [번역 서비스] 적용 - Delayed Retry Topic
    Work 2025. 1. 28. 22:03

    개요

    • Quarkus Kafka 라이브러리의 Delayed Retry Topic 전략을 적용해 재시도를 구현한 내용을 설명한다.

     

    사전 개념

    • Quarkus는 SmallRye Reactive Messaging 프레임워크를 통해 Apache Kafka와의 통신을 지원한다.
    • Quarkus에서 Kafka와의 통신을 위한 kafka connector 기능을 제공한다.
      • 어플리케이션은 message를 주고받는다.
        • message는 payload를 포함하고 있고 metadata를 포함하도록 확장될 수 있다.
        • kafka와의 통신에서 하나의 message를 kafka record와 매핑한다.
      • message는 channel을 통해 전송된다.
        • message를 발행하고 컨슘하기 위해 어플리케이션 컴포넌트가 channel과 연결된다.
        • kafka connector는 channel을 kafka topic에 매핑한다.
      • channel은 connector를 사용해 메시징 서버(Kafka, AMQP 등)와 연결된다.
        • connector는 incoming/outgoing message를 특정 channel에 매핑하도록 구성된다.
        • 각 connector는 특정 메시징 기술에 특화된다. Kafka에 특화된 connector는 smallrye-kafka로 명칭한다.
    • kafka connector는 카프카 브로커로부터 kafka record를 가져와 reactive messaging message로 매핑해준다.

     

    적용

    • 메시지를 처리하다 예외가 발생했을 경우에 대한 failure strategy를 적용할 수 있다.
    • 여기서 적용한 것은 delayed-retry-topic 전략이다. cf) failure management
      • 올바르게 처리되지 못한 record의 offset은 커밋되지만, 해당 record는 kafka retry topic에 발행되어 자동 재시도 되도록 함
      • 재시도 토픽들의 record들은 지정된 delay time 이후에 재시도 수행됨

     

     

    아래는 예시로 작성한 테스트용 코드임

    1) kafka connector 구성

    mp:
      messaging:
        outgoing:
          movies-out: # channel
            connector: smallrye-kafka
            topic: movies # kafka topic
            key:
              serializer: org.apache.kafka.common.serialization.StringSerializer
            value:
                serializer: org.apache.kafka.common.serialization.StringSerializer
    
        incoming:
          movies-in: # channel
            connector: smallrye-kafka
            topic: movies # kafka topic
            key:
              deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value:
                deserializer: org.apache.kafka.common.serialization.StringDeserializer
            failure-strategy: delayed-retry-topic
            delayed-retry-topic:
              topics: my_retry_topic_2000,my_retry_topic_4000,my_retry_topic_10000 # delay time을 포함한 재시도 토픽명 지정
              max-retries: 3 # 최대 재시도 횟수
            dead-letter-queue:
              topic: dead-letter-topic-movies-in # dlq 토픽명 지정
            auto:
              offset:
                reset: earliest
    • 카프카 토픽 발행/컨슘을 위한 outgoing, incoming connector 구성
    • incoming connector의 실패 전략으로 delayed-retry-topic 설정
    • 재시도 토픽명에 _[DELAY_IN_MILLISECONDS] suffix를 붙여 재시도 delay time 명시

     

    2025-01-28 15:10:21,437 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18277: Delayed retry topics configured for channel `movies-in` with topics `[my_retry_topic_2000, my_retry_topic_4000, my_retry_topic_10000]`, max retries, `3`, timeout `120000`ms, dlq topic `dead-letter-topic-movies-in`
    • 설정 값 적용 확인

     

    2) Producer, Consumer 구성

    @ApplicationScoped
    public class MovieProducer {
    
        @Inject
        @Channel("movies-out")
        Emitter<Record<String, String>> emitter;
    
        public void sendMovieToKafka(Movie movie) {
            emitter.send(Record.of(movie.year, movie.title));
        }
    }
    @ApplicationScoped
    public class MovieConsumer {
    
        private final Logger logger = Logger.getLogger(MovieConsumer.class);
    
        @Incoming("movies-in")
        public void receive(Record<String, String> record) {
            messageOccurredException();
            logger.infof("Got a movie: %d - %s", record.key(), record.value());
        }
    
        public void messageOccurredException() {
            throw new RuntimeException("MovieConsumer 예외 발생!!");
        }
    }
    • 실패 전략 확인을 위해 Consumer 로직 내부에서 의도적으로 예외 발생시킴

     

    3) 동작 확인

    구성한 구조는 아래와 같다.

     

    2025-01-28 15:10:58,426 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic my_retry_topic_2000
    
    2025-01-28 15:11:00,453 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-4) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic my_retry_topic_4000
    
    2025-01-28 15:11:04,476 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-4) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic my_retry_topic_10000
    
    2025-01-28 15:11:14,515 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-4) SRMSG18278: A message sent to channel `movies-in` has been nacked, sending the record to topic dead-letter-topic-movies-in
    • 설정한 delay time 이후 재처리가 수행되었고, max retry 3회 이후 dlq로 메시지 발행됨

     

    • 메시지 헤더에는 위와 같은 정보들이 포함되고, 이전 메시지의 헤더 값들을 포함하는 구조

     

    추가)

    Q. 재시도할 때 delay time을 어떻게 적용할까?

    • 라이브러리 코드를 보면 producer에서 메시지를 바로 발행하고 consumer에서 메시지 조회 후 설정된 delay time 동안 기다렸다가 처리한다.
    // smallrye-reactive-messaging-kafka 라이브러리의 KafkaDelayedRetryTopic 클래스
    public Multi<? extends IncomingKafkaRecord<?, ?>> retryStream() { // retry topic consumer가 record 조회해오는 메서드
            KafkaLatestCommit latestCommit = new KafkaLatestCommit(this.vertx, this.configuration, this.consumer);
            this.consumer.setRebalanceListener((KafkaConsumerRebalanceListener)null, latestCommit);
            Multi<ConsumerRecord<?, ?>> subscribe = this.consumer.subscribe(new HashSet(this.retryTopics));
            latestCommit.capture(this.getContext());
            return subscribe.onItem().transform((record) -> new IncomingKafkaRecord(record, this.channel, -1, latestCommit, this, this.configuration.getCloudEvents(), this.configuration.getTracingEnabled())).onItem().transformToUni((record) -> {
                incrementRetryHeader(record.getHeaders()); // 헤더에 retry-count 1 증가시킨 값 추가
                Duration between = getDelay(record); // 토픽명의 suffix 값을 통해 delay 시간 조회
                return between.isNegative() ? Uni.createFrom().item(record) : Uni.createFrom().item(record).onItem().delayIt().by(between); // Uni의 delayIt() 사용
            }).concatenate(false);
        }

     

    참고

    댓글

Designed by Tistory.