마이크로 서비스 유저 서비스처럼 config에서 설정파일 받게 수정
1. Orders 서비스와 Catalogs 서비스에 Kafka Topic를 적용
- Orders 서비스에 요청된 주문의 수량 정보를 Catalogs 서비스에 반영
- Orders 서비스에서 Kafka Topic로 메시지 전송 -> Producer
- Catalogs 서비스에서 Kafka Topic에 전송된 메시지 취득 -> Consumer
2. 동기화 코드 [1]
order-service -> KafkaProducerConfig.java
// 카프카 활성화
@EnableKafka
// 이 클래스를 설정 파일로 사용, bean을 관리
@Configuration
public class KafkaProducerConfig {
@Bean
// kafka 프로듀서의 인스턴스를 생성하는 ProducerFactory
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
// kafka 클러스터 주소 설정, 프로듀서는 이 주소를 사용하여 브로커와 통신
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 메시지의 키를 String로 변환
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 메시지의 값을 String로 변환
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
// kafka 메시지를 보내는 KafkaTemplate를 생성하는 템플릿
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Order-Service -> KafkaProducer.java
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// topic는 메시지를 보낼 kafka의 토픽 이름
// orderDto = 전송 데이터
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
// kafkaTemplate를 사용하여 변환된 JSON 문자열을 kafka 토픽으로 보낸다.
// 이 메시지는 topic로 전달됨
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer send data from the Order microservice: " + orderDto);
return orderDto;
}
}
KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
// consumer의 인스턴스를 생성하기 위한 consumeFactory
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
// kafka 클러스터 주소 설정
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// kafka consumer 그룹 ID 설정
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
// 메시지의 키를 역직렬화하고 키를 String로 변환
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지의 값를 역직렬화하고 값를 String로 변환
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
// kafka consumer을 생성하고 메시지를 수신
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
Catalog-Service -> KafkaConsumer.java
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository repository) {
this.repository = repository;
}
// kafka로부터 메시지를 수신할 리스너를 저장
// example-catalog-topic 토픽에서 메시지를 수신하면 해당 메시지를 처리하는 updateQty 실행
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
// productId를 통해 상품의 정보를 조회
// map.get("productId")는 kafka 메시지에서 추출한 productId임
CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
if (entity != null) {
// 조회된 CatalogEntity의 stock 값을 kafka 메시지에 포함된 qty 값만큼 감소
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
repository.save(entity);
}
}
}
Order-Service -> OrderController.java
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
/* jpa */
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
// kafka 추가
// kafkaProducer은 kafka로 메시지를 보내는 객체
// 목적지인 토픽의 이름 example-catalog-topic
// 전송할 데이터로, 주문 정보를 담고 있는 orderDto.
// 직렬화되어 JSON 형식으로 토픽에 전송됨
kafkaProducer.send("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
3. 동기화 테스트 [1]
주키퍼 서버, 카프카 서버, 유레카 서버, 게이트웨이, config 서비스, orders 서비스, catalog 서비스 기동
Order Service와 Catalog Service간의 데이터 동기화가 완료되었다
4. 동기화 코드 [2]
데이터 동기화 - Multiple Orders Service
- Order Service에 요청된 주문 정보를 DB가 아니라 Kafka Topic로 전송
- Kafka Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB에 저장 -> 데이터 동기화
Field.java
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
KafkaOrderDto.java
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
Payload.java
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
Schema.java
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
OrderProducer.java
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate;
// Kafka 메시지의 스키마 정의
// 생성자 패턴
List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price"));
// Kafka 메시지의 스키마 정의
// 빌더 패턴
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// send는 kafka에 메시지를 전송하는 기능
// topic와 orderDto를 인자로 받아 해당 토픽으로 주문 정보를 담은 메시지 전송
public OrderDto send(String topic, OrderDto orderDto) {
// 주문 정보를 담은 Payload 객체를 빌더 패턴으로 생성
// orderDto로부터 주문정보를 가져와 Payload에 넣음
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
// kafka에 전송할 전체 메시지를 담음
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer send data from the Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
OrderController.java
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
/* kafka */
// orderDto에 고유한 주문ID를 생성
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
// orderProducer을 사용해 kafka의 "orders" 토픽으로 orderDto 메시지를 전송
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
5. 동기화 테스트 [2]
주키퍼 서버, 카프카 서버, 카프카 커넥트, Rabbit MQ, Order-Service(2개) 기동
PS C:\Work\kafka_2.12-3.4.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
PS C:\Work\kafka_2.12-3.4.0> .\bin\windows\kafka-server-start.bat .\config\server.properties
PS C:\Work\confluent-7.3.1> .\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
PS C:\Windows\system32> net start RabbitMQ
1). 싱크커넥터 등록
2). 동기화 테스트
첫번째 주문 : 3번 상품 12개 주문
두번째 주문 : 2번 상품 13개 주문
세번째 주문 : 1번 상품 14개 주문
두곳의 Order-Service에서 주문을 했지만 단일 DB에 저장됨
'Java > Spring Boot' 카테고리의 다른 글
[MSA] Spring Cloud로 MSA를 개발해보자 13편 [모니터링] (1) | 2024.09.12 |
---|---|
[MSA] Spring Cloud로 MSA를 개발해보자 12편 [장애 처리, 분산 추적] (0) | 2024.09.09 |
[MSA] Spring Cloud로 MSA를 개발해보자 10편 [Kafka-1] (0) | 2024.09.01 |
[MSA] Spring Cloud로 MSA를 개발해보자 9편 [서비스간 통신-2] (0) | 2024.08.30 |
[MSA] Spring Cloud로 MSA를 개발해보자 8편 [서비스간 통신-1] (0) | 2024.08.29 |