[E-commerce App] Multiple order-service에서 Kafka Connect를 활용해 단일 DB 사용하기 - 2. Order Kafka Producer
토픽에 정보를 넣는 것 만으로 db에 데이터를 insert 하기 위해서는, 꼭 아래와 같은 JSON 형태로 넣어야 했다.
이를 java 코드로 바꾸면 아래와 같다.
위 사진에서처럼 schema
와 payload
를 각각 객체로 만들고,
이들을 담고 있는 dto
객체를 하나 만들자.
세부적으로는, schema
를 구성하고 있는 구성 요소 중 fields
는 리스트 형태로 되어있기 때문에, field
객체도 따로 만들자.
따라서 총 4개의 dto 를 만들 것이다.
KafkaOrderDto
Schema
Field
Payload
order-service
🗂 dto
KafkaOrderDto.java
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
Schema.java
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
Field.java
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
Payload.java
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
이제 이 dto 들을 사용할 수 있는 message 클래스를 만들자.
🗂 messagequeue
OrderProducer.java
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
// 토픽에 전달할 객체 생성
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch(JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
🗂 controller
OrderController.java
@RestController
@RequestMapping("/order-service")
public class OrderController {
...
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
/* jpa */
// OrderDto createdOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* 🌟 kafka */
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto); // 토픽 이름은 catalog-service 의 KafkaConsumer 에서 확인할 수 있다.
orderProducer.send("orders", orderDto); // 🌟 아래에서 orders 라는 이름의 sink connector를 새로 등록할 것이다.
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
}
Sink Connector 추가
서버 기동
아래 서버들을 먼저 기동하자.
- zookeeper 서버
- kafka 서버
그리고 kafka connect 서버를 기동한다.
# 현재 위치: /Users/minju/study/msa/kafka-demo/confluent-6.1.0
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
현재 등록되어진 Connector 확인
새로운 Sink Connector 등록
{
"name":"my-order-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"***",
"connection.password":"***",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}
등록 후, 다시 connector 목록을 확인해보자.
이제 여러 개의 order-service
에서 발생했던 메시지가 토픽에 잘 전달되는지와
전달된 데이터가 하나의 단일 데이터베이스에 들어가는지 테스트해보자.
테스트
서버 기동
아래 서버들을 먼저 기동하자.
- zookeeper 서버
- kafka 서버
- kafka connect 서버
- eureka 서버
config-service
gateway-service
그리고 테스트를 위해 2개의 order-service
기동한다.
기동 후, 유레카 서버를 확인해보면 2개의 order-service
가 기동 중인 것을 확인할 수 있다.
상품 주문
포스트맨을 상품을 총 7번 주문해 보았다.
첫 번째 order-service
는 인텔리제이에서 실행중이고, 두 번째 order-service
는 터미널에서 실행중이다.
#1
주문
터미널에 콘솔이 찍혔다.
#2
주문
#3
주문
#4
주문
인텔리제이에 콘솔이 찍혔다.
#5
주문
#6
주문
#7
주문
총 7번 중, 5번은 터미널에서 실행한 두 번째 order-service
가 호출되었고,
2번은 인텔리제이에서 실행한 첫 번째 order-service
가 호출되었다.
그렇다면 정보들은 단일 db에 잘 저장되었을까?
mariadb 확인
mariadb에서 확인해보자.
첫 번째 데이터와 두 번째 데이터는 이전에 테스트 할 때 생성되었던 데이터고,
세 번째 데이터부터 총 7개의 데이터가 잘 저장되었음을 확인할 수 있다.
그럼 이제 사용자의 주문 내역에서도 총 7개의 데이터를 모두 가져올 수 있을 것이다.
(지금은 user-service
를 기동하지 않은 상태에서 userId
에 dummy data를 넣은 것이기 때문에 실제로 확인해 볼 수는 없다..)
추가적으로, 몇 번 인스턴스에서 발생한 데이터인지도 table에 column으로 넣는다면, 더 풍부한 정보를 나타낼 수 있을 것이다👍
💛 개인 공부 기록용 블로그입니다. 👻