Project/싹틔움

[싹틔움] 04/06 개발일지 Kafka 도입

한33 2025. 4. 7. 11:39

💡 목표

싹틔움 서버에 Kafka 를 도입해 메인서버와 알림서버 사이에서 데이터를 성공적으로 주고받을 수 있게 하고자 했다.

📚 메인서버 Kafka 설정

KafkaConfig

더보기
/**
 * Kafka wiring: a producer that serializes values as JSON, a consumer that deserializes
 * JSON values behind an {@code ErrorHandlingDeserializer}, and the listener container
 * factory built on that consumer.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Kafka bootstrap servers, injected from the {@code app.kafka-brokers} property. */
    @Value("${app.kafka-brokers}")
    private String kafkaBrokers;

    /** Template used by application code to publish records through {@link #producerFactory()}. */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Producer factory with String keys and JSON values.
     *
     * <p>The previous configuration pinned
     * {@code org.apache.kafka.clients.producer.UniformStickyPartitioner}. That class is
     * deprecated since Kafka 3.3 (KIP-794) and removed in Kafka 4.0, so naming it makes
     * producer creation fail after a client upgrade. The documented replacement is to leave
     * {@code partitioner.class} unset and set {@code partitioner.ignore.keys=true}, which
     * keeps the "sticky, key-agnostic" distribution on the built-in partitioner.
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Same key as ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG (kafka-clients 3.3+);
        // spelled as a string so this class still compiles against older client versions.
        producerConfig.put("partitioner.ignore.keys", true);

        return new DefaultKafkaProducerFactory<>(producerConfig);
    }

    /**
     * Consumer factory with String keys and JSON values. The value deserializer is wrapped
     * in {@code ErrorHandlingDeserializer}, which delegates to {@code JsonDeserializer}.
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> myConfig = new HashMap<>();
        myConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        myConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        myConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        myConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        // NOTE(review): "*" trusts every package for JSON type resolution. Consider narrowing
        // this to the package(s) that hold the message DTOs once those are confirmed.
        myConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        return new DefaultKafkaConsumerFactory<>(myConfig);
    }

    /** Listener container factory backed by {@link #consumerFactory()}. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

KafkaConfig 파일을 통해 주고 받은 데이터들을 직렬화, 역직렬화로 변환해준다.

 

NotificationProducer

/**
 * Publishes {@link NotificationMessage} payloads to Kafka through the injected template.
 */
@Service
@RequiredArgsConstructor
public class NotificationProducer {

    /** Name of the topic every notification is written to. */
    private static final String NOTIFICATION_TOPIC = "notifications";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends the given message to the {@code notifications} topic without a record key.
     * The result returned by the template is not inspected.
     *
     * @param message payload to publish
     */
    public void sendNotification(NotificationMessage message) {
        kafkaTemplate.send(NOTIFICATION_TOPIC, message);
    }
}

 

topic 명을 notifications 로 설정하고 message 를 전달해준다.

 

NotificationMessage

/**
 * Payload exchanged over Kafka for a single notification.
 * Getters and the no-args / all-args constructors are generated by Lombok.
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class NotificationMessage {
    /** Id of the user the notification is addressed to. */
    private Long userId;
    /** Kind of event that triggered the notification. */
    private EventType eventType;
    /** Human-readable notification text. */
    private String message;

    /**
     * Returns a readable field dump so that logging this object (for example with an
     * SLF4J {@code {}} placeholder) shows its contents instead of {@code Class@hash}.
     */
    @Override
    public String toString() {
        return "NotificationMessage(userId=" + userId
                + ", eventType=" + eventType
                + ", message=" + message + ")";
    }
}

 

message 는 위처럼 설정해주었다.

 

testService

/**
 * Connectivity check for the Kafka pipeline: publishes a fixed
 * {@code FRIEND_REQUESTED} notification with the text {@code "test"} for the given user.
 *
 * <p>The returned literal had been stored as mis-decoded bytes (mojibake); it is restored
 * to the intended Korean text, which means "Kafka test".
 *
 * @param userId id placed in the notification's {@code userId} field
 * @return a fixed confirmation string
 */
public String testKafka(Long userId) {
    NotificationMessage testMessage =
            new NotificationMessage(userId, EventType.FRIEND_REQUESTED, "test");
    notificationProducer.sendNotification(testMessage);
    return "카프카 테스트";
}

 

/v1/test 엔드포인트로 위와 같은 요청을 보낼 수 있게 해 연결 테스트를 해봤다.


Notification

/**
 * Consumes notification records from Kafka, stores each one, and forwards it to the
 * per-user SSE channel.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaListenerHandler {

    private final SseEmitterHandler sseEmitterHandler;
    private final NotificationRepository notificationRepository;

    /**
     * Handles one record from the {@code notifications} topic (consumer group
     * {@code notification-group}): builds a {@code Notification} from the message fields,
     * saves it, then broadcasts a text summary on the channel
     * {@code "notifications-" + userId}.
     *
     * <p>The two log literals had been stored as mis-decoded bytes (mojibake); they are
     * restored to the intended emoji and Korean text ("save to notification DB").
     *
     * @param message deserialized record value
     */
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consume(NotificationMessage message) {
        log.info("📥 Received Kafka message: {}", message);

        // Channel name and payload text handed to the SSE broadcaster.
        String topic = "notifications-" + message.getUserId();
        String data = "EventType: " + message.getEventType() + ", Message: " + message.getMessage();

        Notification notification =
                new Notification(message.getUserId(), message.getEventType(), message.getMessage());
        log.info("notification DB 저장");
        notificationRepository.save(notification);

        sseEmitterHandler.broadcast(topic, data);
    }
}

 

알림서버에서 마찬가지로 KafkaConfig 를 설정해주고, topic 명 notifications , 컨슈머 그룹 id 는 notification-group 으로 설정을 해줬다. 이렇게 되면 메인 서버에서 Kafka 의 notifications topic 에 데이터가 들어가면 자동으로 그 데이터를 불러온다.

위와 같이 요청을 보냈다. ( 8080 포트 )

요청 완료

ํ™•์ธ์„ ์œ„ํ•ด ์ด๋ฅผ notification Repository DB ์— ์ €์žฅ์‹œ์ผฐ๊ณ  ์œ„์™€ ๊ฐ™์ด ํ™•์ธ์ด ๊ฐ€๋Šฅํ–ˆ๋‹ค.