Java/Spring Boot

[MSA] Spring Cloud로 MSA를 개발해보자 12편 [장애 처리, 분산 추적]

누리는 귀여워 2024. 9. 9. 00:59

1. Resilience4j-CircuitBreaker란? 

- 장애가 발생하는 서비스에 반복적인 호출이 되지 못하게 차단

- 특정 서비스가 정상적으로 동작하지 않을 경우 다른 기능으로 대체 수행 -> 장애 회피

 

 

다른 마이크로 서비스에서 오류가 발생했음에도 지금 서비스에서도 오류가 발생하는 경우가 종종 있다

문제가 있는 서비스에는 요청을 보내지 않아야 한다.Feign Client에서는 임시로 그 에러를 대신, 우회, 정상적으로 보여지는 다른 데이터를 보여줄 준비를 해야된다.

2. 테스트

1). 기존 오류 테스트

config, 유레카, gateway, user, rabbitmq 기동

사용자 조회를 해보면 order-service를 조회할 수 없다고 나옴

 

 

2). CircuitBreaker

UserServiceImpl.java

@Override
    public UserDto getUserByUserId(String userId) {
        UserEntity userEntity = userRepository.findAllByUserId(userId);

        if (userEntity == null)
            throw new UsernameNotFoundException("User not found");

        UserDto userDto = new ModelMapper().map(userEntity, UserDto.class);

        /* ErrorDecoder */
//        List<ResponseOrder> orderList = orderServiceClient.getOrders(userId);

        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        List<ResponseOrder> orderList = circuitBreaker.run(() -> orderServiceClient.getOrders(userId),
                throwable -> new ArrayList<>());

        userDto.setOrders(orderList);

        return userDto;
    }

 

 

 

인텔리제이 콘솔에서는 동일한 오류 코드가 나오지만 사용자가 요청한 데이터를 확인하는데에는 이상이 없다

 

3). Resilience4J

Resilience4JConfig.java

@Configuration
public class Resilience4JConfig {
    @Bean
    public Customizer<Resilience4JCircuitBreakerFactory> globalCustomConfiguration() {
        CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()

                // CircuitBreaker을 열지 결정하는 failure rate
                // default: 50, 10번 중 5번을 실패하면 연다
                .failureRateThreshold(4)

                // CircuitBreaker을 open한 상태를 유지하는 지속 시간
                // default: 60s
                .waitDurationInOpenState(Duration.ofMillis(1000))

                // CircuitBreaker가 닫힐 때 결과를 기록하는데 사용되는 유형을 구성
                // 카운트 or 시간 기반
                .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)

                // CircuitBreaker가 닫힐 때 결괄르 기록하는데 사용되는 슬라이딩 창 크기 구성
                // default: 100
                .slidingWindowSize(2)
                .build();

        TimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.custom()

                // time limit을 정하는 api, 서플라이어가 어느정도까지 문제가 생겼을 때 오류로 간주할건지
                // default: 1s
                .timeoutDuration(Duration.ofSeconds(4))
                .build();

        return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
                .timeLimiterConfig(timeLimiterConfig)
                .circuitBreakerConfig(circuitBreakerConfig)
                .build()
        );
    }
}

 

Order-Service에 kafka 부분을 일시적으로 주석 후 기동

주문 1건 등록

 

주문 서비스가 정상적으로 작동중이기 때문에 조회가 잘 됨

 

Order-Service 중지

오류가 발생하지만 기본적인 데이터만 반환할 수 있도록 함

 

3. Zipkin

Zipkin이란?

- Twitter에서 사용하는 분산 환경의 Timing 데이터 수집, 추적 시스템

- Google Drapper에서 발전하였으며, 분산 환경에서의 시스템 병목 현상 파악

- Collector, Query Service, Databasem WebUI로 구성

- Span

  - 하나의 요청에 사용되는 작업 단위 | 64 비트 unique ID

- Trace

 -트리 구조로 이뤄진 Span 셋 | 하나의 요청에 대한 같은 Trace ID 발급

 

Spring Cloud Sleuth이란?

- 스프링 부트 애플리케이션을 Zipkin과 연동

- 요청 값에 따른 Trace ID, Span ID 부여

- Trace와 Span Ids를 로그에 추가 가능

 - servlet filter

 - rest template

 - scheduled actions

 - message channels

 - feign client

 

1). Zipkin 설치

https://zipkin.io/pages/quickstart.html

 

2). Zipkin 실행

PS C:\Work> java -jar .\zipkin-server-3.4.1-exec.jar

// 포트 사용중일 때
PS C:\Users\PC> netstat -ano | findstr :9411
taskkill /PID <PID 번호> /F

 

3). 코드

User, Order-Service -> application.yml

spring:
  zipkin:
    base-url: http://127.0.0.1:9411
    enabled: true
  sleuth:
    sampler:
      probability: 1.0

 

UserServiceImpl.java

@Override
    public UserDto getUserByUserId(String userId) {
        UserEntity userEntity = userRepository.findAllByUserId(userId);

        if (userEntity == null)
            throw new UsernameNotFoundException("User not found");

        UserDto userDto = new ModelMapper().map(userEntity, UserDto.class);

        log.info("Before call orders microservice");
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        List<ResponseOrder> orderList = circuitBreaker.run(() -> orderServiceClient.getOrders(userId),
                throwable -> new ArrayList<>());
        log.info("After call orders microservice");

        userDto.setOrders(orderList);

        return userDto;
    }

 

OrderServiceController.java

@PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                     @RequestBody RequestOrder orderDetails) {
        log.info("Before retrieve orders data");
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);

        /* kafka */
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());

        /* send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto);
        orderProducer.send("orders", orderDto);

        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        log.info("After retrieve orders data");
        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

    @GetMapping("/{userId}/orders")
    public ResponseEntity<List<ResponseOrder>> getOrder(@PathVariable("userId") String userId) throws Exception {
        Iterable<OrderEntity> orderList = orderService.getOrdersByUserId(userId);
        log.info("Before retrieve orders data");
        List<ResponseOrder> result = new ArrayList<>();
        orderList.forEach(v -> {
            result.add(new ModelMapper().map(v, ResponseOrder.class));
        });

        try {
            Thread.sleep(1000);
            throw new Exception("장애 발생");
        } catch (InterruptedException ex) {
            log.warn(ex.getMessage());
        }

        log.info("After retrieve orders data");

        return ResponseEntity.status(HttpStatus.OK).body(result);
    }

 

4). 테스트