ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Connect] 카프카 커넥트 클러스터 구성
    Work 2025. 2. 3. 21:23

    작업 배경

    • 검색 기능 제공을 위해 Kafka Connect를 사용해 MongoDB --> Kafka로의 데이터 파이프라인 구성
    • Mongo Source Connector가 MongoDB의 게시글, 댓글 컬렉션 내 데이터 변경사항에 대한 change event를 조회해와 Kafka에 메시지 발행
    • ES Sync가 해당 메시지를 컨슘하여 Elastic Search로 관련 데이터 전달
    • Elastic Search는 검색 시 사용되고 있음

     

    참고)

    더보기

    Kafka Connect란?

    • 데이터베이스, 키-밸류 스토어, 파일 시스템과 같은 외부 시스템을 kafka에 연결해주는 구성 요소로, Apache Kafka에서 제공하는 오픈소스
    • Kafka Connect를 이용하면 다양한 제품을 시스템에 쉽게 연동할 수 있어 간편하게 데이터 파이프라인을 구축할 수 있음
    • 2가지 타입의 connector 제공함
      • source connector (source -> kafka)
      • sink connector (kafka -> taget)

     

    작업 내용

    • Docker 컨테이너 기반 Kafka Connect 클러스터 구성
    • Kafka Connect를 사용해 MongoDB에서 Kafka로의 게시글/댓글 데이터 파이프라인 구축

     

    기술 스택

    • Ubuntu 22.04, Docker, Kafka Connect

     

    구조도

    개발 환경

    • Ubuntu VM 서버 내 Docker 컨테이너로 환경별 워커 1대씩 실행중
    • 해당 워커가 connector, task 작업 모두 할당받아 실행중

     

    운영 환경

    • Ubuntu VM 서버 내 Docker 컨테이너로 환경별 워커 3대씩 실행중
    • leader worker에 의해 connector, task 작업 할당받아 실행중

     

    참고) 카프카 커넥트, 커넥터 요약

    더보기

    요약

    • 카프카 커넥트 클러스터에 2개의 카프카 커넥터가 등록되어 있는 것

     

    카프카 커넥트 간단 요약

    • 역할
      • 카프카 커넥터를 실행하기 위한 장치
    • 구조
      • 카프카 커넥트는 클러스터 구조를 가지고, 현재 카프카 커넥트 3대가 1개의 클러스터를 구성하고 있음
      • 즉, 카프카 커넥트 3대는 1개의 거대한 커넥트라고 생각하면 됨
        • 3대가 띄워져있지만 모두 동일한 설정을 가지므로 모두 동일하게 동작함 → 서버 scale-out 된 상태와 유사한 개념
    • Q&A
      • 카프카 커넥트 설정 수정이 필요하다면?
        • 클러스터를 구성하는 모든 카프카 커넥트들의 설정을 변경해줘야 함 → 동일한 설정을 가져야 모두 동일하게 동작하니까!
        • ex) 현재 3대가 실행중이니까 3대 모두 수정이 필요하다!

     

    카프카 커넥터 간단 요약

    • 역할
      •  mongodb에서 데이터를 조회해 kafka로 전송하는 메인 작업 수행
    • 구조
      • 카프카 커넥트에 등록하여 사용
      • 현재 서로 다른 카프카 커넥터 2개가 등록되어 실행중인 상태 (card connector, reply connector)
        • 커넥터라는 종류만 같을 뿐 2개는 서로 다른 것임
    • Q&A
      • 카프카 커넥터 설정 수정이 필요하다면?
        • 수정이 필요한 커넥터에 대해서 각각 수정해주면 됨
      • 카프카 커넥터 설정 조회 시 '커넥트 서버 ip'를 다르게 해도 동일한 값이 조회되는 이유는?
        • 카프카 커넥터 설정 조회 API
          • curl --location '{커넥트 서버 ip}:8083/connectors/{커넥터명}/tasks/{taskId}/status'
        • 위에서 언급한 것처럼 카프카 커넥트 3대는 1개의 클러스터를 구성하므로, 카프카 커넥트 클러스터에 서로 다른 2개의 카프카 커넥터가 등록되어 있는 것으로 보면 됨
        • 따라서 위의 curl에서 '커넥터명'을 동일하게 설정하면 '커넥트 서버 ip'에 상관없이 동일한 카프카 커넥터의 설정 조회를 하는 것임
        • 즉, 카프카 커넥터 설정 조회 시 '커넥트 서버 ip' 3개 중 아무거나 사용해도 무방함

     

    참고) 카프카 커넥트, 커넥터 구성 정보

    더보기

    커넥트 구성 정보

    • docker-compose.yml
    version: '3'
    services:
      kafka-connect:
        image: confluentinc/cp-kafka-connect:latest
        container_name: kafka-connect
        ports:
          - 8083:8083
          - 8778:8778
        environment:
          CONNECT_BOOTSTRAP_SERVERS: {카프카 브로커 주소}
          CONNECT_PRODUCER_MAX_REQUEST_SIZE: 4048576
          CONNECT_GROUP_ID: {클러스터 그룹 ID}
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_OFFSET_STORAGE_TOPIC: {offset topic명}
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_OFFSET_STORAGE_PARTITIONS: 3
          CONNECT_CONFIG_STORAGE_TOPIC: {config topic명}
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_STATUS_STORAGE_TOPIC: {status topic명}
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
          CONNECT_STATUS_STORAGE_PARTITIONS: 3
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_LISTENERS: http://0.0.0.0:8083
          CONNECT_REST_PORT: 8083
          CONNECT_REST_ADVERTISED_HOST_NAME: {서버 ip}
          CONNECT_REST_ADVERTISED_PORT: 8083
          CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
          CONNECT_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
          CONNECT_ACCESS_CONTROL_ALLOW_METHODS: GET,POST,PUT,DELETE,OPTIONS,HEAD
          CONNECT_LOG4J_LOGGERS: com.mongodb.kafka.connect=ERROR,org.reflections=ERROR
          KAFKA_JMX_OPTS: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -javaagent:/etc/kafka-connect/jolokia-agent-jvm-2.0.0-javaagent.jar=port=8778,host=0.0.0.0
        volumes:
          - /etc/kafka-connect/jars:/etc/kafka-connect/jars
          - /etc/kafka-connect/jolokia-agent-jvm-2.0.0-javaagent.jar:/etc/kafka-connect/jolokia-agent-jvm-2.0.0-javaagent.jar
        restart: unless-stopped
        user: root
        working_dir: /root
        privileged: true

     

    커넥터 구성 정보

    {
        "name": {커넥터명},
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "topic.prefix": "live",
            "database": {mongodb database명},
            "tasks.max": "1",
            "poll.await.time.ms": "100",
            "connection.uri": {mongodb 연결 주소}
            "name": {커넥터명},
            "collection": {컬렉션명},
            "poll.max.batch.size": "1000"
        },
        "tasks": [
            {
                "connector": {커넥터명},
                "task": 0
            }
        ],
        "type": "source"
    }

     

    마주쳤던 고민/이슈

    Q. 유지보수하기 쉬우려면 클러스터를 어떻게 구성해야 할까?

    • 조건
      • 서비스 실행이 쉬워야 한다.
      • 버전 관리가 쉬워야 한다.
    • 적용
      • docker를 사용해 컨테이너 기반 클러스터 구성

     

    Q. 빠른 이슈 감지 및 대응이 가능하려면 어떻게 해야할까?

    • 조건
      • 커넥트나 커넥터가 다운되었을 때 빠른 확인이 가능해야 한다.
    • 적용
      • 메트릭 수집 및 시각화를 통한 모니터링 구성 - https://fordevelop.tistory.com/235
      • 이슈 상황별 해결 방식 및 이슈 히스토리 문서 작성
        • 해결 방식은 Grafana 에러 알림에 링크 첨부해서 바로 확인할 수 있게 함

     

    이슈) Kafka Connect Producer RecordTooLargeException

    더보기

    상황

    • kafka connect → kafka topic으로 메시지 발행 불가
    [2024-02-10 09:49:23,700] ERROR WorkerSourceTask{id=live.mountain.card-0} failed to send record to live.mountain.card:  (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
    org.apache.kafka.common.errors.RecordTooLargeException: The message is 1048988 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
    [2024-02-10 09:49:24,601] INFO 172.18.0.1 - - [10/Feb/2024:09:49:24 +0000] "GET /connectors HTTP/1.1" 200 44 "-" "curl/7.61.1" 1 (org.apache.kafka.connect.runtime.rest.RestServer)
    [2024-02-10 09:49:28,707] INFO WorkerSourceTask{id=live.mountain.card-0} Committing offsets for 16 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
    [2024-02-10 09:49:29,675] INFO 172.18.0.1 - - [10/Feb/2024:09:49:29 +0000] "GET /connectors HTTP/1.1" 200 44 "-" "curl/7.61.1" 1 (org.apache.kafka.connect.runtime.rest.RestServer)
    [2024-02-10 09:49:29,788] ERROR WorkerSourceTask{id=live.mountain.card-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
    org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
            at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:338)
            at org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:132)
            at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:411)
            at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
            at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
            at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1048988 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

     

    원인

    • kafka topic으로 발행할 메시지의 크기가 producer의 max.request.size 설정값 보다 컸기 때문임 (설정값 : 1048567 bytes, 메시지 크기 : 1048988 bytes)

     

    해결

    • 커넥트 컨테이너 env로 CONNECT_PRODUCER_MAX_REQUEST_SIZE 설정 및 커넥트 재시작
      • CONNECT_PRODUCER_MAX_REQUEST_SIZE: 4048576
        • 주의 : 해당 값은 5272000 bytes 보다 작아야 함
          • 해당 connector와 연결된 kafka topic의 message.max.byte 설정값이 5272000 bytes 이기 때문
          • 메시지의 크기가 토픽의 message.max.byte 설정값보다 클 경우 카프카 브로커가 받지 못함

     

    참고

    • Worker Configuration 오버라이딩
      • 기본적으로 connector는 worker(kafka connect) configuration을 상속받기 때문에 필요 시 상속받은 값을 오버라이딩 할 수 있음
      • 이때 producer.override.* / consumer.override.* 로 각각 producer와 consumer 설정을 오버라이딩할 수 있음 cf) https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration
      • 현재 사용중인 connector는 모두 source connector 이므로 producer 설정만 오버라이딩하면 됨 (source connector는 producer 역할이고, sink connector는 consumer 역할)
    • kafka producer의 max.request.size
      • producer가 kafka broker에 한 번 요청할 때 보낼 수 있는 최대 크기로, 전송할 메시지의 크기가 해당 값보다 클 경우 전송이 불가함
      • kafka broker의 message.max.byte 설정과 연관있음
        • 레코드 배치(단일 요청)의 최대 크기로, 토픽 별로 설정 가능함
        • 해당 값보다 큰 크기의 메시지를 받을 수 없음

    댓글

Designed by Tistory.