스케일 아웃과 데이터 동기화
데이터 동기화 문제
우리는 Cloud Native Architecture에 맞춰 스케일 아웃에 대해 유연하게 대응할 수 있도록, Gateway와 Service Discovery를 사용하여 전체 구조를 구성했다. 덕분에 여러 개의 동일한 마이크로 서비스를 병렬로 실행시키는 것이 가능해졌다.
하지만 시스템 구성할 때 각 마이크로 서비스 별로 독립된 메모리 H2 데이터베이스를 사용하도록 설정했기 때문에,
이처럼 2개의 OrderService를 띄운다면 각각의 OrderService마다 자신의 데이터베이스에 분산 저장을 하게 된다. 이러한 경우 각 데이터베이스에 저장된 데이터의 차이 때문에 데이터 동기화 문제가 발생하게 된다.
이처럼 위 구조와 동일하게 1개의 user-service, 2개의 order-service를 띄워놓은 경우, 어떤 문제가 발생하는지 테스트해보자.
한 명의 유저를 회원 가입 시켜두고,
이와 같은 3개의 주문을 요청하여 order-service의 데이터베이스에 저장하였다.
어떻게 저장되어 있는지 데이터베이스를 확인해 보면,
이와 같이 Gateway의 라운드 로빈 전략에 의해 하나의 order-service에 2개, 다른 하나의 order-service에 1개로 나뉘어 저장되어 있다.
따라서 동일한 유저 정보를 조회하는 요청에도
이처럼 한 번은 두 개의 주문 요청이, 다른 한 번은 한 개의 주문 요청이 된 것처럼 조회가 된다.
분산된 마이크로 서비스마다 서로 다른 데이터베이스를 사용하게 되면 이와 같은 데이터 동기화 문제가 발생하게 된다.
데이터 동기화 문제 해결 방안
이러한 데이터 동기화 문제를 해결하기 위한 방법으로는 세 가지가 있다.
1.
한 개의 데이터베이스만 사용
이러한 데이터 동기화 문제는 분산된 데이터베이스로 인해 발생하는 것이니, 첫 번째 해결 방법은 하나의 데이터베이스만 사용하도록 하여 데이터 동기화 문제 자체가 발생하지 않도록 하는 방법이다. 하지만 이 방법을 적용하기 위해서는 트랜잭션과 동시성 문제 해결 필요가 있고, 이를 위해 동시성 제어를 하는 순간 성능이 떨어진다는 단점이 있다.
2.
메세지 큐잉 서버를 통한 데이터베이스 간의 동기화
두 번째 해결 방법은 분산된 데이터베이스를 그대로 사용하되, Message Queuing Server를 통해 각 데이터베이스의 데이터들을 동기화하는 방법이다. 이 방법은 동시성 제어를 하지 않지만, Message Queuing Server를 통해 다른 데이터베이스의 동기화가 끝나야지만 데이터의 일관성이 유지되는 지연된 데이터 일관성(결과적 데이터 일관성)을 가진다. 또한 데이터의 저장 로직과 별개로 추가적인 동기화 로직을 작성해주어야 한다는 단점이 있다.
3.
하나의 DB + 메세지 큐잉 서버 복합 사용
마지막 방법은 위 두 가지 방법을 복합적으로 사용하여, 하나의 데이터베이스를 사용하지만 Message Queuing Server를 중간에 두어 별도의 트랜잭션이나 동시성 제어 없이 사용하는 방법이다. 대규모 데이터 처리가 가능한 메세지 큐잉 서버를 사용하여 별도의 트랜잭션이나 동시성 제어 없이 하나의 데이터베이스를 사용하고, 이로 인해 추후의 데이터베이스 스케일링이나 샤딩 전략을 유연하게 가져갈 수 있다는 장점이 있다.
Kafka 적용하기
마이크로 서비스 간 데이터 동기화
강의에서 구현하는 시스템에서는 사용자가 주문 요청을 넣으면 FeignClient를 통해 Order-service에 전달되어 주문 요청을 저장한다. 그 과정에서 상품이 주문된 개수만큼 catalog-service에서 수량을 감소 시키도록 만들것이다.
하지만 이를 RestTemplate나 FeignClient를 통해 직접 호출하여 처리하는 것이 아니라, 위처럼 Kafka를 통해 Topic을 등록해두고 해당 Topic으로 변경사항을 자동으로 감지하게 만들어보자. 위 상황에서는 order-service가 producer / catalog-service가 consumer가 된다.
먼저 catalog-service를 consumer로 등록해보자.
implementation 'org.springframework.kafka:spring-kafka'
Java
복사
Spring에서 Kafka를 연동하여 사용하기 위해 Kafka 라이브러리 의존성을 추가해주고,
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
Java
복사
이와 같이 애플리케이션에서 Consumer로 동작하기 위해 ConsumerFactory와 KafkaListener를 빈으로 등록해준다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "example-catalog-topic")
public void updateQuantity(String kafkaMessage) {
log.info(("Kafka Message = {}"), kafkaMessage);
Map<String, Object> map = new HashMap<>();
try {
map = objectMapper.readValue(kafkaMessage, new TypeReference<Map<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
CatalogEntity catalog = catalogRepository.findByProductId((String) map.get("productId"));
if (catalog != null) {
catalog.setStock(catalog.getStock() - (Integer) map.get("quantity"));
catalogRepository.save(catalog);
}
}
}
Java
복사
그 후 메세지를 받으면 데이터를 처리하기 위해, 위와 같이 KafkaConsumer라는 별도의 서비스를 구축한다. @KafkaListener 애노테이션을 통해 특정 topic의 메세지를 수신하는 경우 해당 메서드가 동작하게 되고, 메서드 내부에서는 메세지를 파싱하여 변경사항을 Repository에 반영하는 로직을 추가하였다.
다음은 order-service를 producer로 등록해보자.
implementation 'org.springframework.kafka:spring-kafka'
Java
복사
마찬가지로 Spring에서 Kafka를 연동하여 사용하기 위해 Kafka 라이브러리 의존성을 추가해주고,
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Java
복사
ProducerFactory와 KafkaTemplate를 빈으로 등록해준다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public OrderDto send(String topic, OrderDto orderDto) {
String jsonInString = "";
try {
jsonInString = objectMapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice: {}", orderDto);
return orderDto;
}
}
Java
복사
이후 Kafka에 메세지를 보내는 KafkaProducer를 서비스로 등록하여, topic과 데이터를 받아 메세지를 전달하도록 작성한다.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable String userId,
@RequestBody RequestOrder orderDetails) {
modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = modelMapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
orderService.createOrder(orderDto);
ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
/* send order to kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
Java
복사
그리고 이와 같이 주문을 저장하는 로직에서 모든 로직 처리 이후에 Kafka에 메세지를 보내도록 작성해주었다.
그렇다면 제대로 동작하는지 확인해보자. 동작 확인을 위해 Service Discovery, Config Service, Gateway, Order-service, Catalog-service 애플리케이션을 동작시킨다.
초기 Catalog-service의 데이터베이스에는 위와 같이 재고에 대한 정보들이 저장되어 있다.
이후 CATALOG-002 제품에 대해 10개만큼 order-service에 주문을 넣으면
order-service 데이터베이스에 주문이 잘 저장된다.
그리고 order-service가 producer로서 Kafka에 메세지를 전달하고,
이를 catalog-service가 consumer로서 메세지를 전달 받아 처리한다.
catalog-service의 데이터베이스를 확인해보면, 의도한대로 CATALOG-002 상품에 대해 재고가 10개 줄어있는 것을 확인할 수 있다.
Kafka를 통한 스케일 아웃 데이터 동기화
위에서 언급한 스케일 아웃으로 인해 동일한 서비스가 여러 개 실행되는 상황에서 데이터베이스의 동기화 문제를 해결해보자.
spring:
datasource:
# url: jdbc:h2:mem:testdb
# driver-class-name: org.h2.Driver
url: jdbc:mariadb://localhost:3306/mydb
driver-class-name: org.mariadb.jdbc.Driver
username: root
password: test1357
YAML
복사
먼저 기존의 메모리를 사용하던 H2 데이터베이스에서 로컬에 Mariadb 서버를 띄우고 연결하여, 각 서비스마다 개별의 데이터베이스를 사용하는 것이 아니라 통합된 하나의 데이터베이스만 사용하도록 바꾸어준다.
이제 우리는 JPA를 통해서 데이터베이스에 엔티티를 저장하는 것이 아니라, 데이터를 포맷에 맞춰 가공 후 Kafka에 메세지로 전달하여 Kafka Connect를 통해 자동으로 데이터베이스에 저장되도록 만들 것이다.
Kafka Connect를 통해 데이터베이스에 저장하도록 만들기 위해서는
이와 같은 형태로 저장하고자 하는 데이터를 가공하여 메세지를 보내야한다. 이를 위해서 아래와 같이 JSON 형태로 객체를 만들어보자.
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
Java
복사
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
Java
복사
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
Java
복사
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int quantity;
private int unit_price;
private int total_price;
}
Java
복사
이와 같이 데이터를 구성할 여러 데이터 클래스들을 만들고,
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final List<Field> fields = List.of(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "quantity"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price"));
private final Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.quantity(orderDto.getQuantity())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
String jsonInString = "";
try {
jsonInString = objectMapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order microservice: {}", kafkaOrderDto);
return orderDto;
}
}
Java
복사
이처럼 Producer 내에서 해당하는 Schema 데이터와 Payload 데이터를 채워 KafkaTemplate로 전달하도록 작성한다.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable String userId,
@RequestBody RequestOrder orderDetails) {
modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = modelMapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
/* jpa */
// orderService.createOrder(orderDto);
/* kafka */
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getQuantity() * orderDetails.getUnitPrice());
/* send order to kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = modelMapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
Java
복사
이후 이와 같이 주문 요청 시 JPA로 저장하는 부분을 주석처리하고, 새로 만든 OrderProducer를 통해 Kafka에 메세지로 전달하도록 수정한다.
Kafka에서 데이터를 받으면 이를 데이터베이스에 저장할 Kafka Connect가 필요하다.
이와 같이 데이터를 받아서 데이터베이스에 orders 테이블에 저장하는 Sink Connect를 추가해준다.
이후 order-service를 2개 이상 실행시키고
이와 같이 4개의 주문 요청을 넣으면,
첫 번째 애플리케이션에서 2개의 주문 요청을 처리하고
다른 한개의 애플리케이션에서 나머지 2개의 주문 요청을 처리한다.
하지만 결과적으로 데이터베이스를 살펴보면,
이와 같이 4개의 주문이 잘 저장되어 있는 것을 볼 수 있다.