3 분 소요

이 전 글에는 Kafka를 이용해 Producer와 Consumer를 사용해보았다.
이번에는 이어서 Kafka Connect를 사용해보자.

데이터베이스에 저장된 값을 또다른 데이터베이스에 이동하는 예제를 테스트해보기 위해, MariaDB부터 설치하자.

MariaDB

설치 및 테이블 생성

위 명령어로 mariadb 설치 후 시작해보자.

# mariadb 설치
$ brew install mariadb

# 시작
$ mysql.server start

# 종료
$ mysql.server stop

# 상태 확인
$ mysql.server status

이제 mysql 접속해보자.
접속할 때 username은 내가 따로 변경했기 때문에 admin으로 지정한 것이다. 보통은 root일 것이다.

# 접속
$ mysql -u admin -p

# 현재 어떤 데이터베이스(스키마)들이 있는지 확인
$ show databases;

접속했으면, mydb라는 이름의 데이터베이스를 새로 생성하자.
mydb가 가지고 있는 테이블들을 확인해보면, 아직 아무것도 없을 것이다.

# 데이터베이스 생성
mysql> create database mydb;

# 방금 생성한 mydb 사용
mysql> use mydb;

# 현재 사용 중인 데이터베이스가 가지고 있는 테이블들 확인
mysql> show tables;

스크린샷 2022-10-09 오후 6 05 45

order-service

order-service에서 기존에 사용하던 h2 데이터베이스 대신, MariaDB를 연동해보자.

pom.xml

<!-- MariaDB -->
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.7.2</version>
</dependency>

이제 user-service를 기동해보자.
(유레카 서버->config-service->gateway-service->order-service 순으로 기동되어 있어야 한다.)

h2-console 접속

유레카 서버에 들어가서 order-service의 포트 번호 뒤를 /h2-console로 바꿔 h2 콘솔에 접속해보자.
그럼 기존에 아래 사진처럼 되어있을텐데, 그 아래 사진처럼 변경하자.

변경 전
스크린샷 2022-10-09 오후 6 17 09

변경 후
스크린샷 2022-10-09 오후 6 22 06

Saved Settings: Generic MySQL
Setting Name: Generic MySQL

Driver Class: org.mariadb.jdbc.Driver
JDVC URL: jdbc:mysql://localhost:3306/mydb
User Name: admin    # 본인이 설정한 username
Password: ***    # 본인이 설정한 password

테이블 생성

아래 쿼리문으로 users 테이블을 생성해보자.

create table users(
    id int auto_increment primary key,
    user_id varchar(20),
    pwd varchar(20),
    name varchar(20),
    created_at datetime default NOW()
);

우리가 Kafka Connect를 통해 하고싶은 작업은,
users 테이블에 새로운 데이터가 insert 될 때,
그 데이터를 감지했다가, 고스란히 새로운 데이터베이스에 옮기는 것이다.

이어서 orders 테이블을 생성해보자.

create table orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    qty int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
);

Apache Kafka 사용

Ecosystem - 2. Kafka Connect

  • Kafka Connect를 통해 Data를 Import/Export 가능
  • 프로기래밍 없이 Configuration 파일만으로 받아온 데이터를 다른쪽으로 이동 가능
  • Standalone mode, Distribution mode 지원
    - RESTful API를 통해 지원
    - Stream 또는 Batch 형태로 데이터 전송 가능
    - 커스텀 Connector를 통한 다양한 Plugin 제공 (File, S3, Hive, Mysql, etc …)
  • 데이터를 가져오는 쪽을 Connect Source라 하고, 데이터를 받는 쪽을 Connect Sink라 한다.

스크린샷 2022-10-09 오후 5 24 09

Kafka Connect 설치 및 실행

# Kafka Connect 설치
$ curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz

# 압축 해제
$ tar xvf confluent-community-6.1.0.tar.gz

# 압축 해제한 디렉토리로 이동
$ cd confluent-6.1.0

스크린샷 2022-10-09 오후 7 28 42

이제 Kafka Connect를 실행할건데, 그 전에 zookeeper 서버와 kafka 서버가 기동되고 있어야 한다!

# Kafka Connect 실행
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

Kafka Connect를 실행하기 전에 kafka 서버에서 토픽 리스트를 조회하고,
Kafka Connect를 실행한 뒤에 kafka 서버에서 토픽 리스트를 조회해보면 아래와 같다.
스크린샷 2022-10-09 오후 7 41 46
이 때 새로 생긴 것들은,
kafka connect가 source에서 읽어온 데이터들을 관리하기 위한 토픽들이며, kafka connect를 실행함으로써 자동으로 생긴 것이다.

JDBC Connector

Kafka Connect를 통해 한쪽에서 데이터를 읽어와서 다른쪽으로 전달하기 위해서 관계형 데이터베이스를 사용할 것이다.
java에서 관계형 데이터베이스를 사용하기 위해서는 JDBC 라이브러리를 이용해야 한다.
마찬가지로 Kafka Connect에서도 사용하려고 하는 소스에 맞는 JDBC Connector라는 것을 설치해야 한다.

1) JDBC Connector 설치

https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
위 사이트에 들어가 아래 사진에 보이는 Download and extract the ZIP file을 클릭한 뒤, Downlod 한다.
스크린샷 2022-10-09 오후 7 06 49

그리고 다운받은 압축파일을 kafka가 있는 작업 디렉토리로 이동한다. (탐색기에서 이동해도 상관없다.)

# 압축파일이 저장된 위치로 이동
$ cd Downloads

# 압축파일을 작업 디렉토리로 이동
$ mv confluentinc-kafka-connect-jdbc-10.5.4.zip ~/study/msa/kafka-demo

# 압축 해제
$ tar xvf confluentinc-kafka-connect-jdbc-10.5.4.zip

# lib 안에 kafka-connect-jdbc-10.5.4.jar 파일이 위치해있다.
$ cd confluentinc-kafka-connect-jdbc-10.5.4/lib/

# 현재 위치를 읽어오기
$ pwd

스크린샷 2022-10-09 오후 8 12 18

스크린샷 2022-10-09 오후 8 17 30
kafka-connect-jdbc-10.5.4.jar 파일이 우리가 필요한 파일이다.
pwd 커맨드를 통해 해당 파일의 경로를 복사한다.

2) JDBC Connector 연동

이제 설정파일을 바꿀 것이기 때문에 Kafka Connect는 ctrl+c로 잠시 중지하자.
그리고 JDBC Connector를 연동하기 위해서 vim 또는 code를 통해 etc/kafka/connect-distributed.properties 파일을 수정할 것이다.

$ code ./etc/kafka/connect-distributed.properties

스크린샷 2022-10-09 오후 8 23 14

etc/kafka/connect-distributed.properties 파일 맨 아래 plugin.path에 이전에 pwd로 확인한 경로를 적어주면 된다.

변경 전
스크린샷 2022-10-09 오후 8 21 30

변경 후
스크린샷 2022-10-09 오후 8 21 55

3) JdbcSourceConnector에서 MariaDB 사용

마지막으로, JdbcSourceConnector에서 MariaDB를 사용하기 위해 mariadb 드라이버를 복사한다.

  • ./share/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar 파일 복사
# 홈 디렉토리로 이동
$ cd

$ cd .m2/repository/org/mariadb/jdbc/mariadb-java-client/2.7.2

$ cp ./mariadb-java-client-2.7.2.jar /Users/minju/study/msa/kafka-demo/confluent-6.1.0/share/java/kafka

아래 사진에 보이는 mariadb-java-client-2.7.2.jar 파일을 kafka connect 디렉토리(= confluent-6.1.0) 안에 있는 /share/java/kafka/에 복사할 것이다.
스크린샷 2022-10-09 오후 8 29 25

cp 커맨드 실행 후, 정상적으로 copy가 되었는지 확인해보자.
스크린샷 2022-10-09 오후 8 35 02
아래 사진을 보면, 정상적으로 copy가 되었음을 확인할 수 있다.
스크린샷 2022-10-09 오후 8 35 16



💛 개인 공부 기록용 블로그입니다. 👻

맨 위로 이동하기