Using Kafka in a Spring Project
Versions used
- Spring Boot 2.6.6
- AdoptOpenJDK 11
- ZooKeeper 3.8.1
- Kafka 3.3.2
Applying Kafka in Spring Boot
- Add spring-kafka to build.gradle (the version is managed by the Spring Boot BOM):
```gradle
implementation 'org.springframework.kafka:spring-kafka'
```
- application.yml:
```yaml
kafka:
  bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
  properties:
    enable.idempotence: true # idempotent producer; requires retries > 0 and acks=all
    enable.auto.commit: false
    isolation.level: read_committed
    acks: all # default all; acks=-1 means the same as all
    max.in.flight.requests.per.connection: 5 # default 5; max number of unacknowledged requests on a single connection
    max.block.ms: 5000 # default 60000 (1 min); total time to wait for broker/coordinator connections, metadata fetches, buffer allocation, etc.
    request.timeout.ms: 5000 # default 30000 (30 s); max time the client waits for a response to a request; retries on timeout
    delivery.timeout.ms: 10000 # default 120000 (2 min); must satisfy delivery.timeout.ms > request.timeout.ms + linger.ms
    session.timeout.ms: 60000 # default 45000 (45 s)
    max.poll.interval.ms: 180000 # default 300000 (5 min); max delay between poll() calls under consumer group management.
                                 # If processing the records fetched via max.poll.records takes longer than this, a rebalance is triggered.
    max.poll.records: 5 # default 500; max number of records returned by a single poll()
  consumer:
    auto-offset-reset: earliest
      # latest: reset the offset to the most recently produced message
      # earliest: reset the offset to the oldest message
      # none: throw an exception when no offset information exists
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    retry:
      interval.ms: 1000
      maxAttempts: 1
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```
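The KafkaConfig class below reads all of these values through a custom KafkaConfigProperties bean, which the post does not show. A minimal sketch of what it might look like, with the prefix and field names assumed to match the getters used in KafkaConfig (the actual binding of the keys above may differ):

```java
// Hypothetical sketch: the properties holder that KafkaConfig reads.
// The prefix and field names are assumptions inferred from the getters used below.
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "kafka")
public class KafkaConfigProperties {
    private String bootstrapAddress;
    private String producerKeySerializer;
    private String producerValueSerializer;
    private Boolean enableIdempotence;
    private String acks;
    private Integer maxInFlightRequestsPerConnection;
    private Integer maxBlockMs;
    private Integer requestTimeoutMs;
    private Integer deliveryTimeoutMs;
    private Integer sessionTimeoutMs;
    private Integer maxPollIntervalMs;
    private Integer maxPollRecords;
    private String consumerAutoOffsetReset;
    private String consumerKeyDeserializer;
    private String consumerValueDeserializer;
    private Boolean enableAutoCommit;
    private String isolationLevel;
    private Long consumerRetryIntervalMs;
    private Long consumerRetryMaxAttempts;
}
```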
- Kafka config:
```java
@EnableKafka
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConfig {

    private final KafkaConfigProperties kafkaConfigProperties;
    private final KafkaTopicProperties kafkaTopicProperties;
    private final EventMessageService eventMessageService;
    private final Gson gson;

    @Bean
    public ProducerFactory<String, DomainEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConfigProperties.getBootstrapAddress());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getProducerKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getProducerValueSerializer());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, this.kafkaConfigProperties.getEnableIdempotence());
        props.put(ProducerConfig.ACKS_CONFIG, this.kafkaConfigProperties.getAcks());
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, this.kafkaConfigProperties.getMaxInFlightRequestsPerConnection());
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, this.kafkaConfigProperties.getMaxBlockMs());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, this.kafkaConfigProperties.getRequestTimeoutMs());
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, this.kafkaConfigProperties.getDeliveryTimeoutMs());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, DomainEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, DomainEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConfigProperties.getBootstrapAddress());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.kafkaConfigProperties.getSessionTimeoutMs());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, this.kafkaConfigProperties.getMaxPollIntervalMs());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.kafkaConfigProperties.getMaxPollRecords());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.kafkaConfigProperties.getConsumerAutoOffsetReset());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getConsumerKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.kafkaConfigProperties.getConsumerValueDeserializer());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaConfigProperties.getEnableAutoCommit());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, this.kafkaConfigProperties.getIsolationLevel());

        JsonDeserializer<DomainEvent> deserializer = new JsonDeserializer<>(DomainEvent.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, DomainEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(this.kafkaConfigProperties.getConsumerRetryIntervalMs(),
                this.kafkaConfigProperties.getConsumerRetryMaxAttempts());
        DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
            // If the record still fails after all retries, persist the event to the database.
            DomainEvent domainEvent = (DomainEvent) consumerRecord.value();
            this.eventMessageService.createEventMessage(consumerRecord.topic(), gson.toJson(consumerRecord.value()),
                    EventStatus.FAILURE, domainEvent.getFailCount() + 1, EventType.SUB);
        }, fixedBackOff);
        errorHandler.defaultFalse(); // classify all exceptions as not retryable unless registered via addRetryableExceptions
        errorHandler.addRetryableExceptions(SocketTimeoutException.class, ConsumerRetryException.class); // retryable exceptions
        return errorHandler;
    }
}
```
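One detail worth noting: because TRANSACTIONAL_ID_CONFIG is set, the DefaultKafkaProducerFactory is transactional, so every kafkaTemplate.send() has to run inside a Kafka transaction (the producer below relies on @Transactional for this). The post does not show the transaction-manager wiring; a sketch of one common way to do it, e.g. as an extra bean in KafkaConfig, is:

```java
// Sketch (not shown in the original post): lets @Transactional open a Kafka
// transaction against the transactional producer factory above.
@Bean
public KafkaTransactionManager<String, DomainEvent> kafkaTransactionManager(
        ProducerFactory<String, DomainEvent> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}
```

The consumer's isolation.level=read_committed pairs with this: consumers only see records from committed producer transactions.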
- Service:
```java
@Transactional
public void orderComplete(...) {
    ...
    this.applicationEventPublisher.publishEvent(DomainEvent.builder()
            .eventName(this.kafkaTopicProperties.getOrderComplete().getName())
            .eventPayload(OrderCompleteEvent.builder()
                    .ordSeq(ordSeq)
                    .mbrSeq(order.getMbrSeq())
                    .shopSeq(shopSeq)
                    .build())
            .failCount(0)
            .build());
}
```
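publishEvent() hands the event to Spring's in-process ApplicationEventPublisher; the @TransactionalEventListener in the producer below picks it up only after the surrounding transaction commits. The DomainEvent envelope itself is not shown in the post; a plausible sketch, with fields inferred from the builder calls above, is:

```java
// Hypothetical sketch of the event envelope; fields are inferred from usage in this post.
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DomainEvent {
    private String eventName;    // also used as the Kafka topic name when sending
    private Object eventPayload; // e.g. OrderCompleteEvent
    private int failCount;       // incremented each time handling fails
}
```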
- Producer:
```java
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional
@Async(value = "eventExecutor")
public void produceEvent(DomainEvent event) {
    String payloadJson = gson.toJson(event);
    try {
        this.kafkaTemplate.send(event.getEventName(), event)
                .addCallback(new ListenableFutureCallback<>() {
                    @Override
                    public void onSuccess(SendResult<String, DomainEvent> result) {
                        log.info("produceEvent Sent message offset=[" + result.getRecordMetadata().offset() + "]");
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        createFailureEvent(event.getEventName(), payloadJson);
                    }
                });
    } catch (Exception e) {
        // Log the failure, store the failed event in the database, and let a batch job process it later.
        createFailureEvent(event.getEventName(), payloadJson);
    }
}

private void createFailureEvent(String eventName, Object eventPayload) {
    this.eventMessageService.createEventMessage(eventName, eventPayload, EventStatus.FAILURE, 1, EventType.PUB);
}
```
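Two notes on this producer. First, send(...).addCallback(...) with a ListenableFutureCallback matches Spring Kafka 2.x (the Spring Boot 2.6 line used here); in Spring Kafka 3.x, send() returns a CompletableFuture, so the callback would become whenComplete(). Second, the failure path stores the event so a batch can replay it later; the post does not show that batch. A sketch of what it might look like, where EventMessage, findByStatusAndType, and markSuccess are hypothetical names:

```java
// Sketch (assumption, not from the post): replays publish failures recorded by
// createFailureEvent(). Requires @EnableScheduling; the repository-style methods
// and the EventMessage type are hypothetical.
@Component
@RequiredArgsConstructor
public class FailedEventRepublisher {

    private final EventMessageService eventMessageService;
    private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
    private final Gson gson;

    @Scheduled(fixedDelay = 60_000) // retry once per minute
    @Transactional
    public void republish() {
        for (EventMessage message : eventMessageService.findByStatusAndType(EventStatus.FAILURE, EventType.PUB)) {
            DomainEvent event = gson.fromJson(message.getPayload(), DomainEvent.class);
            kafkaTemplate.send(event.getEventName(), event); // runs inside the Kafka transaction
            eventMessageService.markSuccess(message);
        }
    }
}
```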
- Consumer:
```java
@Transactional
@KafkaListener(topics = "${spring.kafka.topic.order-complete.name}",
        groupId = "order-complete-group",
        clientIdPrefix = "order-complete-")
public void orderCompleteConsumer(DomainEvent event) {
    OrderCompleteEvent orderCompleteEvent = objectMapper.convertValue(event.getEventPayload(), OrderCompleteEvent.class);
    ...
}
```
- topics: the topic(s) to subscribe to
- groupId: the consumer group name
- clientIdPrefix: a prefix used to distinguish multiple consumers
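The topics attribute is resolved from the spring.kafka.topic.order-complete.name property, whose definition the post does not show. A sketch of what that section of application.yml might contain (the topic name here is an assumption):

```yaml
# Hypothetical: the topic section referenced by the listener above and by
# kafkaTopicProperties.getOrderComplete().getName() in the service.
spring:
  kafka:
    topic:
      order-complete:
        name: order-complete-topic
```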
Graceful Shutdown
When Spring shuts down gracefully, the listener finishes processing the messages already fetched by poll() before the consumer stops, as the log below shows.
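Graceful shutdown is opt-in from Spring Boot 2.3 onward; for reference, the properties that enable it (the timeout value here is illustrative) are:

```yaml
server:
  shutdown: graceful # wait for in-flight work before stopping
spring:
  lifecycle:
    timeout-per-shutdown-phase: 30s # default 30s; max wait per shutdown phase
```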
```
2023-05-13 11:10:31.519 INFO 17708 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown : Graceful shutdown complete
2023-05-13 11:10:34.045 INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService : >>> End Received Messasge : test contents
2023-05-13 11:10:34.045 INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService : >>> Start Received Messasge : test contents
2023-05-13 11:10:39.047 INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService : >>> End Received Messasge : test contents
2023-05-13 11:10:39.047 INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService : >>> Start Received Messasge : test contents
2023-05-13 11:10:44.048 INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService : >>> End Received Messasge : test contents
2023-05-13 11:10:44.049 INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService : >>> Start Received Messasge : test contents
2023-05-13 11:10:49.049 INFO 17708 --- [ntainer#0-0-C-1] n.k.a.p.v.c.s.PaymentCancelService : >>> End Received Messasge : test contents
2023-05-13 11:10:49.082 INFO 17708 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : consumergroup: partitions revoked: [payment-cancel-topic-0]
2023-05-13 11:10:49.083 INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Member 9af9bdc8-23cf-41be-887e-ff0aca6065bc-0-96a796de-fe43-44db-955b-d564b465b915 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer unsubscribed from all topics
2023-05-13 11:10:49.087 INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Resetting generation due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.087 INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Request joining group due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.088 INFO 17708 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Unsubscribed all topics or patterns and assigned partitions
2023-05-13 11:10:49.088 INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Resetting generation due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.088 INFO 17708 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=9af9bdc8-23cf-41be-887e-ff0aca6065bc-0, groupId=consumergroup] Request joining group due to: consumer pro-actively leaving the group
2023-05-13 11:10:49.105 INFO 17708 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2023-05-13 11:10:49.105 INFO 17708 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-05-13 11:10:49.105 INFO 17708 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2023-05-13 11:10:49.113 INFO 17708 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for 9af9bdc8-23cf-41be-887e-ff0aca6065bc-0 unregistered
2023-05-13 11:10:49.114 INFO 17708 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : consumergroup: Consumer stopped
2023-05-13 11:10:52.367 INFO 17708 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
```