Выбрать главу

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

Эта зависимость вносит все, что нужно для Кафки в проект. Более того, её присутствие инициирует автоконфигурацию Spring Boot для Kafka, которая, помимо прочего, организует KafkaTemplate в контексте приложения Spring. Все, что вам нужно сделать, это заинжектить KafkaTemplate и приступить к отправке и получению сообщений.

Однако прежде чем приступить к отправке и получению сообщений, вы должны знать о некоторых свойствах, которые пригодятся при работе с Kafka. В частности, KafkaTemplate по умолчанию работает с брокером Kafka на локальном хосте, прослушивая порт 9092. Это нормально, если брокер Kafka запускается локально при разработке приложения, но когда придет время перейти к продакшену, вам потребуется настроить другой хост и порт.

Свойство spring.kafka.bootstrap-servers задает расположение одного или нескольких серверов Kafka, используемых для установления начального соединения с кластером Kafka. Например, если один из серверов Kafka в кластере работает kafka.tacocloud.com и прослушивая порт 9092, вы можете настроить его местоположение в YAML следующим образом:

spring:

    kafka:

        bootstrap-servers:

            - kafka.tacocloud.com:9092

Но обратите внимание spring.kafka.bootstrap-servers - является множественным числом и принимает список. Таким образом, вы можете предоставить ему несколько серверов Kafka в кластере:

spring:

    kafka:

        bootstrap-servers:

            - kafka.tacocloud.com:9092

            - kafka.tacocloud.com:9093

            - kafka.tacocloud.com:9094

С настройкой Kafka в вашем проекте вы готовы отправлять и получать сообщения. Начнем с отправки объектов Order в Kafka с помощью KafkaTemplate.

8.3.2 Отправка сообщений с помощью KafkaTemplate

Во многих отношениях KafkaTemplate похож на своих аналогов в JMS и RabbitMQ. В то же время, это совсем другое. Это становится очевидным, когда мы рассмотрим методы отправки сообщений:

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

Первое, что вы могли заметить, это то, что нет методов convertAndSend(). Это потому, что KafkaTemplate написана с помощью дженериков и может работать с типами доменов непосредственно при отправке сообщений. В некотором смысле, все методы send() выполняют функцию convertAndSend ().

Вы также могли заметить, что есть несколько параметров send() и sendDefaul (), которые сильно отличаются от тех, которые вы использовали с JMS и Rabbit. При отправке сообщений в Kafka вы можете указать следующие параметры, которые будут определять способ отправки сообщения:

-topic - тема для отправки сообщения (требуется для send())

-partition - раздел для записи topic (необязательно)

-key - ключ для отправки на запись (необязательно)

-timestamp - время (необязательно; по умолчанию используется System.currentTimeMillis())

-полезные данные (полезная нагрузка) (обязательно)

Тема и полезные данные являются двумя наиболее важными параметрами. Разделы и ключи мало влияют на то, как вы используете KafkaTemplate, кроме дополнительной информации, предоставляемой в качестве параметров для send() и sendDefault(). Для наших целей мы собираемся сосредоточиться на отправке полезной нагрузки сообщения в заданную тему и не беспокоиться о разделах и ключах.

Для метода send() вы также можете выбрать отправку ProducerRecord, который является немногим больше, чем тип, который содержит все предыдущие параметры в одном объекте. Вы также можете отправить объект Message, но для этого потребуется преобразовать ваши доменные объекты в Message. Как правило, проще использовать один из других методов, чем создавать и отправлять объект ProducerRecord или Message.

Используя KafkaTemplate и его метод send(), вы можете написать реализацию OrderMessagingService на основе Kafka. Следующий листинг показывает, как может выглядеть такая реализация.

Листинг 8.8. Отправка заказов с помощью KafkaTemplate