2 분 소요

데이터 동기화

order-service -> catalog-service

  • order-service에 요청 된 주문의 수량 정보를 catalog-service에 반영
  • order-service에서 Kafka Topic으로 메시지 전송 -> Producer 역할
  • catalog-service에서 Kafka Topic에 전송 된 메시지 취득 -> Consumer 역할

스크린샷 2022-10-10 오후 6 23 07

토픽에 관심 정보가 있다고 등록을 하는 Consumer를 먼저 작업한 뒤에,
토픽에 데이터를 전달하는 Producer를 나중에 작업하자.

catalog-service 수정

pom.xml

<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

config/KafkaConsumerConfig.java

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /* 접속 정보 등록 */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    /* 접속 정보를 이용한 리스너 */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
            = new ConcurrentKafkaListenerContainerFactory<>();

        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}

service/KafkaConsumer.java

Consumer는 토픽에서 변경되어진 값을 가지고 가서 실제 db에 반영해주는 작업을 할 것이다.

@Service
@Slf4j
public class KafkaConsumer {

    CatalogRepository catalogRepository;

    @Autowired
    public KafkaConsumer(CatalogRepository catalogRepository) {
        this.catalogRepository = catalogRepository;
    }

    /*example-catalog-topic 이라는 토픽에 새로운 데이터가 전달되면, 그 데이터 값을 가져와 아래 메서드를 실행한다.*/
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message: ->" + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            // String 타입의 kafkaMessage 를 Json 타입으로 변경
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        CatalogEntity entity = catalogRepository.findByProductId((String)map.get("productId")); // productId로 값을 가져오기 때문에, 잘못된 productId를 전달하면 entity레 null이 들어간다.
        if (entity != null) {
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            catalogRepository.save(entity);
        }
    }

}

order-service 수정

pom.xml

<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

config/KafkaProducerConfig.java

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    /* 접속 정보 등록 */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    /* 데이터를 전달하는 용도의 인스턴스 */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

service/KafkaProducer.java

Producer는 변경된 값을 토픽에 넣는 작업을 할 것이다.

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch(JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the Order microservice: " + orderDto);

        return orderDto;
    }
}

controller/OrderController.java

주문이 들어오면, 변경된 수량 정보를 토픽에 넣자.

@RestController
@RequestMapping("/order-service")
public class OrderController {
    ...
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        /* jpa */
        OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
        orderDto.setUserId(userId);

        OrderDto createdOrder = orderService.createOrder(orderDto);
        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* 🌟 send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto); // 토픽 이름은 catalog-service 의 KafkaConsumer 에서 확인할 수 있다.

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }
}

데이터 동기화 테스트

아래 서버들을 먼저 기동하자.

  • zookeeper 서버
  • kafka 서버
  • eureka 서버
  • config-service
  • gateway-service

그리고 order-servicecatalog-service를 기동한다.
이제 상품을 주문 한 뒤에, order-servicecatalog-service의 h2 데이터베이스를 확인해보자.

상품 주문 전

아래 사진과 같이 order-service의 db에는 아무런 값이 없고, catalog-service의 db에는 현재, CATALOG-003 물품이 120개 있다.

order-service
스크린샷 2022-10-10 오후 9 02 19

catalog-service
스크린샷 2022-10-10 오후 9 05 11

상품 주문

이제 포스트맨을 통해 상품 주문을 해보자.
(지금 테스트는 order-servicecatalog-service의 데이터가 동기화 되었는지 확인하기 위한 것으로, userId에는 아무거나 입력한다.)
스크린샷 2022-10-10 오후 9 08 37

상품 주문 후

이제 다시 order-servicecatalog-service의 h2 데이터베이스를 확인해보자.
아래 사진과 같이 order-service의 db에 주문 정보가 생겼고, catalog-service의 db에는 CATALOG-003 물품이 110개 있다.

order-service
스크린샷 2022-10-10 오후 9 06 41

catalog-service
스크린샷 2022-10-10 오후 9 05 43

order-service에서 10개를 주문하니, catalog-service에서 수량이 10개 줄어들었음을 확인할 수 있다.

다음 글에서는 하나의 order-service가 아닌 여러 개의 order-service를 기동했을 때,
Kafka Connect를 이용해 단일 데이터베이스를 사용하는 방법에 대해 알아보자.



💛 개인 공부 기록용 블로그입니다. 👻

맨 위로 이동하기