-
[Kafka Connect] 카프카 커넥트 클러스터 구성Work 2025. 2. 3. 21:23
작업 배경
- 검색 기능 제공을 위해 Kafka Connect를 사용해 MongoDB --> Kafka로의 데이터 파이프라인 구성
- Mongo Source Connector가 MongoDB의 게시글, 댓글 컬렉션 내 데이터 변경사항에 대한 change event를 조회해와 Kafka에 메시지 발행
- ES Sync가 해당 메시지를 컨슘하여 Elastic Search로 관련 데이터 전달
- Elastic Search는 검색 시 사용되고 있음
참고)
더보기Kafka Connect란?
- 데이터베이스, 키-밸류 스토어, 파일 시스템과 같은 외부 시스템을 kafka에 연결해주는 구성 요소로, Apache Kafka에서 제공하는 오픈소스
- Kafka Connect를 이용하면 다양한 제품을 시스템에 쉽게 연동할 수 있어 간편하게 데이터 파이프라인을 구축할 수 있음
- 2가지 타입의 connector 제공함
- source connector (source -> kafka)
- sink connector (kafka -> taget)
작업 내용
- Docker 컨테이너 기반 Kafka Connect 클러스터 구성
- Kafka Connect를 사용해 MongoDB에서 Kafka로의 게시글/댓글 데이터 파이프라인 구축
기술 스택
- Ubuntu 22.04, Docker, Kafka Connect
- 커넥트
- docker image version : confluentinc/cp-kafka-connect:latest
- Confluent Platform : v7.5.2, Apache Kafka : v3.5.x
- docker image version : confluentinc/cp-kafka-connect:latest
- 커넥터
- MongoDB Kafka Connector version : v1.11.0
- MongoDB Java driver : v4.7
- MongoDB Kafka Connector version : v1.11.0
- 커넥트
구조도
개발 환경
- Ubuntu VM 서버 내 Docker 컨테이너로 환경별 워커 1대씩 실행중
- 해당 워커가 connector, task 작업 모두 할당받아 실행중
운영 환경
- Ubuntu VM 서버 내 Docker 컨테이너로 환경별 워커 3대씩 실행중
- leader worker에 의해 connector, task 작업 할당받아 실행중
참고) 카프카 커넥트, 커넥터 요약
더보기요약
- 카프카 커넥트 클러스터에 2개의 카프카 커넥터가 등록되어 있는 것
카프카 커넥트 간단 요약
- 역할
- 카프카 커넥터를 실행하기 위한 장치
- 구조
- 카프카 커넥트는 클러스터 구조를 가지고, 현재 카프카 커넥트 3대가 1개의 클러스터를 구성하고 있음
- 즉, 카프카 커넥트 3대는 1개의 거대한 커넥트라고 생각하면 됨
- 3대가 띄워져있지만 모두 동일한 설정을 가지므로 모두 동일하게 동작함 → 서버 scale-out 된 상태와 유사한 개념
- Q&A
- 카프카 커넥트 설정 수정이 필요하다면?
- 클러스터를 구성하는 모든 카프카 커넥트들의 설정을 변경해줘야 함 → 동일한 설정을 가져야 모두 동일하게 동작하니까!
- ex) 현재 3대가 실행중이니까 3대 모두 수정이 필요하다!
- 카프카 커넥트 설정 수정이 필요하다면?
카프카 커넥터 간단 요약
- 역할
- mongodb에서 데이터를 조회해 kafka로 전송하는 메인 작업 수행
- 구조
- 카프카 커넥트에 등록하여 사용
- 현재 서로 다른 카프카 커넥터 2개가 등록되어 실행중인 상태 (card connector, reply connector)
- 커넥터라는 종류만 같을 뿐 2개는 서로 다른 것임
- Q&A
- 카프카 커넥터 설정 수정이 필요하다면?
- 수정이 필요한 커넥터에 대해서 각각 수정해주면 됨
- 카프카 커넥터 설정 조회 시 '커넥트 서버 ip'를 다르게 해도 동일한 값이 조회되는 이유는?
- 카프카 커넥터 설정 조회 API
- curl --location '{커넥트 서버 ip}:8083/connectors/{커넥터명}/tasks/{taskId}/status'
- 위에서 언급한 것처럼 카프카 커넥트 3대는 1개의 클러스터를 구성하므로, 카프카 커넥트 클러스터에 서로 다른 2개의 카프카 커넥터가 등록되어 있는 것으로 보면 됨
- 따라서 위의 curl에서 '커넥터명'을 동일하게 설정하면 '커넥트 서버 ip'에 상관없이 동일한 카프카 커넥터의 설정 조회를 하는 것임
- 즉, 카프카 커넥터 설정 조회 시 '커넥트 서버 ip' 3개 중 아무거나 사용해도 무방함
- 카프카 커넥터 설정 조회 API
- 카프카 커넥터 설정 수정이 필요하다면?
참고) 카프카 커넥트, 커넥터 구성 정보
더보기커넥트 구성 정보
- docker-compose.yml
version: '3' services: kafka-connect: image: confluentinc/cp-kafka-connect:latest container_name: kafka-connect ports: - 8083:8083 - 8778:8778 environment: CONNECT_BOOTSTRAP_SERVERS: {카프카 브로커 주소} CONNECT_PRODUCER_MAX_REQUEST_SIZE: 4048576 CONNECT_GROUP_ID: {클러스터 그룹 ID} CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_OFFSET_STORAGE_TOPIC: {offset topic명} CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3 CONNECT_OFFSET_STORAGE_PARTITIONS: 3 CONNECT_CONFIG_STORAGE_TOPIC: {config topic명} CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3 CONNECT_STATUS_STORAGE_TOPIC: {status topic명} CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3 CONNECT_STATUS_STORAGE_PARTITIONS: 3 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_LISTENERS: http://0.0.0.0:8083 CONNECT_REST_PORT: 8083 CONNECT_REST_ADVERTISED_HOST_NAME: {서버 ip} CONNECT_REST_ADVERTISED_PORT: 8083 CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars CONNECT_ACCESS_CONTROL_ALLOW_ORIGIN: '*' CONNECT_ACCESS_CONTROL_ALLOW_METHODS: GET,POST,PUT,DELETE,OPTIONS,HEAD CONNECT_LOG4J_LOGGERS: com.mongodb.kafka.connect=ERROR,org.reflections=ERROR KAFKA_JMX_OPTS: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -javaagent:/etc/kafka-connect/jolokia-agent-jvm-2.0.0-javaagent.jar=port=8778,host=0.0.0.0 volumes: - /etc/kafka-connect/jars:/etc/kafka-connect/jars - /etc/kafka-connect/jolokia-agent-jvm-2.0.0-javaagent.jar:/etc/kafka-connect/jolokia-agent-jvm-2.0.0-javaagent.jar restart: unless-stopped user: root working_dir: /root privileged: true
커넥터 구성 정보
{ "name": {커넥터명}, "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "topic.prefix": "live", "database": {mongodb database명}, "tasks.max": "1", "poll.await.time.ms": "100", "connection.uri": {mongodb 연결 주소} "name": {커넥터명}, "collection": {컬렉션명}, "poll.max.batch.size": "1000" }, "tasks": [ { "connector": {커넥터명}, "task": 0 } ], "type": "source" }
마주쳤던 고민/이슈
Q. 유지보수하기 쉬우려면 클러스터를 어떻게 구성해야 할까?
- 조건
- 서비스 실행이 쉬워야 한다.
- 버전 관리가 쉬워야 한다.
- 적용
- docker를 사용해 컨테이너 기반 클러스터 구성
Q. 빠른 이슈 감지 및 대응이 가능하려면 어떻게 해야할까?
- 조건
- 커넥트나 커넥터가 다운되었을 때 빠른 확인이 가능해야 한다.
- 적용
- 메트릭 수집 및 시각화를 통한 모니터링 구성 - https://fordevelop.tistory.com/235
- 이슈 상황별 해결 방식 및 이슈 히스토리 문서 작성
- 해결 방식은 Grafana 에러 알림에 링크 첨부해서 바로 확인할 수 있게 함
이슈) Kafka Connect Producer RecordTooLargeException
더보기상황
- kafka connect → kafka topic으로 메시지 발행 불가
[2024-02-10 09:49:23,700] ERROR WorkerSourceTask{id=live.mountain.card-0} failed to send record to live.mountain.card: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) org.apache.kafka.common.errors.RecordTooLargeException: The message is 1048988 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. [2024-02-10 09:49:24,601] INFO 172.18.0.1 - - [10/Feb/2024:09:49:24 +0000] "GET /connectors HTTP/1.1" 200 44 "-" "curl/7.61.1" 1 (org.apache.kafka.connect.runtime.rest.RestServer) [2024-02-10 09:49:28,707] INFO WorkerSourceTask{id=live.mountain.card-0} Committing offsets for 16 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask) [2024-02-10 09:49:29,675] INFO 172.18.0.1 - - [10/Feb/2024:09:49:29 +0000] "GET /connectors HTTP/1.1" 200 44 "-" "curl/7.61.1" 1 (org.apache.kafka.connect.runtime.rest.RestServer) [2024-02-10 09:49:29,788] ERROR WorkerSourceTask{id=live.mountain.card-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:338) at org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:132) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:411) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1048988 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
원인
- kafka topic으로 발행할 메시지의 크기가 producer의 max.request.size 설정값 보다 컸기 때문임 (설정값 : 1048567 bytes, 메시지 크기 : 1048988 bytes)
해결
- 커넥트 컨테이너 env로 CONNECT_PRODUCER_MAX_REQUEST_SIZE 설정 및 커넥트 재시작
- CONNECT_PRODUCER_MAX_REQUEST_SIZE: 4048576
- 주의 : 해당 값은 5272000 bytes 보다 작아야 함
- 해당 connector와 연결된 kafka topic의 message.max.byte 설정값이 5272000 bytes 이기 때문
- 메시지의 크기가 토픽의 message.max.byte 설정값보다 클 경우 카프카 브로커가 받지 못함
- 주의 : 해당 값은 5272000 bytes 보다 작아야 함
- CONNECT_PRODUCER_MAX_REQUEST_SIZE: 4048576
참고
- Worker Configuration 오버라이딩
- 기본적으로 connector는 worker(kafka connect) configuration을 상속받기 때문에 필요 시 상속받은 값을 오버라이딩 할 수 있음
- 이때 producer.override.* / consumer.override.* 로 각각 producer와 consumer 설정을 오버라이딩할 수 있음 cf) https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration
- 현재 사용중인 connector는 모두 source connector 이므로 producer 설정만 오버라이딩하면 됨 (source connector는 producer 역할이고, sink connector는 consumer 역할)
- kafka producer의 max.request.size
- producer가 kafka broker에 한 번 요청할 때 보낼 수 있는 최대 크기로, 전송할 메시지의 크기가 해당 값보다 클 경우 전송이 불가함
- kafka broker의 message.max.byte 설정과 연관있음
- 레코드 배치(단일 요청)의 최대 크기로, 토픽 별로 설정 가능함
- 해당 값보다 큰 크기의 메시지를 받을 수 없음
'Work' 카테고리의 다른 글
[Kafka Connect] 참고 - Telegraf로 커넥트 메트릭 수집하기 (0) 2025.02.03 [Kafka Connect] 카프카 커넥트 모니터링 (0) 2025.02.03 [번역 서비스] 로직 구현 시 고민한 부분 (0) 2025.01.30 [번역 서비스] 적용 - Delayed Retry Topic (0) 2025.01.28 [번역 서비스] 개선 - 번역 실패 모니터링 및 재처리 (0) 2025.01.09