1. Kafka
1). Kafka란?
실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공
- 모든 시스템으로 데이터를 실시간으로 전송하어 처할 수 있는 시스템
- 데이터가 많아지더라도 확장이 용이한 시스템
- Producer/Consumer 분리
- 메시지를 여러 Consumer에게 허용
- 높은 처리량을 위한 메시지 최적화
2). Kafka Broker
- 실행된 Kafka 애플리케이션 서버
- 3대 이상의 Broker Cluster 구성
- Zookeeper 연동
- 역할 : 메타데이터(Broker ID, Controller ID 등) 저장
- Controller 정보 저장
- n개 Broker 중 1대는 Controller 기능 수행
- Controller 역할
- 각 Broker에게 담당 파티션 할당 수행
- Broker 정사아 동작 모니터링 관리
3). Kafka Connect의 역할
- Kafka Connect를 통해 데이터를 Import/Export 가능
- 코드 없이 Configuration으로 데이터 이동
- Standalone mode, Distribution mode 지원
-- RESTful API를 통해 지원
-- Stream 또는 Batch 형태로 데이터 전송 가능
-- 커스텀 Connector를 통한 다양한 플러그인 제공 (File, S3, Hive, Mysql, etc..)
(1). Kafka Source Connect의 역할
- DB에 데이터가 추가되면 Source Connect에 의해 Consumer가 데이터를 받아 볼 수 있다. (Topic에 데이터가 쌓임)
(2). Kafka Sink Connect의 역할
- Consumer이 데이터를 받음과 동시에 이 데이터를 Sink Connect과 연결되어 있던 테이블에다가 동일하게 업데이트를 시켜줌.
2. Kafka 테스트
1). Kafka 다운로드
윈도우, 리눅스, MAC 버전이 따로 있지않고 그 안에 OS별 커맨드가 나뉘어져 있다.
https://kafka.apache.org/downloads
폴더를 하나 생성한 뒤 다운받은 Kafka.tar 파일을 압축해제한다
tar xvf kafka파일명
2). Kafka 서버 기동
// Windows에서 Kafka 실행 명령어는 \bin\windows에 있다
// Zookeeper 및 Kafka 서버 구동
PS C:\Work\kafka_2.12-3.4.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
PS C:\Work\kafka_2.12-3.4.0> .\bin\windows\kafka-server-start.bat .\config\server.properties
// 토픽 생성
PS C:\kafka_2.13-2.7.0> .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1
// 토픽 리스트 확인
PS C:\kafka_2.13-2.7.0> .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
// Topic 정보 확인
$KAFKA_HOME/bin/windows/kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092
3). Kafka Producer/Consumer 테스트
// 주키퍼, 카프카 서버 기동 후에 해야됨
// 메시지 생산
$KAFKA_HOME/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic quickstart-events
// 메시지 소비
$KAFKA_HOME/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
Producer의 메시지를 Consumer에서 바로 받는게 아니라
Consumer 에서 등록된 메시지를 토픽으로부터 받는 역할이 Consumer이다
토픽에다가 메시지를 전달하는게 Producer의 역할이므로 Producer과 Consumer은 직접적인 관련이 없다
3. Kafka Connect
1). MariaDB 설치 - Windows
2). Kafka Connect 설치 [Kafka와 DB 연동]
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
tar xvf confluent-community-6.1.0.tar.gz
cd $KAFKA_CONNECT_HOME
2). JDBC Connector 설정
// JDBC Connector 설치
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
// ./etc/kafka/connect-distributed.properties 맨 아래에 plugin 정보 추가
plugin.path=\C:\\Work\\confluentinc-kafka-connect-jdbc-10.6.3\\lib
// JdbcSourceConnector에서 MariaDB를 사용하기 위해 Mariadb 드라이버 복사
//${USER.HOME}\.m2 폴더에서 mariadb-java-client-3.1.4.jar를
//./share/java/kafka/로 복사
C:\Users\PC\.m2\repository\org\mariadb\jdbc\mariadb-java-client\3.1.4\mariadb-java-client-3.1.4.jar
-> C:\confluent-6.1.0\share\java\kafka
3). Kafka Connect 실행
// confluent-7.3.1
PS C:\Work\confluent-7.3.1> .\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
4). log4j 경로 오류
log4j 경로는 config가 아니라 etc/kafka에 있다.
log4j 경로 수정
// C:\confluent-7.3.1\bin\windows\connect-distributed.bat
// 수정 전
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/connect-log4j.properties
)
// 수정 후
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/connect-log4j.properties
)
5). Kafka Source Connect 테스트
6). 토픽 자동 생성 테스트
PS C:\Work\kafka_2.12-3.4.0> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},
{"type":"string","optional":true,"field":"user_id"},
{"type":"string","optional":true,"field":"pwd"},
{"type":"string","optional":true,"field":"name"},
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp",
"version":1,"field":"created_at"}], "optional":false,"name":"users"},
"payload":{"id":5,"user_id":"user1","pwd":"root","name":"User name","created_at":1725321514000}}
테이블 정보와 등록한 데이터를 확인할 수 있다
7). Kafka Sink Connect 추가
8). Kafka Producer을 이용하여 Kafka Topic에 데이터 직접 전송
- Kafka-console-producer에서 데이터 전송 -> Topic 추가 -> MariaDB에 추가
기존에는 users 테이블에서 데이터 등록 -> producer -> my_topic_users에 등록 순서지만
producer에서 데이터를 등록했기 때문에 users 테이블에는 등록되지 않는다.
consumer에 나오는 형식으로 작성해야 된다.
'Java > Spring Boot' 카테고리의 다른 글
[MSA] Spring Cloud로 MSA를 개발해보자 12편 [장애 처리, 분산 추적] (0) | 2024.09.09 |
---|---|
[MSA] Spring Cloud로 MSA를 개발해보자 11편 [Kafka-2] (0) | 2024.09.06 |
[MSA] Spring Cloud로 MSA를 개발해보자 9편 [서비스간 통신-2] (0) | 2024.08.30 |
[MSA] Spring Cloud로 MSA를 개발해보자 8편 [서비스간 통신-1] (0) | 2024.08.29 |
[MSA] Spring Cloud로 MSA를 개발해보자 7편 [설정 정보 암호화] (0) | 2024.08.28 |