Выбрать главу

9.2.1 Каналы сообщений

Каналы сообщений - это средства, с помощью которых сообщения проходят через интеграционный конвейер (рисунок 9.2). Это трубы, которые соединяют между собой все остальные части Spring Integration.

Рисунок 9.2 Каналы сообщений - это каналы, по которым потоки данных проходят между другими компонентами в потоке интеграции.

Spring Integration предоставляет несколько реализаций канала, включая следующие:

-PublishSubscribeChannel—Сообщения, опубликованные в PublishSubscribeChannel, передаются одному или нескольким потребителям. Если есть несколько потребителей, все они получают сообщение.

-QueueChannel—Сообщения, опубликованные в QueueChannel, сохраняются в очереди до тех пор, пока пользователь не извлечет их в порядке поступления (FIFO). Если есть несколько потребителей, только один из них получает сообщение.

-PriorityChannel—Как и QueueChannel, но вместо поведения FIFO сообщения извлекаются потребителями на основе заголовка приоритета сообщения.

-RendezvousChannel—Как и QueueChannel, за исключением того, что отправитель блокирует канал до тех пор, пока потребитель не получит сообщение, эффективно синхронизируя отправителя с потребителем.

-DirectChannel - Подобно PublishSubscribeChannel, но отправляет сообщение одному потребителю, вызывая потребителя в том же потоке, что и отправитель. Это позволяет транзакциям распространяться по каналу.

-ExecutorChannel - аналогичен DirectChannel, но отправка сообщения происходит через TaskExecutor, происходящий в отдельном потоке от отправителя. Этот тип канала не поддерживает транзакции, которые охватывают канал.

-FluxMessageChannel - канал сообщений Reactive Streams Publisher, основанный на Flux в Project Reactor. (Мы поговорим подробнее о Reactive Streams, Reactor и Flux в главе 10.)

Как в конфигурации Java, так и в стилях Java DSL входные каналы создаются автоматически с DirectChannel по умолчанию. Но если вы хотите использовать другую реализацию канала, вам необходимо явно объявить канал как bean-компонент и сослаться на него в потоке интеграции. Например, чтобы объявить PublishSubscribeChannel, вы должны объявить следующий метод @Bean:

@Bean

public MessageChannel orderChannel() {

    return new PublishSubscribeChannel();

}

Тогда вы будете ссылаться на этот канал по имени в определении потока интеграции. Например, если канал использовался bean-компонентом-активатором службы, вы бы ссылались на него в атрибуте inputChannel @ServiceActivator:

@ServiceActivator(inputChannel="orderChannel")

Или, если вы используете стиль конфигурации JAVA DSL, вы должны ссылаться на него с помощью вызова channel():

@Beanpublic

IntegrationFlow orderFlow() {

    return IntegrationFlows

        …

        .channel("orderChannel")

        …

        .get();

}

Важно отметить, что если вы используете QueueChannel, потребители должны быть настроены с помощью средства опроса. Например, предположим, что вы объявили QueueChannel bean следующим образом:

@Bean

public MessageChannel orderChannel() {

    return new QueueChannel();

}

Вам нужно будет убедиться, что потребитель настроен на опрос канала для сообщений. В случае службы активатора аннотация @ServiceActivator может выглядеть следующим образом:

@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))

В этом примере служба активатора опрашивает канал с именем orderChannelevery каждую 1 секунду (или 1000 мс).

9.2.2 Фильтры

Фильтры могут быть размещены в середине конвейера интеграции, чтобы разрешить или запретить переход сообщений к следующему шагу в потоке (рис.9.3).

Рисунок 9.3 Фильтры, основанные на некоторых критериях, разрешают или запрещают передачу сообщений в конвейере.

Например, предположим, что сообщения, содержащие целочисленные значения, публикуются через канал с именем numberChannel, но вы хотите, чтобы на канал с именем evenNumberChannel передавались только четные числа. В этом случае вы можете объявить фильтр с аннотацией @Filter следующим образом:

@Filter(inputChannel="numberChannel", outputChannel="evenNumberChannel")

public boolean evenNumberFilter(Integer number) {

    return number % 2 == 0;

}

Кроме того, если вы используете стиль конфигурации JAVA DSL для определения потока интеграции, вы можете сделать вызов filter() следующим образом:

@Bean

public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {

    return IntegrationFlows

    …

    .<Integer>filter((p) -> p % 2 == 0)

    …

    .get();

}

В этом случае для реализации фильтра используется лямбда. Но, по правде говоря, метод filter() принимает GenericSelector в качестве аргумента. Это означает, что вы можете реализовать интерфейс GenericSelector вместо этого, если ваша фильтрация слишком сложна для простой лямбды.