package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}
В этой новой реализации OrderMessagingService метод sendOrder() использует метод send() внедренного KafkaTemplate для отправки Order в тему с именем tacocloud.orders.topic. За исключением слова «Кафка», разбросанного по коду, оно не сильно отличается от кода, который вы написали для JMS и Rabbit.
Если задать тему по умолчанию, можно немного упростить метод sendOrder(). Во-первых, установить тему по умолчанию в tacocloud.orders.topic, установив свойство spring.kafka.template.default-topic:
spring:
kafka:
template:
default-topic: tacocloud.orders.topic
Затем в методе sendOrder() вы можете вызвать sendDefault() вместо send() и не указывать имя темы:
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}
Теперь, когда ваш код для отправки сообщений был написан, давайте обратим наше внимание на написание кода, который будет получать эти сообщения от Kafka.
8.3.3 Написание Kafka листинеров
Помимо уникальных сигнатур методов для send() и sendDefault(), KafkaTemplate отличается от JmsTemplate и RabbitTemplate тем, что не предлагает никаких методов для получения сообщений. Это означает, что единственный способ использовать сообщения из темы Kafka с помощью Spring - написать листинер сообщений.
Для Kafka листинеры сообщений определяются как методы, аннотированные @KafkaListener. Аннотация @KafkaListener примерно аналогична @JmsListener и @RabbitListener и используется практически одинаково. В следующем листинге показано, как может выглядеть получатель заказа на основе слушателя, если он написан для Kafka.
Листинг 8.9. Получение заказов с помощью @KafkaListener
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}
Метод handle() имеет аннотацию @KafkaListener, чтобы указать, что его следует вызывать при поступлении сообщения в теме с именем tacocloud.orders.topic. Как написано в листинге 8.9, для handle() предоставляется только Order (полезная нагрузка). Но если вам нужны дополнительные метаданные из сообщения, он также может принять объект ConsumerRecord или Message.
Например, следующая реализация handle() принимает ConsumerRecord, чтобы вы могли регистрировать раздел и метку времени сообщения:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
Точно так же вы можете запросить Message вместо ConsumerRecord и достичь того же:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}
Стоит отметить, что полезная нагрузка сообщения также доступна через ConsumerRecord.value() или Message.getPayload(). Это означает, что вы можете запросить Order через эти объекты вместо того, чтобы запрашивать его напрямую как параметр handle().
ИТОГ
-Асинхронный обмен сообщениями обеспечивает уровень перенаправления между взаимодействующими приложениями, что обеспечивает более слабую связь и большую масштабируемость.
-Spring поддерживает асинхронный обмен сообщениями с JMS, RabbitMQ или Apache Kafka.
-Приложения могут использовать основанные на шаблонах клиенты (JmsTemplate, RabbitTemplate или KafkaTemplate) для отправки сообщений через брокер сообщений.
-Приложения могут использовать сообщения в модели на основе запросов, используя те же клиенты на основе шаблонов.