Top.Mail.Ru

Реактивный Web

16 сентября 2014 года был опубликован Reactive Manifesto, в котором были определены ключевые принципы построения реактивных систем. В данном манифесте авторы заявляют:
В последние годы требования к приложениям кардинально изменились. Всего несколько лет назад большим считалось приложение, состоящее из десятков серверов, тогда время ответа измерялось секундами, простои при техническом обслуживании — часами, а данные для обработки — гигабайтами. Сегодня приложения устанавливаются на всем, от мобильных устройств до облачных кластеров, насчитывающих тысячи многоядерных процессоров. Пользователи привыкли к миллисекундным задержкам и стопроцентной доступности, а данные тем временем измеряются петабайтами. Программные архитектуры вчерашнего дня не способны удовлетворить требованиям дня сегодняшнего.
— The Reactive Manifesto, русский перевод
Попробуем разобраться в причинах, которые легли в основу отказа от привычных инструментов проектирования и разработки приложений.

Компания Software Cats уже более пяти лет занимается аутстафом и аутсорсом по направлениям

Если у вас есть ИТ-проблема, оставьте ваши контакты, и мы поможем составить план ее решения.

Предпосылки появления реактивных систем

Проблема производительности

Web приложения разрабатываются с использованием контейнера сервлетов, который обеспечивает жизнненый цикл сервлетов, согласно запросов пользователей (см. рисунок).
Потоковая модель контейнера сервлетов на примере TomCat
На рисунке схематически изображена реализация модели thread per request на примере самого распространенного контейнера сервлетов TomCat. Из названия thread per request(поток на запрос) не сложно догадаться, что каждому новому запросу будет назначаться поток из пула потоков, если свободных потоков нет и размер пула меньше максимально допустимого, то TomCat создаст новый поток. В противном случае запрос отправляется в очередь. TomCat продолжит принимать новые запросы, пока есть место в очереди — maxConnection, после он перестанет принимать новые запросы и они будут складироваться в очередь на уровне ОС до достижения значения acceptCount. Затем все запросы будут отклоняться.

Для небольших приложений эта модель великолепно справлялась. Но давайте рассчитаем необходимое количество потоков для современного сервиса. Гугл мне рассказал, что отдельные компоненты Ozon обрабатывают 250 000 запросов в секунду. Уменьшим эту цифру до 100 000. Примем среднее время обработки запроса 250 мс. Путем нехитрых вычислений получаем thread_pool_size = 25 000. На одном ядре процессора может выполняться только один поток. Каждому потоку планировщик потоков ОС выделят небольшой промежуток времени (квант) по истечении которого прерывает его и передает управление следующему потоку (preemptive multitasking). Таким образом достигается "параллельное выполнение потоков". Стоит отметить два момента. Первый — это то, что сами потоки расходуют ресурсы системы, помимо процессорного времени. Второй — это то, что при смене одного потока другим планировщик потоков сохраняет состояние исполняемого в данный момент потока (контекст: регистры процессора, стек и т.д.) и загружает состояние следующего потока (context switching). Нужно понимать, что данная операция не происходит мгновенно, а учитывая количество обрабатываемых потоков, которое мы вычислили, огромное количество процессорного времени будет съедено этой операцией, носящей исключительно технический характер. До Java 19 это был единственный возможный способ работы с потоками, но об этом дальше.
Реализация многопоточности в Java до 19 версии
Расчет RPS

RPS = (1000 / (Tavg)) * thread_pool_size

RPS — количество обрабатываемых запросов в секунду
Tavg — среднее время выполнения запроса
thread_pool_size — размер пула потоков

Проблема блокирующих операций

Но это еще все. Давайте чуть углубимся в детали возможной реализации. Например, мы хотим реализовать процесс оформления заказа такси. Первым делом нам придется найти свободные машины рядом с пользователем, мы будем обращаться к стороннему ресурсу, который контролирует свободные машины. Затем произведем расчет стоимости, в зависимости от количества доступных машин, отправим запрос на подтверждение пользователю и т.д.
interface DriverService {
    Drivers findDrivers(Area area);
}

class HttpDriverService implements DriverService {
    public findDrivers(Area area) {
    ......
    return restTemplate.getForObject(area);
    }
}

class OrderService {
    private final DriverService drService;

    void process() {
    Area area = ...;
    Drivers drivers = drService.findDrivers(area);
    ...
   }
}
Реализация запроса к стороннему ресурсу
Нетрудно заметить, что drService блокирует поток, в котором выполняется OrderService. В ожидании завершения операции ввода-вывода ресурсы системы, выделенные на этот поток, просто простаивают. Что еще больше увеличит нагрузку на систему. Можно конечно отправить выполнение этой задачи в отдельный поток, используя callback-функции, Future или CompletableFuture, но это только породит еще потоки, которые необходимо обрабатывать.

Начиная с Java 19, можно использовать Project Loom (в Java 21 он уже входит в стандартную реализацию JDK), который реализует так называемые виртуальные потоки и проблему блокирующих запросов можно избежать. Виртуальные потоки добавляют еще один уровень абстракции над обычными Java потоками и их обработкой занимается уже не планировщик ОС, а JVM, что значительно уменьшает нагрузку и избавляет от необходимости каждый раз создавать новый дорогостоящий поток.
Реализация виртуальных потоков
Реализация логики происходит внутри виртуальных потоков, которые управляются JVM и используют ресурсы уже созданных потоков. Scheduler JVM работает на по принципу preemptive multitasking, т.е. выделения времени, а позволяет виртуальному потоку дойти до блокирующей операции, после чего передает выполнение другому виртуальному потоку. При этом необходимость переключать процессорный контекст отсутствует, т.к. сами потоки хранятся в хипе, как объекты, и просто имеют статус mounted или unmounted. Данный подход позволяет избавиться от блокирующих операций.

Проблема масштабирования

Можно предположить, что проблемы, которые описали выше, легко исправить разворачиванием дополнительных экземпляров приложения, т.к. простое увеличение производительности быстро достигнет потолка. Но данный подход имеет некоторые ограничения:

  • Отсутствие обратной связи: система не сообщает о перегрузке до начала сбоев.
  • Линейный рост ресурсов: 2 экземпляра = 2x потоков, 2x памяти.
  • Сложность балансировки: традиционные подходы (например, Round Robin) не учитывают реальную загрузку узлов.

Реактивные системы

Реактивная система — это архитектурный подход к проектированию и построению приложений, которые способны эффективно реагировать на изменения, обрабатывать большие объемы данных и обеспечивать высокую производительность, отказоустойчивость и масштабируемость. Реактивные системы основаны на принципах реактивного программирования и реактивных манифестов, которые определяют ключевые характеристики таких систем.

Основные принципы реактивных систем

Реактивные системы строятся на четырех ключевых принципах, описанных в Reactive Manifesto:

  • Responsive (Отзывчивость): Система должна быстро и предсказуемо реагировать на запросы. Это означает, что время отклика должно быть минимальным, даже при высокой нагрузке.
  • Resilient (Устойчивость к сбоям): Система должна оставаться работоспособной даже в случае сбоев. Это достигается за счет изоляции компонентов, репликации данных и автоматического восстановления.
  • Elastic (Упругость): Система должна масштабироваться в зависимости от нагрузки. Это означает, что она может динамически увеличивать или уменьшать ресурсы (например, добавлять или удалять серверы).
  • Message-Driven (Ориентированность на сообщения): Компоненты системы взаимодействуют друг с другом асинхронно через обмен сообщениями. Это позволяет достичь слабой связанности (loose coupling) и высокой производительности.
Ключевые принципы реактивных систем
Для построения реактивных систем используются следующие технологии и подходы:

  • Реактивное программирование: Библиотеки и фреймворки, такие как Project Reactor, RxJava, Akka Streams, Vert.x.
  • Микросервисы: Реактивные системы часто строятся на основе микросервисной архитектуры, где каждый сервис является независимым и взаимодействует с другими через сообщения.
  • Message Brokers: Брокеры сообщений, такие как Kafka, RabbitMQ, Apache Pulsar, используются для асинхронного обмена сообщениями между компонентами.
  • Распределенные базы данных: Базы данных, такие как Cassandra, MongoDB, CockroachDB, обеспечивают отказоустойчивость и масштабируемость.
  • Контейнеризация и оркестрация: Технологии, такие как Docker и Kubernetes, используются для управления и масштабирования компонентов системы.

Примеры реактивных систем:

  • Социальные сети: Системы, такие как Twitter или Facebook, должны обрабатывать миллионы запросов в секунду и быстро реагировать на действия пользователей.
  • Торговые платформы: Платформы, такие как Amazon или Alibaba, должны масштабироваться в зависимости от нагрузки (например, во время распродаж) и оставаться отзывчивыми.
  • Стриминговые сервисы: Сервисы, такие как Netflix или Spotify, должны обрабатывать большие объемы данных в реальном времени и обеспечивать бесперебойную работу.
  • Финансовые системы: Биржевые системы или банковские приложения должны быть устойчивы к сбоям и обрабатывать транзакции с минимальной задержкой.

Давайте спроектируем простую реактивную систему на микросервисах для примера — платформу доставки еды. Система будет соответствовать принципам реактивности (отзывчивость, устойчивость, эластичность, управление сообщениями) и использовать асинхронное взаимодействие.

Архитектура системы

Система будет состоять из 5 микросервисов:
  1. Order Service — управление заказами.
  2. Restaurant Service — проверка доступности блюд в ресторанах.
  3. Delivery Service — планирование доставки.
  4. Notification Service — уведомления пользователей.
  5. API Gateway — единая точка входа для клиентов.

Технологии

  • Язык Java + WebFlux (реактивный стек).
  • Брокер сообщений: Apache Kafka (асинхронная коммуникация).
  • Базы данных: MongoDB (для каждого сервиса своя БД).
  • Контейнеризация: Docker.
  • Оркестрация: Kubernetes (масштабирование).
Схема взаимодействия
Подробное описание микросервисов

1. API Gateway
Роль: Маршрутизация запросов, аутентификация, кэширование.
Реактивные технологии: Spring Cloud Gateway.
Эндпоинты:
POST /orders — создать заказ.
GET /orders/{id} — получить статус заказа.

2. Order Service
Роль: Создание заказа, управление его жизненным циклом.
База данных: MongoDB (реактивный драйвер).
Логика: Принимает запрос на создание заказа. Сохраняет заказ в статусе PENDING. Публикует событие OrderCreated в Kafka.

3. Restaurant Service
Роль: Проверка доступности блюд в ресторанах.
Логика: Подписывается на топик OrderCreated. Проверяет, есть ли блюда в нужном количестве. Публикует событие: OrderConfirmed (если всё доступно) и OrderRejected (если блюд нет).

4. Delivery Service
Роль: Назначение курьера и расчет времени доставки.
Логика: Подписывается на OrderConfirmed. Ищет свободного курьера. Публикует DeliveryScheduled с временем доставки.

5. Notification Service
Роль: Уведомления через WebSocket/Email/SMS.
Логика: Подписывается на:
  1. OrderConfirmed → "Ваш заказ готовится".
  2. DeliveryScheduled → "Курьер выехал".
  3. OrderRejected → "Заказ отклонен".

Преимущества архитектуры

  • Отзывчивость: Клиент получает мгновенный ответ (например, "заказ принят в обработку"), даже если проверка ресторана занимает время.
  • Устойчивость: Если Restaurant Service упадет, события будут накапливаться в Kafka и обработаны после восстановления.
  • Эластичность: Сервисы масштабируются горизонтально (например, Delivery Service при большом количестве заказов).
  • Гибкость: Можно добавлять новые сервисы (например, Payment Service) без изменения существующей логики.

Реактивное программирование

Для реализации микросервисов используется Spring WebFlux, использующий реактивную парадигму программирования. Что же это такое? Это парадигма программирования, ориентированная на работу с потоками данных и распространение изменений. Оно основано на нескольких ключевых принципах, которые делают его мощным инструментом для создания отзывчивых, масштабируемых и устойчивых систем, основанных на обмене сообщениями (да, да принципы заявленные в Reactive Manifesto применимы и для системы состоящей всего из одного приложения). Рассмотрим основные принципы реактивного программирования и как их реализует WebFlux:

Асинхронность (Asynchronous)

  • API Gateway (Spring Cloud Gateway) маршрутизирует запросы без блокировки потоков:
// Пример маршрута в Spring Cloud Gateway
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("create_order", r -> r.path("/orders")
            .uri("lb://order-service"))
        .build();
}
  • Все микросервисы обрабатывают HTTP-запросы через реактивные контроллеры (возвращают Mono/Flux):
@PostMapping("/orders")
public Mono<Order> createOrder(@RequestBody OrderRequest request) {
    return orderService.createOrder(request); // Неблокирующий вызов
}
  • Реактивные драйверы для MongoDB
Каждый сервис использует реактивный драйвер MongoDB (ReactiveMongoTemplate), который не блокирует потоки при операциях с БД:
public Mono<Order> saveOrder(Order order) {
    return reactiveMongoTemplate.save(order); // Возвращает Mono<Order>
}
Запросы к базе выполняются асинхронно: поток освобождается сразу после отправки запроса и используется для других задач.
Схема взаимодействия с БД
  • Асинхронная коммуникация через Kafka
Потребители Kafka обрабатывают сообщения реактивно:
@KafkaListener(topics = "order-created")
public Mono<Void> handleOrderCreated(OrderEvent event) {
    return restaurantService.validateOrder(event); // Цепочка реактивных вызовов
}
Для интеграции с Kafka используется реактивный клиент (например, ReactiveKafkaProducerTemplate):
@Autowired
private ReactiveKafkaProducerTemplate<String, OrderEvent> kafkaTemplate;

public Mono<Void> publishOrderCreated(OrderEvent event) {
    return kafkaTemplate.send("order-created", event.key(), event)
                                     .then(); // Неблокирующая отправка
}
  • Реактивные цепочки (Service Layer)
Логика сервисов строится на цепочках реактивных операций (Mono/Flux), чтобы избегать блокировки:
public Mono<Order> createOrder(OrderRequest request) {
    return validateRequest(request)
        .flatMap(req -> saveOrder(req)) // Асинхронное сохранение в MongoDB
        .flatMap(order -> publishEvent(order)) // Асинхронная отправка в Kafka
        .onErrorResume(e -> handleError(e)); // Неблокирующая обработка ошибок
}
  • Реактивные WebSocket (Notification Service)
Уведомления через WebSocket отправляются асинхронно с использованием WebSocketHandler из WebFlux:
@Bean
public HandlerMapping webSocketHandlerMapping(NotificationHandler handler) {
    Map<String, WebSocketHandler> map = Map.of("/notifications", handler);
    return new SimpleUrlHandlerMapping(map, -1);
}
Отправка сообщений через Flux:
public Flux<String> getNotifications(User user) {
    return kafkaReceiver.receive()
        .filter(record -> record.key().equals(user.getId()))
        .map(record -> record.value().getMessage());
}
  • Schedulers для управления потоками
Для CPU-intensive задач или блокирующих операций (например, отправка SMS) используются отдельные пулы потоков:
public Mono<Void> sendSms(String phone, String message) {
    return Mono.fromRunnable(() -> smsClient.send(phone, message))
        .subscribeOn(Schedulers.boundedElastic()) // Изоляция блокирующего кода
        .then();
}

Неблокирующие операции (Non-blocking)

По умолчанию WebFlux работает на сервере Netty, который организует Event Loop в виде группы потоков (обычно по числу ядер процессора). Каждый поток Event Loop обрабатывает множество соединений, переключаясь между ними при наступлении событий (например, завершение чтения/записи данных). Как это происходит:

1. Прием запроса: Event Loop регистрирует новое соединение и начинает асинхронно обрабатывать входящие данные.
2. Неблокирующая обработка:
  • Если запрос требует I/O-операции (например, обращение к БД), WebFlux отправляет асинхронный запрос и освобождает Event Loop для обработки других задач.
  • Для блокирующихся операций (если их нельзя избежать) используются отдельные пулы потоков (например, Schedulers.boundedElastic()), чтобы не блокировать Event LoopКогда I/O-операция завершается, генерируется событие, и Event Loop вызывает привязанный к нему обработчик (callback).
3. Формирование ответа: После получения данных из асинхронного источника (например, БД) Event Loop передает их через цепочку реактивных операторов (Project Reactor) и отправляет клиенту.
Event loop

Reactive Streams Api

WebFlux использует Project Reactor (реализацию Reactive Streams, подробное описание будет далее) для управления потоками данных. Это позволяет обрабатывать данные в виде потоков (Flux/Mono) и использовать backpressure (механизм предотвращения перегрузки, когда клиент не успевает обрабатывать данные).

Backpressure

Это механизм управления потоком данных, чтобы потребитель не был перегружен, если производитель генерирует данные быстрее. В реактивном программировании это важно, потому что все асинхронное и неблокирующее. Давайте рассмотрим на примерах:

• Order Service (WebFlux + Kafka)
@Service
public class OrderService {
    private final ReactiveMongoTemplate mongoTemplate;
    private final KafkaSender<String, OrderEvent> kafkaSender;

    public Mono<Order> createOrder(OrderRequest request) {
        return mongoTemplate.save(new Order(request))
            .doOnSuccess(order ->
                 kafkaSender.send(
                        KafkaMessageBuilder.withPayload(new OrderCreated(order.getId()))
                                .build()
• Restaurant Service (Kafka Consumer + Backpressure)
@Bean
public Disposable processOrderCreated() {
     return KafkaReceiver.create(receiverOptions)
            .receive()
            .onBackpressureBuffer(100) // Максимум 100 сообщений в буфере
            .delayElements(Duration.ofMillis(10)) // Искусственное замедление
            .flatMap(record -> {
                 return checkAvailability(record.key())
                        .then(confirmOrRejectOrder(record.value()))
                        .then(record.receiverOffset().commit());
            }, 5) // Ограничение: 5 параллельных обработок
            .subscribe();
}
  • Delivery Service (Ограничение параллелизма)
@Bean
public Disposable scheduleDelivery() {
    return KafkaReceiver.create(receiverOptions)
        .receive()
        .flatMap(record ->
             findAvailableCourier()
                 .delayElement(Duration.ofSeconds(1)) // Задержка для эмуляции
                 .flatMap(courier ->
                     kafkaSender.send(new DeliveryScheduled(record.value().orderId())))
                 .then(record.receiverOffset().commit()), 3 // Не более 3 одновременных обработок
                  )
         .onBackpressureDrop(record ->
             log.error("Delivery scheduling overload! Dropping record: {}", record)
         )
        .subscribe();
}
Как это работает:

1. Order Service: onBackpressureDrop сбрасывает заказы, если MongoDB/Kafka не успевают.
2. Restaurant Service:
  • delayElements искусственно замедляет обработку.
  • flatMap(…, 5) ограничивает параллелизм.
3. Delivery Service: flatMap(…, 3) + onBackpressureDrop контролируют нагрузку.
4. Notification Service:
  • retryBackoff для устойчивости.
  • flatMap(…, 10) ограничивает одновременные WebSocket-отправки.

Композиция (Composition)

В WebFlux (на базе Project Reactor) композиция вычислений реализуется через объединение асинхронных операций в реактивные цепочки с помощью операторов Mono и Flux. Это позволяет строить сложные сценарии обработки данных без блокировки потоков. Для этого используются следующие операторы:

  • flatMap — для последовательной асинхронной обработки.
  • zip — для параллельного выполнения и объединения результатов.
  • then — для выполнения действий после завершения цепочки.
  • onErrorResume — для обработки ошибок в композиции.

Примеры:

  • OrderService (здесь flatMap связывает сохранение заказа и отправку события в Kafka в одну цепочку):
public Mono<Order> createOrder(OrderRequest request) {
    return reactiveMongoTemplate.save(request.toOrder()) // Сохранить в MongoDB (Mono<Order>)
        .flatMap(savedOrder ->
            kafkaTemplate.send("order-created", savedOrder.toEvent()) // Отправить событие в Kafka
                    .thenReturn(savedOrder) // Вернуть сохраненный заказ после отправки
        );
    }
  • RestaurantService (если нужно выполнить независимые операции параллельно, используется Mono.zip):
// В Restaurant Service при проверке доступности блюд
public Mono<Boolean> checkDishesAvailability(Order order) {
   Mono<Boolean> checkDish1 = checkDishInStock(order.getDish1Id(), order.getQuantity());
    Mono<Boolean> checkDish2 = checkDishInStock(order.getDish2Id(), order.getQuantity());

    return Mono.zip(checkDish1, checkDish2)
            .map(results -> results.getT1() && results.getT2()); // true, если  оба  блюда доступны
}
  • В DeliveryService (композиция может включать условную логику):
    public Mono<DeliveryScheduledEvent> scheduleDelivery(OrderConfirmedEvent event) {  
        return findAvailableCourier(event.getRestaurantLocation())
                .flatMap(courier -> calculateDeliveryTime( courier, event.getRestaurantLocation(),             
                    event.getUserAddress())
                                .flatMap(deliveryTime ->
                                    // Назначить курьеру заказ  (Mono<Void>)
                                        sendAssignmentToCourier(courier, event) 
                                    // Затем  создать  событие
                                   .then(Mono.just(new DeliveryScheduledEvent(deliveryTime)))
                                  )
                    )
                    .switchIfEmpty(Mono.defer(() -> 
                            // Если  курьера  нет, запустить  ручное назначение
                            escalateToManualAssignment(event) 
                    ));
    }
  • В NotificationService (композиция включает обработку исключений без прерывания потока)
// В  Notification Service при  отправке  уведомления
    public Mono<Void> sendNotification(NotificationEvent event) {
        return webSocketClient.send(event.getUserId(), event.getMessage())
               // Если WebSocket недоступен → отправить email.
               onErrorResume(ex ->  sendFallbackEmail(event.getUserId(), event.getMessage())
                )
               // Игнорировать  ошибку, если  не  удалось то, и  другое
                .onErrorResume(ex -> Mono.empty()); 
}

Ленивые вычисления (Lazy Evaluation)

Ленивые вычисления WebFlux позволяют:

  • Откладывать выполнение операций до необходимости.
  • Минимизировать использование ресурсов (память, потоки).
  • Обрабатывать тысячи одновременных запросов без блокировки.
  • Строить эффективные цепочки асинхронных операций

Примеры:

  • Запись в БД и отправка события в Kafka происходят только при вызове subscribe() или в рамках реактивного пайплайна:
public Mono<Order> createOrder(Order order) {
    return reactiveMongoRepository.save(order) // Запись в MongoDB (не выполняется сразу)
        .doOnSuccess(savedOrder ->  
            kafkaTemplate.send("OrderCreated", savedOrder) // Отправка в Kafka (также ленивая)
        );
}
  • Обработка события начинается только после его поступления в топик:
@KafkaListener(topics = "OrderCreated")
public void handleOrderCreated(OrderCreatedEvent event) {
    Flux.just(event)
        .flatMap(this::checkAvailability) // Проверка доступности блюд (ленивая)
        .subscribe(result -> {
            if (result.isAvailable()) 
                kafkaTemplate.send("OrderConfirmed", result);
            else 
                kafkaTemplate.send("OrderRejected", result);
        });
}
  • Подготовка и отправка уведомления откладываются до момента поступления события:
@KafkaListener(topics = {"OrderConfirmed", "DeliveryScheduled"})
public void sendNotification(Event event) {
    Mono.fromCallable(() -> prepareMessage(event)) // Подготовка сообщения (ленивая)
        .delayUntil(message -> webSocketClient.send(message)) // Отправка через WebSocket
        .subscribe();
}
  • Запросы к базе данных инициируются только при вызове методов subscribe(), block() или в цепочке реактивных операций:
public interface OrderRepository extends ReactiveMongoRepository<Order, String> {
    @Query("{ 'status': 'PENDING' }")
    Flux<Order> findPendingOrders(); // Запрос выполняется при подписке
}
Стоит выделить из вышеописанного так называемые cold и hot Publishers:

  • Cold Publisher генерирует данные для каждого подписчика отдельно (данные "стартуют с начала" при каждой подписке). Примеры в системе:
  • запросы в БД
  • http запросы
  • Hot Publishers транслирует данные всем подписчикам одновременно (данные "живут" независимо от подписок). Примеры в системе:
  • События Kafka
  • WebSocket-уведомления

Масштабируемость (Scalability)

WebFlux же использует event-loop и асинхронные операции. Когда сервис делает запрос к MongoDB (через реактивный драйвер) или отправляет сообщение в Kafka, он не блокирует поток. Вместо этого поток освобождается и может обрабатывать другие запросы, пока ждет ответа от БД или брокера. Таким образом, один экземпляр сервиса может обслуживать гораздо больше запросов с тем же количеством ресурсов. Это позволяет лучше утилизировать ресурсы и уменьшает необходимость в большом количестве экземпляров, но при этом при увеличении нагрузки можно масштабироваться горизонтально, добавляя больше инстансов, каждый из которых эффективно использует ресурсы.

Функциональный стиль (Functional Style)

Как мы видим, в примерах активно используются функциональные подходы, такие как: функции высшего порядка (map, filter, reduce), лямбда-выражения, композиция операторов, что обеспечивает чистый и декларативный код.

Reactive Streams API

Как мы выяснили, WebFlux использует Project Reactor, как одну из реализаций Reactive Streams API. Reactive Streams API — это мощная спецификация, которая решает ключевые проблемы асинхронной обработки данных, такие как управление backpressure и эффективное использование ресурсов. Она широко используется в современных библиотеках и фреймворках, а также стала частью стандартной библиотеки Java, начиная с версии 9, и представлена в пакете java.util.concurrent.Flow

В java.util.concurrent.Flow определены четыре основных интерфейса:

1. Publisher <T>:
  • Производитель данных. Производит данные типа Т в поток
  • Метод: void subscribe(Subscriber<? super T> subscriber);
2. Subscriber <T>:
  • Потребитель данных. Получает данные типа Т и реагирует на события
  • Методы: void onSubscribe(Subscription subscription); void onNext(T item); void onError(Throwable throwable); void onComplete();
3. Subscription <T>:
  • Связь между Publisher и Subscriber. Управляет запросами данных и отменой подписки.
  • Методы: void request(long n); void cancel();
4. Processor <T, R>:
  • Комбинирует функции Publisher и Subscriber. Принимает элементы типа T, обрабатывает их и возвращает в поток элементы типа R.
  • Наследует: Processor<T, R> extends Subscriber<T>, Publisher<R>

Этот API реализует несколько ключевых паттернов, которые лежат в основе реактивного программирования и управления асинхронными потоками данных. Рассмотрим основные:

1. Producer-Consumer (Производитель-Потребитель)
  • Описание: паттерн «Производитель-Потребитель» является частным случаем паттерна Observer (Наблюдатель). Используется для реализации механизма подписки и уведомлений и разделения ответственности между компонентами.
  • Реализация:
Publisher – производитель данных
Subscriber – потребитель данных
Subscription – управляет потоком данных между производителем и потребителем
  • Пример:
import java.util.concurrent.Flow.*;

public class ObserverPatternExample {
    public static void main(String[] args) {
// Производитель
        Publisher<Integer> publisher = new Publisher<>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        for (int i = 0; i < n; i++) {
                            subscriber.onNext(i); // Отправляем данные
                        }
                        subscriber.onComplete(); // Завершаем поток
                    }

                    @Override
                    public void cancel() {
                        System.out.println("Subscription cancelled");
                    }
                });
            }
        };
// Потребитель
        Subscriber<Integer> subscriber = new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(5); // Запрашиваем 5 элементов
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Stream completed");
            }
        };

        publisher.subscribe(subscriber); // связываем производителя и потребителя
    }
}
2. Backpressure:
  • Описание: Backpressure — это механизм, который позволяет подписчику контролировать скорость, с которой издатель отправляет данные. Это предотвращает перегрузку подписчика, если он не успевает обрабатывать данные.
  • Реализация:
Интерфейс Subscription предоставляет методы request(long n) и cancel().
Подписчик использует request(n), чтобы указать, сколько элементов он готов принять.
Издатель должен уважать запросы подписчика и не отправлять больше данных, чем запрошено.
  • Пример:
Subscription subscription = new Subscription() {
    @Override
    public void request(long n) {
        // Отправляем не более n элементов
        for (int i = 0; i < n; i++) {
            subscriber.onNext(i);
        }
    }

    @Override
    public void cancel() {
        System.out.println("Subscription cancelled");
    }
};
3. Processor (Обработчик)
  • Описание: Паттерн "Обработчик" используется для создания компонентов, которые одновременно являются и производителями, и потребителями. Они принимают данные, обрабатывают их и передают дальше.
  • Реализация:
Интерфейс Processor<T, R> наследует Subscriber<T> и Publisher<R>.
Processor принимает данные типа T, обрабатывает их и emits данные типа R.
  • Пример:
  Processor<Integer, String> processor = new Processor<>() {
    private Subscriber<? super String> subscriber;

    @Override
    public void subscribe(Subscriber<? super String> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onNext(Integer item) {
        // Преобразуем Integer в String
        subscriber.onNext("Processed: " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }
};
4. Chain of Responsibility (Цепочка обязанностей)
  • Описание: Паттерн "Цепочка обязанностей" используется для создания цепочек обработчиков, где каждый обработчик выполняет свою часть работы и передает данные следующему.
  • Реализация:
Processor может быть использован для создания цепочек обработчиков.
Например, можно создать несколько Processor, которые последовательно обрабатывают данные.
  • Пример:
Processor<Integer, String> processor1 = ...; // Первый обработчик
Processor<String, Double> processor2 = ...; // Второй обработчик

publisher.subscribe(processor1);
processor1.subscribe(processor2);
processor2.subscribe(subscriber);
5. Asynchronous Processing (Асинхронная обработка)
  • Описание: Паттерн "Асинхронная обработка" позволяет выполнять операции без блокировки основного потока выполнения.
  • Реализация:
Все методы в Publisher, Subscriber и Subscription работают асинхронно.
Это позволяет эффективно использовать ресурсы системы и избегать блокировок.
  • Пример:
Publisher<Integer> publisher = new Publisher<>() {
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                subscriber.onNext(i); // Асинхронная отправка данных
            }
            subscriber.onComplete();
        }).start();
    }
};
Reactive Streams API реализуют многие популярные библиотеки и фреймворки, предоставляя готовые реализации механизмов асинхронной обработки потоковых данных, предусматривающих различные операторы для компоновки и преобразования, а также различные способы обеспечения отказоустойчивости.

Популярные фреймворки реализующие Reactive Stream API

Вот основные из них:
1. Project Reactor
  • Описание: Одна из самых популярных библиотек для работы с реактивными потоками. Она является основой для реактивного стека Spring WebFlux.
  • Основные классы: Mono (для одного элемента) и Flux (для потока элементов).
  • Особенности:
Интеграция с Spring Framework.
Поддержка backpressure (обратного давления).
Богатый API для работы с асинхронными и реактивными потоками.


2. RxJava
  • Описание: Реализация Reactive Extensions (Rx.NET) на виртуальной машине Java используя паттерн Observer (Наблюдатель)
  • Основные классы: Observable, Flowable, Single, Completable, Maybe.
  • Особенности:
Поддержка множества операторов для преобразования, фильтрации и комбинирования потоков.
Широкая экосистема и поддержка.

3. Akka Streams
  • Описание: Библиотека для работы с реактивными потоками, построенная на основе акторной модели (Akka).
  • Особенности:
Интеграция с Akka для построения распределенных систем.
Поддержка backpressure.
Высокая производительность и масштабируемость.

4. Vert.x
  • Описание: Реактивный фреймворк для построения асинхронных и неблокирующих приложений. Поддерживает реактивные потоки через Flowable и Observable.
  • Особенности:
Легковесный и высокопроизводительный.
Поддержка реактивных потоков через RxJava или собственные реализации.

5. SmallRye Mutiny
  • Описание: Реактивная библиотека, ориентированная на простоту и удобство использования. Часто используется в Quarkus.
  • Основные классы: Uni (для одного элемента) и Multi (для потока элементов).
  • Особенности:
Простой и интуитивно понятный API.
Интеграция с Quarkus.

6. Reactive Streams Commons
  • Описание: Общая библиотека, на которой построены многие другие реализации реактивных потоков (например, Project Reactor).
  • Особенности:
Низкоуровневая реализация.
Используется для создания собственных реактивных библиотек.

Выбор конкретной библиотеки зависит от требований и контекста использования. Например, Project Reactor разумно использовать вместе с Spring Framework, а SmallRye Mutiny вместе с Quarkus. Для обеспечения максимальной производительности лучше использовать Akka Streams.

Переход на реактивную парадигму требует пересмотра подходов к проектированию архитектуры и выбора инструментов, но обеспечивает значительные преимущества для бизнеса в условиях растущих требований к производительности и доступности сервисов.

Наша команда уже более пяти лет занимается реализацией проектов на Java и усилением команд по направлениям

За время существования компании, мы принимали участие в работе над более чем 100 проектами различного объема и длительности.

Если перед вами стоят вызовы, для достижения которых вам может понадобится наша экспертиза, просто напишите нам,

Мы договоримся с вами об онлайн-встрече, чтобы подробнее обсудить ваш проект и вашу проблему.
Александр Немков
Java Developer

Еще почитать по теме:

    Обсудить проект _
    Если у вас есть ИТ-проблема , оставьте свои контакты, и мы поможем правительству спланировать ее решение . Обещаем не рассылать спам.
    hello@softwarecats.dev
    Новосибирск, ул. Демакова
    23/5, оф.308
    Контакты _

    Выполненные проекты: