3 분 소요

토픽에 정보를 넣는 것 만으로 db에 데이터를 insert 하기 위해서는, 꼭 아래와 같은 JSON 형태로 넣어야 했다.
스크린샷 2022-10-12 오전 12 01 29
이를 java 코드로 바꾸면 아래와 같다.

스크린샷 2022-10-11 오후 11 45 17
위 사진에서처럼 schemapayload를 각각 객체로 만들고,
이들을 담고 있는 dto 객체를 하나 만들자.
세부적으로는, schema를 구성하고 있는 구성 요소 중 fields는 리스트 형태로 되어있기 때문에, field 객체도 따로 만들자.

따라서 총 4개의 dto 를 만들 것이다.

  • KafkaOrderDto
  • Schema
  • Field
  • Payload

order-service

🗂 dto

KafkaOrderDto.java

@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {

    private Schema schema;
    private Payload payload;
}

Schema.java

@Data
@Builder
public class Schema {

    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}

Field.java

@Data
@AllArgsConstructor
public class Field {

    private String type;
    private boolean optional;
    private String field;
}

Payload.java

@Data
@Builder
public class Payload {

    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}

이제 이 dto 들을 사용할 수 있는 message 클래스를 만들자.

🗂 messagequeue

OrderProducer.java

@Service
@Slf4j
public class OrderProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
        new Field("string", true, "order_id"),
        new Field("string", true, "user_id"),
        new Field("string", true, "product_id"),
        new Field("int32", true, "qty"),
        new Field("int32", true, "unit_price"),
        new Field("int32", true, "total_price")
        );

    Schema schema = Schema.builder()
        .type("struct")
        .fields(fields)
        .optional(false)
        .name("orders")
        .build();

    @Autowired
    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto) {

        Payload payload = Payload.builder()
            .order_id(orderDto.getOrderId())
            .user_id(orderDto.getUserId())
            .product_id(orderDto.getProductId())
            .qty(orderDto.getQty())
            .unit_price(orderDto.getUnitPrice())
            .total_price(orderDto.getTotalPrice())
            .build();

        // 토픽에 전달할 객체 생성
        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        } catch(JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);

        return orderDto;
    }
}

🗂 controller

OrderController.java

@RestController
@RequestMapping("/order-service")
public class OrderController {
    ...
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
        orderDto.setUserId(userId);

        /* jpa */
        // OrderDto createdOrder = orderService.createOrder(orderDto);
        // ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* 🌟 kafka */
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());

        /* send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto); // 토픽 이름은 catalog-service 의 KafkaConsumer 에서 확인할 수 있다.
        orderProducer.send("orders", orderDto); // 🌟 아래에서 orders 라는 이름의 sink connector를 새로 등록할 것이다.

        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }
}

Sink Connector 추가

서버 기동

아래 서버들을 먼저 기동하자.

  • zookeeper 서버
  • kafka 서버

그리고 kafka connect 서버를 기동한다.

# 현재 위치: /Users/minju/study/msa/kafka-demo/confluent-6.1.0
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

현재 등록되어진 Connector 확인

스크린샷 2022-10-12 오전 12 33 57

새로운 Sink Connector 등록

스크린샷 2022-10-12 오전 12 35 56

{
  "name":"my-order-sink-connect",
  "config":{
  "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url":"jdbc:mysql://localhost:3306/mydb",
  "connection.user":"***",
  "connection.password":"***",
  "auto.create":"true",
  "auto.evolve":"true",
  "delete.enabled":"false",
  "tasks.max":"1",
  "topics":"orders"
  }
}

등록 후, 다시 connector 목록을 확인해보자.
스크린샷 2022-10-12 오전 12 40 02

이제 여러 개의 order-service에서 발생했던 메시지가 토픽에 잘 전달되는지와
전달된 데이터가 하나의 단일 데이터베이스에 들어가는지 테스트해보자.

테스트

서버 기동

아래 서버들을 먼저 기동하자.

  • zookeeper 서버
  • kafka 서버
  • kafka connect 서버
  • eureka 서버
  • config-service
  • gateway-service

그리고 테스트를 위해 2개의 order-service 기동한다.
기동 후, 유레카 서버를 확인해보면 2개의 order-service가 기동 중인 것을 확인할 수 있다.
스크린샷 2022-10-12 오전 12 49 31

상품 주문

포스트맨을 상품을 총 7번 주문해 보았다.
첫 번째 order-service는 인텔리제이에서 실행중이고, 두 번째 order-service는 터미널에서 실행중이다.

#1 주문
스크린샷 2022-10-12 오전 12 54 47
터미널에 콘솔이 찍혔다.

#2 주문
스크린샷 2022-10-12 오전 12 55 56

#3 주문
스크린샷 2022-10-12 오전 12 56 45

#4 주문
스크린샷 2022-10-12 오전 12 57 49
인텔리제이에 콘솔이 찍혔다.

#5 주문
스크린샷 2022-10-12 오전 12 58 22

#6 주문
스크린샷 2022-10-12 오전 12 59 00

#7 주문
스크린샷 2022-10-12 오전 12 59 15

총 7번 중, 5번은 터미널에서 실행한 두 번째 order-service가 호출되었고,
2번은 인텔리제이에서 실행한 첫 번째 order-service가 호출되었다.
그렇다면 정보들은 단일 db에 잘 저장되었을까?

mariadb 확인

mariadb에서 확인해보자.
스크린샷 2022-10-12 오전 1 04 39

첫 번째 데이터와 두 번째 데이터는 이전에 테스트 할 때 생성되었던 데이터고,
세 번째 데이터부터 총 7개의 데이터가 잘 저장되었음을 확인할 수 있다.

그럼 이제 사용자의 주문 내역에서도 총 7개의 데이터를 모두 가져올 수 있을 것이다.
(지금은 user-service를 기동하지 않은 상태에서 userId에 dummy data를 넣은 것이기 때문에 실제로 확인해 볼 수는 없다..)

추가적으로, 몇 번 인스턴스에서 발생한 데이터인지도 table에 column으로 넣는다면, 더 풍부한 정보를 나타낼 수 있을 것이다👍



💛 개인 공부 기록용 블로그입니다. 👻

맨 위로 이동하기