}
Обычно Flux публикует данные настолько быстро, насколько это возможно. Таким образом, вы используете операцию delayElements() в обоих созданных потоках Flux, чтобы немного их замедлить - отправляя запись каждые 500 мс. Кроме того, чтобы поток продуктов начинал передаваться, после Flux имен, вы применяете операцию delaySubscription() к потоку продуктов, чтобы он не отправлял никаких данных, пока не пройдет 250 мс после подписки.
После объединения двух объектов Flux создается новый объединенный Flux. Когда StepVerifier подписывается на объединенный поток, он, в свою очередь, подписывается на два исходных Flux потока, начиная поток данных.
Порядок предметов, отдаваемый из объединенного потока, совпадает со временем их передачи из источников. Поскольку оба объекта Flux настроены на отдачу с регулярной скоростью, значения будут чередоваться через объединенный поток, в результате чего будет получено имя, затем пища, затем имя и т. Д. Если время либо Flux должно быть изменены, возможно, вы увидите два персонажа или два продукта, опубликованные один за другим.
Поскольку mergeWith() не может гарантировать идеальное взаимодействие между его источниками, вы можете рассмотреть операцию zip() вместо этого. Когда два объекта Flux сжимаются вместе, это приводит к новому Flux, который создает кортеж элементов, где кортеж содержит один элемент из каждого исходного потока. На рис. 10.7 показано, как два объекта Flux можно сжать вместе.
Рис. 10.7. Сжатие двух потоков Flux приводит к созданию Flux, содержащего наборы по одному элементу от каждого потока.
Чтобы увидеть действие zip() в действии, рассмотрите следующий метод тестирования, который объединяет Flux персонажей и Flux продукты вместе:
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}
Обратите внимание, что в отличие от mergeWith(), операция zip() является статической операцией создания. Созданный Flux имеет идеальное выравнивание между персонажами и их любимыми блюдами. Каждый элемент, испускаемый из сжатого потока, представляет собой Tuple2 (контейнерный объект, который содержит два других объекта), содержащий элементы из каждого исходного потока в порядке их публикации.
Если вы предпочитаете не работать с Tuple2, а работать с каким-то другим типом, вы можете предоставить функцию zip(), которая создает любой объект, который вы хотите, учитывая два элемента (как показано на диаграмме marble на рисунке 10.8).
Рисунок 10.8 альтернативная форма операции zip приводит к Flux сообщений, созданных из одного элемента каждого входящего Flux.
Например, в следующем методе тестирования показано, как связать Flux имен с Flux продуктов питания, чтобы получить в результате Flux из String объектов:
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}
Функция, заданная для zip() (заданная здесь как лямбда), просто объединяет два элемента в предложение, которое отдается зипованным Flux.
ВЫБОР ПЕРВОГО РЕАКТИВНОГО ТИПА ДЛЯ ПУБЛИКАЦИИ
Предположим, у вас есть два Flux объекта, и вместо того, чтобы объединить их, вы просто хотите создать новый Flux, который будет генерировать значения из первого Flux, который создает значение. Как показано на рисунке 10.9, операция first() выбирает первый из двух объектов Flux и отображает значения, которые она публикует.
Рис. 10.9. first операция выбирает первый Flux, который отправляет сообщение, и после этого создает только сообщения из этого потока.
Следующий метод тестирования создает быстрый Flux и медленный Flux (где “медленный " означает, что он не будет публиковать элемент до 100 мс после подписки). Используя first(), он создает новый Flux, который будет публиковать значения только из первого исходного Flux для публикации значения:
@Test
public void firstFlux() {
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");