9.2.1 Каналы сообщений
Каналы сообщений - это средства, с помощью которых сообщения проходят через интеграционный конвейер (рисунок 9.2). Это трубы, которые соединяют между собой все остальные части Spring Integration.
Рисунок 9.2 Каналы сообщений - это каналы, по которым потоки данных проходят между другими компонентами в потоке интеграции.
Spring Integration предоставляет несколько реализаций канала, включая следующие:
-PublishSubscribeChannel—Сообщения, опубликованные в PublishSubscribeChannel, передаются одному или нескольким потребителям. Если есть несколько потребителей, все они получают сообщение.
-QueueChannel—Сообщения, опубликованные в QueueChannel, сохраняются в очереди до тех пор, пока пользователь не извлечет их в порядке поступления (FIFO). Если есть несколько потребителей, только один из них получает сообщение.
-PriorityChannel—Как и QueueChannel, но вместо поведения FIFO сообщения извлекаются потребителями на основе заголовка приоритета сообщения.
-RendezvousChannel—Как и QueueChannel, за исключением того, что отправитель блокирует канал до тех пор, пока потребитель не получит сообщение, эффективно синхронизируя отправителя с потребителем.
-DirectChannel - Подобно PublishSubscribeChannel, но отправляет сообщение одному потребителю, вызывая потребителя в том же потоке, что и отправитель. Это позволяет транзакциям распространяться по каналу.
-ExecutorChannel - аналогичен DirectChannel, но отправка сообщения происходит через TaskExecutor, происходящий в отдельном потоке от отправителя. Этот тип канала не поддерживает транзакции, которые охватывают канал.
-FluxMessageChannel - канал сообщений Reactive Streams Publisher, основанный на Flux в Project Reactor. (Мы поговорим подробнее о Reactive Streams, Reactor и Flux в главе 10.)
Как в конфигурации Java, так и в стилях Java DSL входные каналы создаются автоматически с DirectChannel по умолчанию. Но если вы хотите использовать другую реализацию канала, вам необходимо явно объявить канал как bean-компонент и сослаться на него в потоке интеграции. Например, чтобы объявить PublishSubscribeChannel, вы должны объявить следующий метод @Bean:
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}
Тогда вы будете ссылаться на этот канал по имени в определении потока интеграции. Например, если канал использовался bean-компонентом-активатором службы, вы бы ссылались на него в атрибуте inputChannel @ServiceActivator:
@ServiceActivator(inputChannel="orderChannel")
Или, если вы используете стиль конфигурации JAVA DSL, вы должны ссылаться на него с помощью вызова channel():
@Beanpublic
IntegrationFlow orderFlow() {
return IntegrationFlows
…
.channel("orderChannel")
…
.get();
}
Важно отметить, что если вы используете QueueChannel, потребители должны быть настроены с помощью средства опроса. Например, предположим, что вы объявили QueueChannel bean следующим образом:
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}
Вам нужно будет убедиться, что потребитель настроен на опрос канала для сообщений. В случае службы активатора аннотация @ServiceActivator может выглядеть следующим образом:
@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))
В этом примере служба активатора опрашивает канал с именем orderChannelevery каждую 1 секунду (или 1000 мс).
9.2.2 Фильтры
Фильтры могут быть размещены в середине конвейера интеграции, чтобы разрешить или запретить переход сообщений к следующему шагу в потоке (рис.9.3).
Рисунок 9.3 Фильтры, основанные на некоторых критериях, разрешают или запрещают передачу сообщений в конвейере.
Например, предположим, что сообщения, содержащие целочисленные значения, публикуются через канал с именем numberChannel, но вы хотите, чтобы на канал с именем evenNumberChannel передавались только четные числа. В этом случае вы можете объявить фильтр с аннотацией @Filter следующим образом:
@Filter(inputChannel="numberChannel", outputChannel="evenNumberChannel")
public boolean evenNumberFilter(Integer number) {
return number % 2 == 0;
}
Кроме того, если вы используете стиль конфигурации JAVA DSL для определения потока интеграции, вы можете сделать вызов filter() следующим образом:
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
…
.<Integer>filter((p) -> p % 2 == 0)
…
.get();
}
В этом случае для реализации фильтра используется лямбда. Но, по правде говоря, метод filter() принимает GenericSelector в качестве аргумента. Это означает, что вы можете реализовать интерфейс GenericSelector вместо этого, если ваша фильтрация слишком сложна для простой лямбды.