2 분 소요

이전 글에서 source connect에서 topic에 데이터를 전달하면, topic에 해당 데이터가 쌓이는 것까지 확인했다.
sink connect가 하는 역할은, topic에 전달된 데이터를 가져와서 사용하는 것이다.
(Kafka-console-producer에서 데이터 전송 -> Topic에 추가 -> MariaDB에 추가)

Kafka Sink Connect 등록

Sink Connect를 추가하는 방법은 Source Connect를 추가하는 방법과 유사하다.
이전과 마찬가지로 포스트맨을 이용해 MariaDB 관련되어있는 Sink를 등록해보도록 하자.

포스트맨을 켜서 아래 데이터를 body에 담아 http://localhost:8083/connectors에 요청해보자.

🌟 주의
그 전에 zookeeper 서버 & kafka 서버 & Kafka Connect가 기동되고 있어야 한다!

# 현재 위치: /Users/minju/study/msa/kafka-demo/kafka_2.13-2.7.0
# zookeeper 실행 
$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

# 현재 위치: /Users/minju/study/msa/kafka-demo/kafka_2.13-2.7.0
# kafka 실행 
$ ./bin/kafka-server-start.sh ./config/server.properties

# 현재 위치: /Users/minju/study/msa/kafka-demo/confluent-6.1.0
# Kafka Connect 실행 
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties


🚨 아래에서 connection.userconnection.password는 본인의 mysql 설정에 맞게 변경해주어야 한다!

{
  "name":"my-sink-connect",
  "config":{
  "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url":"jdbc:mysql://localhost:3306/mydb",
  "connection.user":"admin",
  "connection.password":"***",
  "auto.create":"true",
  "auto.evolve":"true",
  "delete.enabled":"false",
  "tasks.max":"1",
  "topics":"my_topic_users"
  }
}

스크린샷 2022-10-09 오후 11 08 01

참고
포스트맨이 아닌, 터미널에서는 아래 커맨드를 실행하면 된다.

echo '
{
  "name":"my-sink-connect",
  "config":{
  "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url":"jdbc:mysql://localhost:3306/mydb",
  "connection.user":"admin",
  "connection.password":"***",
  "auto.create":"true",
  "auto.evolve":"true",
  "delete.enabled":"false",
  "tasks.max":"1",
  "topics":"my_topic_users"
  }
}
'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

테스트

connector 확인

포스트맨을 이용해 현재 등록되어진 connector를 확인해보자.
스크린샷 2022-10-09 오후 11 10 23
특정 connector에 대한 상세정보를 보고 싶다면, 아래와 같이 요청하자.
스크린샷 2022-10-09 오후 11 11 10

sink connect 테이블 확인

정상적으로 sink connector가 만들어졌다는 것은, 데이터베이스에 토픽의 이름과 같은 형태의 테이블이 생성되었다는 것이다.
스크린샷 2022-10-09 오후 11 13 21

그렇다면, 기존의 users 테이블과 새로 생긴 my_topic_users 테이블의 데이터를 비교해보자.
스크린샷 2022-10-09 오후 11 14 26
두 테이블에 들어있는 데이터가 같음을 확인할 수 있다.

이번에는, users 테이블에 새로운 데이터를 insert 한 뒤에 my_topic_users 테이블을 확인해보자.
스크린샷 2022-10-09 오후 11 16 38
users 테이블에는 당연히 값이 추가되었겠지만, my_topic_users 테이블에도 추가된 것을 확인할 수 있다.

따라서 Kafka Connect를 이용하면, source connect를 통해 전달된 데이터가 sink connect와 연결된 테이블에까지 이어진다는 것을 확인했다.
지금까지는, 데이터베이스에서 insert 쿼리를 이용해 topic에 데이터를 추가하고, 그 topic에 추가된 데이터가 sink connect 테이블(=my_topic_users)에까지 들어오는 것을 확인해보았다.
이제부터 ㅇㅇㅇ

Kafka console producer 실행

$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_users

우리는 토픽에 저장된 데이터가 sink connect에도 같이 저장되기를 원하는 것이다.
그러기 위해서는 sink connect가 알 수 있는 JDBC 데이터 포맷으로 데이터를 보내야 한다.
해당 포맷은 Kafka console consumer에서 확인할 수 있는 데이터 포맷이다.
스크린샷 2022-10-09 오후 11 26 32

이를 아래와 같이 변형해보자.

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":7,"user_id":"my_id","pwd":"my_password","name":"Ann","created_at":1665400364000}}

Kafka console producer
마지막에 있는 payload 값만 원하는 값으로 변경해 producer에 전달해보자.
스크린샷 2022-10-09 오후 11 33 29

Kafka console consumer
그럼 해당 값이 topic에 전달되어 consumer가 받아오는 것을 확인할 수 있다.
스크린샷 2022-10-09 오후 11 35 55

그렇다면 이 데이터가 데이터베이스에도 추가되었을까?
users 데이터베이스를 확인해보자.
스크린샷 2022-10-09 오후 11 59 33
당연히 users에는 추가되지 않았다.
왜냐하면 users는 토픽으로부터 데이터를 가져오는게 아니라, 자신이 가지고 있는 데이터를 토픽으로 밀어넣는 개념이기 때문이다.

이제 my_topic_users 데이터베이스를 확인해보자.
스크린샷 2022-10-10 오전 12 01 50
my_topic_users에는 producer에 전달한 데이터가 추가되었다.
왜냐하면 my_topic_users sink connect와 연결되어있어, 토픽으로부터 데이터를 가져올 수 있기 때문이다.

이렇게 sink connect만 가지고도, 토픽에 업데이트가 발생할 때 변경된 값을 타겟 소스(데이터베이스 혹은 스토리지)에 가져올 수 있다.



💛 개인 공부 기록용 블로그입니다. 👻

맨 위로 이동하기