💡 목표
์นํ์ 서버에 Kafka 를 도입해 메인서버와 알림서버 사이에서 데이터들이 성공적으로 주고받을 수 있고자 한다.
📌 메인서버 Kafka 설정
KafkaConfig
/**
 * Spring configuration that declares the Kafka beans used by this application:
 * a {@link KafkaTemplate} with its {@link ProducerFactory}, and a
 * {@link ConsumerFactory} with the listener container factory built on it.
 *
 * <p>Keys are handled as strings; values are written with {@code JsonSerializer}
 * and read through {@code ErrorHandlingDeserializer} configured to delegate to
 * {@code JsonDeserializer}.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Bootstrap server list, injected from the {@code app.kafka-brokers} property. */
    @Value("${app.kafka-brokers}")
    private String kafkaBrokers;

    /**
     * Creates the template used to publish records.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        final ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Builds the producer factory from the broker list and serializer settings.
     *
     * @return a producer factory for string keys and JSON-serialized values
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        final Map<String, Object> settings = new HashMap<>();

        // Connection.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);

        // Serialization.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // NOTE(review): UniformStickyPartitioner is marked deprecated in newer
        // kafka-clients releases - confirm against the client version in use.
        settings.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.UniformStickyPartitioner");

        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Builds the consumer factory from the broker list and deserializer settings.
     *
     * @return a consumer factory for string keys and JSON-deserialized values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final Map<String, Object> settings = new HashMap<>();

        // Connection.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);

        // Deserialization: the error-handling wrapper is the configured value
        // deserializer and is pointed at JsonDeserializer as its delegate.
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        settings.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

        // NOTE(review): "*" trusts every package for JSON deserialization; consider
        // narrowing this to the packages that actually hold the message classes.
        settings.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * Creates the container factory used for listener containers.
     *
     * @return a listener container factory wired to {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}
KafkaConfig 파일을 통해 주고받을 데이터들을 직렬화, 역직렬화로 변환해준다.
NotificationProducer
/**
 * Service that publishes {@link NotificationMessage} payloads to Kafka through
 * the injected {@link KafkaTemplate}.
 */
@Service
@RequiredArgsConstructor
public class NotificationProducer {

    /** Topic every notification is published to. */
    private static final String NOTIFICATION_TOPIC = "notifications";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends the given message to the notifications topic without a record key.
     * The value returned by the template's {@code send} call is not inspected here.
     *
     * @param message the notification payload to publish
     */
    public void sendNotification(NotificationMessage message) {
        kafkaTemplate.send(NOTIFICATION_TOPIC, message);
    }
}
topic 명은 notifications 로 설정하고 message 를 전달해준다.
NotificationMessage
/**
 * Payload carried on the notifications topic: the target user, the kind of event,
 * and a free-form text message.
 *
 * <p>The Lombok annotations generate the getters, the no-argument constructor and
 * the all-arguments constructor.
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class NotificationMessage {

    /** Id of the user the notification is addressed to. */
    private Long userId;

    /** Kind of event that triggered the notification. */
    private EventType eventType;

    /** Text carried with the notification. */
    private String message;

    /**
     * Returns a field-by-field description so that logging this object with a
     * {@code {}} placeholder shows its contents instead of the default
     * class-name-and-hash form.
     *
     * @return a readable representation of this message
     */
    @Override
    public String toString() {
        return "NotificationMessage(userId=" + userId
                + ", eventType=" + eventType
                + ", message=" + message + ")";
    }
}
message 는 위처럼 설정해주었다.
testService
public String testKafka(Long userId) {
notificationProducer.sendNotification(
new NotificationMessage(userId,
EventType.FRIEND_REQUESTED,
"test"));
return "์นดํ์นด ํ
์คํธ";
}
/v1/test 엔드포인트로 위와 같은 요청을 보낼 수 있게 해 연결 테스트를 해봤다.
Notification
/**
 * Kafka listener for the {@code notifications} topic. Each received message is
 * stored through {@link NotificationRepository} and then handed to
 * {@link SseEmitterHandler#broadcast} under a per-user topic name.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaListenerHandler {

    private final SseEmitterHandler sseEmitterHandler;
    private final NotificationRepository notificationRepository;

    /**
     * Handles one record from the {@code notifications} topic as a member of the
     * {@code notification-group} consumer group.
     *
     * @param message the deserialized notification payload
     */
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consume(NotificationMessage message) {
        // Log text restored from mis-encoded characters.
        log.info("📥 Received Kafka message: {}", message);

        // Per-user broadcast topic and the text payload sent to it.
        String topic = "notifications-" + message.getUserId();
        String data = "EventType: " + message.getEventType() + ", Message: " + message.getMessage();

        Notification notification =
                new Notification(message.getUserId(), message.getEventType(), message.getMessage());
        log.info("notification DB 저장");
        notificationRepository.save(notification);

        // Broadcast only after the save call has returned.
        sseEmitterHandler.broadcast(topic, data);
    }
}
알림서버에서 마찬가지로 KafkaConfig 를 설정해주고, topic 명 notifications , 컨슈머 그룹 id 는 notification-group 으로 설정을 해줬다. 이렇게 되면 메인 서버에서 Kafka 의 notifications topic 에 데이터가 들어가면 자동으로 그 데이터를 불러온다.
위와 같이 요청을 보냈다. ( 8080 포트 )
요청 완료
확인을 위해 이를 notification Repository DB 에 저장시켰고 위와 같이 확인이 가능했다.