Выбрать главу

package tacos.messaging;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaOrderMessagingService implements OrderMessagingService {

    private KafkaTemplate<String, Order> kafkaTemplate;

    @Autowired

    public KafkaOrderMessagingService(

        KafkaTemplate<String, Order> kafkaTemplate) {

        this.kafkaTemplate = kafkaTemplate;

    }

    @Override

    public void sendOrder(Order order) {

        kafkaTemplate.send("tacocloud.orders.topic", order);

    }

}

В этой новой реализации OrderMessagingService метод sendOrder() использует метод send() внедренного KafkaTemplate для отправки Order в тему с именем tacocloud.orders.topic. За исключением слова «Кафка», разбросанного по коду, оно не сильно отличается от кода, который вы написали для JMS и Rabbit.

Если задать тему по умолчанию, можно немного упростить метод sendOrder(). Во-первых, установить тему по умолчанию в tacocloud.orders.topic, установив свойство spring.kafka.template.default-topic:

spring:

    kafka:

        template:

            default-topic: tacocloud.orders.topic

Затем в методе sendOrder() вы можете вызвать sendDefault() вместо send() и не указывать имя темы:

@Override

public void sendOrder(Order order) {

    kafkaTemplate.sendDefault(order);

}

Теперь, когда ваш код для отправки сообщений был написан, давайте обратим наше внимание на написание кода, который будет получать эти сообщения от Kafka.

8.3.3 Написание Kafka листинеров

Помимо уникальных сигнатур методов для send() и sendDefault(), KafkaTemplate отличается от JmsTemplate и RabbitTemplate тем, что не предлагает никаких методов для получения сообщений. Это означает, что единственный способ использовать сообщения из темы Kafka с помощью Spring - написать листинер сообщений.

Для Kafka листинеры сообщений определяются как методы, аннотированные @KafkaListener. Аннотация @KafkaListener примерно аналогична @JmsListener и @RabbitListener и используется практически одинаково. В следующем листинге показано, как может выглядеть получатель заказа на основе слушателя, если он написан для Kafka.

Листинг 8.9. Получение заказов с помощью @KafkaListener

package tacos.kitchen.messaging.kafka.listener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import tacos.Order;

import tacos.kitchen.KitchenUI;

@Component

public class OrderListener {

    private KitchenUI ui;

    @Autowired

    public OrderListener(KitchenUI ui) {

        this.ui = ui;

    }

    @KafkaListener(topics="tacocloud.orders.topic")

    public void handle(Order order) {

        ui.displayOrder(order);

    }

}

Метод handle() имеет аннотацию @KafkaListener, чтобы указать, что его следует вызывать при поступлении сообщения в теме с именем tacocloud.orders.topic. Как написано в листинге 8.9, для handle() предоставляется только Order (полезная нагрузка). Но если вам нужны дополнительные метаданные из сообщения, он также может принять объект ConsumerRecord или Message.

Например, следующая реализация handle() принимает ConsumerRecord, чтобы вы могли регистрировать раздел и метку времени сообщения:

@KafkaListener(topics="tacocloud.orders.topic")

public void handle(Order order, ConsumerRecord<Order> record) {

    log.info("Received from partition {} with timestamp {}",

        record.partition(), record.timestamp());

    ui.displayOrder(order);

}

Точно так же вы можете запросить Message вместо ConsumerRecord и достичь того же:

@KafkaListener(topics="tacocloud.orders.topic")

public void handle(Order order, Message<Order> message) {

    MessageHeaders headers = message.getHeaders();

    log.info("Received from partition {} with timestamp {}",

        headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)

        headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));

    ui.displayOrder(order);

}

Стоит отметить, что полезная нагрузка сообщения также доступна через ConsumerRecord.value() или Message.getPayload(). Это означает, что вы можете запросить Order через эти объекты вместо того, чтобы запрашивать его напрямую как параметр handle().

ИТОГ

-Асинхронный обмен сообщениями обеспечивает уровень перенаправления между взаимодействующими приложениями, что обеспечивает более слабую связь и большую масштабируемость.

-Spring поддерживает асинхронный обмен сообщениями с JMS, RabbitMQ или Apache Kafka.

-Приложения могут использовать основанные на шаблонах клиенты (JmsTemplate, RabbitTemplate или KafkaTemplate) для отправки сообщений через брокер сообщений.

-Приложения могут использовать сообщения в модели на основе запросов, используя те же клиенты на основе шаблонов.