Выбрать главу

Хотя subscribeOn() называется очень похоже на subscribe(), но они совершенно разные. В то время как subscribe() - это глагол, подписывающийся на реактивный поток и эффективно запускающий его, subscribeOn() - является более описательной, определяя, как подписка должна обрабатываться параллельно. Reactor не навязывает какую-либо конкретную модель параллелизма; с помощью subscribeOn() вы можете указать модель параллелизма, используя один из статических методов из планировщиков, который вы хотите использовать. В этом примере вы использовали parallel(), которая использует рабочие потоки из фиксированного пула (размер которого соответствует числу ядер ЦП). Но планировщики поддерживают несколько моделей параллелизма, например, описанных в таблице 10.1.

Таблица 10.1 Модели параллелизма для планировщиков (Schedulers)

Метод планировщика: Описание

.immediate() - Выполняет подписку в текущем потоке.

.single() - Выполняет подписку в одном многоразовом потоке. Повторно использует один и тот же поток для всех абонентов.

.newSingle() - Выполняет подписку в выделенном потоке для каждого вызова.

.elastic() - Выполняет подписку в в рабочем пуле из неограниченного эластичного пула. Новые рабочие потоки создаются по мере необходимости, а простаивающие рабочие потоки удаляются (по умолчанию через 60 секунд).

.parallel() - Выполняет подписку в рабочем пуле из из пула фиксированного размера, размер которого соответствует числу ядер ЦП.

Преимуществом использования flatMap() и subscribeOn() является то, что вы можете увеличить пропускную способность потока, разделив работу между несколькими параллельными потоками. Но поскольку работа выполняется параллельно, без гарантий того, что будет постоянный порядок выполнения потоков, невозможно определить порядок элементов, передаваемых в результирующий поток. Таким образом, StepVerifier может только проверить, что каждый исходящий элемент существует в ожидаемом списке объектов Player и что до завершения потока будет три таких элемента.

БУФЕРИЗАЦИЯ ДАННЫХ В РЕАКТИВНОМ ПОТОКЕ

В процессе обработки данных, проходящих через Flux, может оказаться полезным разбить поток данных на небольшие части. Операция buffer(), показанная на рисунке 10.18, может помочь в этом.

Рисунок 10.18 Операция buffer приводит к листу Flux заданного максимального размера, которые формируется на основе входящего Flux.

Учитывая, что Flux у нас String значений, каждое из которых содержит имя фрукта, вы можете создать новый Flux коллекции List, в которой каждый List содержит не более указанного числа элементов:

@Test

public void buffer() {

   Flux<String> fruitFlux = Flux.just(

      "apple", "orange", "banana", "kiwi", "strawberry");

   Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

   StepVerifier

      .create(bufferedFlux)

      .expectNext(Arrays.asList("apple", "orange", "banana"))

      .expectNext(Arrays.asList("kiwi", "strawberry"))

      .verifyComplete();

}

В этом случае String элементы Flux помещаются в новые Flux коллекции List, содержащие не более трех элементов в каждой. Следовательно, исходный Flux, который передает пять значений String, будет преобразован в Flux, который передает две коллекции List, одна из которых содержит три фрукта, а другая - два фрукта.

И что? Буферизация значений из реактивного потока в нереактивные коллекции List представляется контрпродуктивной. Но когда вы комбинируете buffer() с flatMap(), это позволяет параллельно обрабатывать каждую из коллекций List:

Flux.just(

   "apple", "orange", "banana", "kiwi", "strawberry")

   .buffer(3)

   .flatMap(x ->

      Flux.fromIterable(x)

      .map(y -> y.toUpperCase())

      .subscribeOn(Schedulers.parallel())

      .log()

   ).subscribe();

В этом новом примере вы по-прежнему буферизуете Flux из пяти String значений в Flux коллекций List. Но затем вы применяете flatMap() к этому Flux коллекций List. Это берет каждый List буфер и создает новый Flux из его элементов, а затем применяет к нему операцию map(). Следовательно, каждый буферный List дополнительно обрабатывается параллельно в отдельных потоках.

Чтобы доказать, что это работает, я также включил операцию log() для применения к каждому под-Flux-у. Операция log() просто регистрирует все события Reactive Streams, чтобы вы могли видеть, что на самом деле происходит. В результате в log записываются следующие записи (для краткости удален компонент времени):

[main] INFO reactor.Flux.SubscribeOn.1 -