[E-commerce App] order-service와 catalog-service에 Kafka Topic 적용
데이터 동기화
order-service
-> catalog-service
order-service
에 요청 된 주문의 수량 정보를catalog-service
에 반영order-service
에서 Kafka Topic으로 메시지 전송 ->Producer
역할catalog-service
에서 Kafka Topic에 전송 된 메시지 취득 ->Consumer
역할
토픽에 관심 정보가 있다고 등록을 하는 Consumer
를 먼저 작업한 뒤에,
토픽에 데이터를 전달하는 Producer
를 나중에 작업하자.
catalog-service
수정
pom.xml
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
config/KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
/* 접속 정보 등록 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
/* 접속 정보를 이용한 리스너 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
service/KafkaConsumer.java
Consumer는 토픽에서 변경되어진 값을 가지고 가서 실제 db에 반영해주는 작업을 할 것이다.
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository catalogRepository;
@Autowired
public KafkaConsumer(CatalogRepository catalogRepository) {
this.catalogRepository = catalogRepository;
}
/*example-catalog-topic 이라는 토픽에 새로운 데이터가 전달되면, 그 데이터 값을 가져와 아래 메서드를 실행한다.*/
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
// String 타입의 kafkaMessage 를 Json 타입으로 변경
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
CatalogEntity entity = catalogRepository.findByProductId((String)map.get("productId")); // productId로 값을 가져오기 때문에, 잘못된 productId를 전달하면 entity레 null이 들어간다.
if (entity != null) {
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
catalogRepository.save(entity);
}
}
}
order-service
수정
pom.xml
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
config/KafkaProducerConfig.java
@EnableKafka
@Configuration
public class KafkaProducerConfig {
/* 접속 정보 등록 */
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
/* 데이터를 전달하는 용도의 인스턴스 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
service/KafkaProducer.java
Producer는 변경된 값을 토픽에 넣는 작업을 할 것이다.
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto);
} catch(JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice: " + orderDto);
return orderDto;
}
}
controller/OrderController.java
주문이 들어오면, 변경된 수량 정보를 토픽에 넣자.
@RestController
@RequestMapping("/order-service")
public class OrderController {
...
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
/* jpa */
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createdOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* 🌟 send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto); // 토픽 이름은 catalog-service 의 KafkaConsumer 에서 확인할 수 있다.
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
}
데이터 동기화 테스트
아래 서버들을 먼저 기동하자.
- zookeeper 서버
- kafka 서버
- eureka 서버
config-service
gateway-service
그리고 order-service
와 catalog-service
를 기동한다.
이제 상품을 주문 한 뒤에, order-service
와 catalog-service
의 h2 데이터베이스를 확인해보자.
상품 주문 전
아래 사진과 같이 order-service
의 db에는 아무런 값이 없고, catalog-service
의 db에는 현재, CATALOG-003
물품이 120개 있다.
order-service
catalog-service
상품 주문
이제 포스트맨을 통해 상품 주문을 해보자.
(지금 테스트는 order-service
와 catalog-service
의 데이터가 동기화 되었는지 확인하기 위한 것으로, userId
에는 아무거나 입력한다.)
상품 주문 후
이제 다시 order-service
와 catalog-service
의 h2 데이터베이스를 확인해보자.
아래 사진과 같이 order-service
의 db에 주문 정보가 생겼고, catalog-service
의 db에는 CATALOG-003
물품이 110개 있다.
order-service
catalog-service
order-service
에서 10개를 주문하니, catalog-service
에서 수량이 10개 줄어들었음을 확인할 수 있다.
다음 글에서는 하나의 order-service
가 아닌 여러 개의 order-service
를 기동했을 때,
Kafka Connect를 이용해 단일 데이터베이스를 사용하는 방법에 대해 알아보자.
💛 개인 공부 기록용 블로그입니다. 👻