Spring 프로젝트에 Kafka 사용하기

사용 버전

  • Spring boot 2.6.6
  • AdoptOpenJDK 11
  • zookeeper 3.8.1
  • kafka 3.3.2

Spring Boot에서 Kafka 적용

  • build.gradle에 kafka 추가

    implementation 'org.springframework.kafka:spring-kafka'
    
  • application.yml

    kafka:
        bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
        properties:
          enable.idempotence: true # 멱등성 처리, retries > 0 and acks = all 필요.
          enable.auto.commit: false
          isolation.level: read_committed
          acks: all # default all,  acks=-1도 all과 같음.
          max.in.flight.requests.per.connection: 5 # default 5, 단일 연결에서 보낼 최대 요청 수
          max.block.ms: 5000 # default 60000(1분),  Broker, 코디네이터 연결, 메타데이터 가져오기, 버퍼 할당 등 기다리는 총 시간
          request.timeout.ms: 5000 # default 30000(30초), 클라이언트가 요청 응답을 기다리는 최대 시간, 시간 초과시 재시도
          delivery.timeout.ms: 10000 # default 120000(2분), delivery.timeout.ms > (request.timeout.ms + linger.ms)
          session.timeout.ms: 60000 # default 45000(45초)
          max.poll.interval.ms:
            180000 # default 300000(5분), 소비자 그룹 관리를 사용할 때 poll() 호출 사이의 최대 지연,
            # max.poll.records로 가져간 메시지들을 처리하는 시간이 max.poll.interval.ms을 넘으면 리밸런스가 일어남.
          max.poll.records: 5 # default 500, poll() 요청으로 가져올 수 있는 최대 레코드 수.
        consumer:
          auto-offset-reset: earliest
          # latest: 가장 최근에 생산된 메시지로 offeset reset
          # earliest: 가장 오래된 메시지로 offeset reset
          # none: offset 정보가 없으면 Exception 발생
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          retry:
            interval.ms: 1000
            maxAttempts: 1
      
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    
  • kafka config

    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaConfig {
        private final KafkaConfigProperties kafkaConfigProperties;
        private final KafkaTopicProperties kafkaTopicProperties;
        private final EventMessageService eventMessageService;
        private final Gson gson;
      
        @Bean
        public ProducerFactory<String, DomainEvent> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConfigProperties.getBootstrapAddress());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getProducerKeySerializer());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getProducerValueSerializer());
      
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, this.kafkaConfigProperties.getEnableIdempotence()); 
            props.put(ProducerConfig.ACKS_CONFIG, this.kafkaConfigProperties.getAcks()); 
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, this.kafkaConfigProperties.getMaxInFlightRequestsPerConnection()); 
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, this.kafkaConfigProperties.getMaxBlockMs());
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, this.kafkaConfigProperties.getRequestTimeoutMs());
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, this.kafkaConfigProperties.getDeliveryTimeoutMs());
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
      
            return new DefaultKafkaProducerFactory<>(props);
        }
      
        @Bean
        public KafkaTemplate<String, DomainEvent> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
      
        @Bean
        public ConsumerFactory<String, DomainEvent> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConfigProperties.getBootstrapAddress());
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.kafkaConfigProperties.getSessionTimeoutMs());
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, this.kafkaConfigProperties.getMaxPollIntervalMs());
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.kafkaConfigProperties.getMaxPollRecords());
      
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.kafkaConfigProperties.getConsumerAutoOffsetReset());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getConsumerKeyDeserializer());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getConsumerValueDeserializer());
      
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaConfigProperties.getEnableAutoCommit());
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, this.kafkaConfigProperties.getIsolationLevel());
      
            JsonDeserializer<DomainEvent> deserializer = new JsonDeserializer<>(DomainEvent.class);
            deserializer.setRemoveTypeHeaders(false);
            deserializer.addTrustedPackages("*");
            deserializer.setUseTypeMapperForKey(true);
      
            return new DefaultKafkaConsumerFactory<>(props,
                    new StringDeserializer(),
                    deserializer);
        }
      
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, DomainEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setCommonErrorHandler(errorHandler());
            return factory;
        }
      
        @Bean
        public DefaultErrorHandler errorHandler() {
            BackOff fixedBackOff = new FixedBackOff(this.kafkaConfigProperties.getConsumerRetryIntervalMs(),
                    this.kafkaConfigProperties.getConsumerRetryMaxAttempts());
      
            DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
                // 재시도 후에도 오류가 발생하면 이벤트를 database에 저장함.
                DomainEvent domainEvent = (DomainEvent) consumerRecord.value();
                this.eventMessageService.createEventMessage(consumerRecord.topic(), gson.toJson(consumerRecord.value()),
                        EventStatus.FAILURE, domainEvent.getFailCount()+1, EventType.SUB);
            }, fixedBackOff);
      
            errorHandler.defaultFalse(); //addNotRetryableExceptions에서 지정되지 않은 모든 예외가 재시도하지 않도록 함.
            errorHandler.addRetryableExceptions(SocketTimeoutException.class, ConsumerRetryException.class); // 재시도 가능 예외
      
            return errorHandler;
        }
    }
    
  • service

    @Transactional
    public void orderComplete(...) {
          
    	...
              
        this.applicationEventPublisher.publishEvent(DomainEvent.builder()
                            .eventName(this.kafkaTopicProperties.getOrderComplete().getName())
                            .eventPayload(OrderCompleteEvent.builder()
                                    .ordSeq(ordSeq)
                                    .mbrSeq(order.getMbrSeq())
                                    .shopSeq(shopSeq)
                                    .build())
                            .failCount(0)
                            .build());
    }
    
  • producer

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    @Transactional
    @Async(value = "eventExecutor")
    public void produceEvent(DomainEvent event) {
      
        String payloadJson = gson.toJson(event);
      
        try {
            this.kafkaTemplate.send(event.getEventName(), event)
                .addCallback(new ListenableFutureCallback<>() {
                    @Override
                    public void onSuccess(SendResult<String, DomainEvent> result) {
                        log.info("produceEvent Sent message offset=[" + result.getRecordMetadata().offset() + "]");
                    }
      
                    @Override
                    public void onFailure(Throwable ex) {
                        createFailureEvent(event.getEventName(), payloadJson);
                    }
                });
        } catch (Exception e) {
            // 실패 로그 & db에 실패 이벤트를 저장하고 batch로 처리하기
            createFailureEvent(event.getEventName(), payloadJson);
        }
    }
      
    private void createFailureEvent(String eventName, Object eventPayload) {
        this.eventMessageService.createEventMessage(eventName, eventPayload, EventStatus.FAILURE, 1, EventType.PUB);
    }
    
  • consumer

    @Transactional
    @KafkaListener(topics = "${spring.kafka.topic.order-complete.name}", groupId = "order-complete-group", clientIdPrefix = "order-complete-")
    public void orderCompleteConsumer(DomainEvent event) {
        OrderCompleteEvent orderCompleteEvent = objectMapper.convertValue(event.getEventPayload(), OrderCompleteEvent.class);
     	...   
    }
              
    
    • topics: 구독할 토픽명
    • groupId: 구독 그룹명
    • clientIdPrefix: 여러개의 consumer를 구분하기 위한 값

Graceful Shutdown

spring graceful shutdown 시 poll()로 가져온 메시지를 처리를 완료 후 종료하게 된다.

2023-05-13 11:10:31.519  INFO 17708 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete


2023-05-13 11:10:34.045  INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService       : >>> End Received Messasge : test contents
2023-05-13 11:10:34.045  INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService       : >>> Start Received Messasge : test contents
2023-05-13 11:10:39.047  INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService       : >>> End Received Messasge : test contents
2023-05-13 11:10:39.047  INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService       : >>> Start Received Messasge : test contents
2023-05-13 11:10:44.048  INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService       : >>> End Received Messasge : test contents
2023-05-13 11:10:44.049  INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService       : >>> Start Received Messasge : test contents
2023-05-13 11:10:49.049  INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService       : >>> End Received Messasge : test contents


2023-05-13 11:10:49.082  INFO 17708 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : consumergroup: partitions revoked: [payment-cancel-topic-0]
3-05-13 11:10:49.083  INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Member 9af9bdc8-23cf-41be-887e-ff0aca6065bc-0-96a796de-fe43-44db-955b-d564b465b915 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer unsubscribed from all topics
2023-05-13 11:10:49.087  INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Resetting generation due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.087  INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Request joining group due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.088  INFO 17708 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Unsubscribed all topics or patterns and assigned partitions
2023-05-13 11:10:49.088  INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Resetting generation due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.088  INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Request joining group due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.105  INFO 17708 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2023-05-13 11:10:49.105  INFO 17708 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-05-13 11:10:49.105  INFO 17708 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2023-05-13 11:10:49.113  INFO 17708 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.consumer for 9af9bdc8-23cf-41be-887e-ff0aca6065bc-0 unregistered
2023-05-13 11:10:49.114  INFO 17708 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : consumergroup: Consumer stopped

2023-05-13 11:10:52.367  INFO 17708 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

References

Kafka 공식 Document, Kafka 공식 v3.3 Document

Spring Kafka Rebalancing 처리

Kafka 구성 및 SpringBoot 연동

댓글남기기