1 분 소요

메세지를 보내고, 가져오는 작업에 있어서 Kafka Connect가 사용되고,
Source와 Sink가 쓰일 것이다.

Kafka Source Connect 등록

포스트맨을 이용해 MariaDB 관련되어있는 Source를 등록해보도록 하자.

포스트맨을 켜서 아래 데이터를 body에 담아 http://localhost:8083/connectors에 요청해보자.

🌟 주의
그 전에 zookeeper 서버 & kafka 서버 & Kafka Connect가 기동되고 있어야 한다!

# 현재 위치: /Users/minju/study/msa/kafka-demo/kafka_2.13-2.7.0
# zookeeper 실행 
$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

# 현재 위치: /Users/minju/study/msa/kafka-demo/kafka_2.13-2.7.0
# kafka 실행 
$ ./bin/kafka-server-start.sh ./config/server.properties

# 현재 위치: /Users/minju/study/msa/kafka-demo/confluent-6.1.0
# Kafka Connect 실행 
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties


🚨 아래에서 connection.userconnection.password는 본인의 mysql 설정에 맞게 변경해주어야 한다!

{
  "name" : "my-source-connect",
  "config" : {
  "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url":"jdbc:mysql://localhost:3306/mydb",
  "connection.user":"admin",
  "connection.password":"***", 
  "mode": "incrementing",
  "incrementing.column.name" : "id",
  "table.whitelist":"users",
  "topic.prefix" : "my_topic_",
  "tasks.max" : "1"
  }
}

스크린샷 2022-10-09 오후 9 15 48

참고
포스트맨이 아닌, 터미널에서는 아래 커맨드를 실행하면 된다.

echo '
{
  "name" : "my-source-connect",
  "config" : {
  "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url":"jdbc:mysql://localhost:3306/mydb",
  "connection.user":"admin",
  "connection.password":"***", 
  "mode": "incrementing",
  "incrementing.column.name" : "id",
  "table.whitelist":"users",
  "topic.prefix" : "my_topic_",
  "tasks.max" : "1"
  }
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

테스트

connector 확인

포스트맨을 이용해 현재 등록되어진 connector를 확인해보자.
스크린샷 2022-10-09 오후 9 18 43
특정 connector에 대한 상세정보를 보고 싶다면, 아래와 같이 요청하자.
스크린샷 2022-10-09 오후 9 20 08

Topic 확인

데이터 소스를 등록해두었으니, 데이터가 변경되었을 때 토픽에 잘 쌓이는지 토픽을 확인해보자.

# kafka 디렉토리로 이동
$ cd ~/study/msa/kafka-demo/kafka_2.13-2.7.0

# Topic 목록 확인
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

스크린샷 2022-10-09 오후 9 23 23
새로운 토픽이 아직 생성되지 않은 이유는, 데이터의 변경사항이 없기 때문이다.

그렇다면 데이터를 변경해보자.

# 홈 디렉토리로 이동
$ cd

# mysql 접속
$ mysql -u admin -p

mysql> use mydb;

# 데이터 insert
mysql> insert into users(user_id, pwd, name) values('user1', 'test1111', 'User name');

스크린샷 2022-10-09 오후 9 26 51

이제 다시 토픽을 확인해보자.
스크린샷 2022-10-09 오후 9 27 19
my_topic_users라는 새로운 토픽이 생성되었다.

consumer - 토픽 정보 확인

consumer에서 생성된 토픽의 정보를 확인해보자.

# 메시지 소비(consume)
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

스크린샷 2022-10-09 오후 9 31 25
이전에 mysql에서 쿼리문으로 삽입한 데이터를 확인할 수 있다.

데이터를 하나 더 넣어보자. (총 insert 한 데이터는 두 개이다.)
스크린샷 2022-10-09 오후 9 37 40

스크린샷 2022-10-09 오후 9 39 37
consumer를 기동시켜놓은 상태이기 때문에, 어떠한 메시지가 등록되게 되면 consumer가 그것을 감지해서 화면에 뿌려준다.
위 데이터를 JSON 포멧에 맞춰 확인해보면 아래와 같다.
스크린샷 2022-10-09 오후 9 41 19
나중에 토픽을 이용해 데이터베이스에 자료를 저장하고 싶다면, 위에 보이는 포맷으로 전달해야 한다.

source connect에서 topic에 데이터를 전달하면, topic에 해당 데이터가 쌓이는 것까지 확인했다.
다음 글에서 다룰 sink connect가 하는 역할은, topic에 전달된 데이터를 가져와서 사용하는 것이다.
source connect를 통해, 직접 db에 데이터를 insert 하지 않아도 토픽에 자료를 전달하면, db에 insert 할 수 있다.



💛 개인 공부 기록용 블로그입니다. 👻

맨 위로 이동하기