<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Эта зависимость вносит все, что нужно для Кафки в проект. Более того, её присутствие инициирует автоконфигурацию Spring Boot для Kafka, которая, помимо прочего, организует KafkaTemplate в контексте приложения Spring. Все, что вам нужно сделать, это заинжектить KafkaTemplate и приступить к отправке и получению сообщений.
Однако прежде чем приступить к отправке и получению сообщений, вы должны знать о некоторых свойствах, которые пригодятся при работе с Kafka. В частности, KafkaTemplate по умолчанию работает с брокером Kafka на локальном хосте, прослушивая порт 9092. Это нормально, если брокер Kafka запускается локально при разработке приложения, но когда придет время перейти к продакшену, вам потребуется настроить другой хост и порт.
Свойство spring.kafka.bootstrap-servers задает расположение одного или нескольких серверов Kafka, используемых для установления начального соединения с кластером Kafka. Например, если один из серверов Kafka в кластере работает kafka.tacocloud.com и прослушивая порт 9092, вы можете настроить его местоположение в YAML следующим образом:
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
Но обратите внимание spring.kafka.bootstrap-servers - является множественным числом и принимает список. Таким образом, вы можете предоставить ему несколько серверов Kafka в кластере:
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094
С настройкой Kafka в вашем проекте вы готовы отправлять и получать сообщения. Начнем с отправки объектов Order в Kafka с помощью KafkaTemplate.
8.3.2 Отправка сообщений с помощью KafkaTemplate
Во многих отношениях KafkaTemplate похож на своих аналогов в JMS и RabbitMQ. В то же время, это совсем другое. Это становится очевидным, когда мы рассмотрим методы отправки сообщений:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
Первое, что вы могли заметить, это то, что нет методов convertAndSend(). Это потому, что KafkaTemplate написана с помощью дженериков и может работать с типами доменов непосредственно при отправке сообщений. В некотором смысле, все методы send() выполняют функцию convertAndSend ().
Вы также могли заметить, что есть несколько параметров send() и sendDefaul (), которые сильно отличаются от тех, которые вы использовали с JMS и Rabbit. При отправке сообщений в Kafka вы можете указать следующие параметры, которые будут определять способ отправки сообщения:
-topic - тема для отправки сообщения (требуется для send())
-partition - раздел для записи topic (необязательно)
-key - ключ для отправки на запись (необязательно)
-timestamp - время (необязательно; по умолчанию используется System.currentTimeMillis())
-полезные данные (полезная нагрузка) (обязательно)
Тема и полезные данные являются двумя наиболее важными параметрами. Разделы и ключи мало влияют на то, как вы используете KafkaTemplate, кроме дополнительной информации, предоставляемой в качестве параметров для send() и sendDefault(). Для наших целей мы собираемся сосредоточиться на отправке полезной нагрузки сообщения в заданную тему и не беспокоиться о разделах и ключах.
Для метода send() вы также можете выбрать отправку ProducerRecord, который является немногим больше, чем тип, который содержит все предыдущие параметры в одном объекте. Вы также можете отправить объект Message, но для этого потребуется преобразовать ваши доменные объекты в Message. Как правило, проще использовать один из других методов, чем создавать и отправлять объект ProducerRecord или Message.
Используя KafkaTemplate и его метод send(), вы можете написать реализацию OrderMessagingService на основе Kafka. Следующий листинг показывает, как может выглядеть такая реализация.
Листинг 8.8. Отправка заказов с помощью KafkaTemplate