Компания Software Cats уже более пяти лет занимается аутстафом и аутсорсом по направлениям
Если у вас есть ИТ-проблема, оставьте ваши контакты, и мы поможем составить план ее решения.
Проблема производительности
Проблема блокирующих операций
interface DriverService {
Drivers findDrivers(Area area);
}
class HttpDriverService implements DriverService {
public findDrivers(Area area) {
......
return restTemplate.getForObject(area);
}
}
class OrderService {
private final DriverService drService;
void process() {
Area area = ...;
Drivers drivers = drService.findDrivers(area);
...
}
}
Проблема масштабирования
Основные принципы реактивных систем
Примеры реактивных систем:
Асинхронность (Asynchronous)
// Пример маршрута в Spring Cloud Gateway
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("create_order", r -> r.path("/orders")
.uri("lb://order-service"))
.build();
}
@PostMapping("/orders")
public Mono<Order> createOrder(@RequestBody OrderRequest request) {
return orderService.createOrder(request); // Неблокирующий вызов
}
public Mono<Order> saveOrder(Order order) {
return reactiveMongoTemplate.save(order); // Возвращает Mono<Order>
}
@KafkaListener(topics = "order-created")
public Mono<Void> handleOrderCreated(OrderEvent event) {
return restaurantService.validateOrder(event); // Цепочка реактивных вызовов
}
@Autowired
private ReactiveKafkaProducerTemplate<String, OrderEvent> kafkaTemplate;
public Mono<Void> publishOrderCreated(OrderEvent event) {
return kafkaTemplate.send("order-created", event.key(), event)
.then(); // Неблокирующая отправка
}
public Mono<Order> createOrder(OrderRequest request) {
return validateRequest(request)
.flatMap(req -> saveOrder(req)) // Асинхронное сохранение в MongoDB
.flatMap(order -> publishEvent(order)) // Асинхронная отправка в Kafka
.onErrorResume(e -> handleError(e)); // Неблокирующая обработка ошибок
}
@Bean
public HandlerMapping webSocketHandlerMapping(NotificationHandler handler) {
Map<String, WebSocketHandler> map = Map.of("/notifications", handler);
return new SimpleUrlHandlerMapping(map, -1);
}
public Flux<String> getNotifications(User user) {
return kafkaReceiver.receive()
.filter(record -> record.key().equals(user.getId()))
.map(record -> record.value().getMessage());
}
public Mono<Void> sendSms(String phone, String message) {
return Mono.fromRunnable(() -> smsClient.send(phone, message))
.subscribeOn(Schedulers.boundedElastic()) // Изоляция блокирующего кода
.then();
}
Неблокирующие операции (Non-blocking)
Reactive Streams Api
Backpressure
@Service
public class OrderService {
private final ReactiveMongoTemplate mongoTemplate;
private final KafkaSender<String, OrderEvent> kafkaSender;
public Mono<Order> createOrder(OrderRequest request) {
return mongoTemplate.save(new Order(request))
.doOnSuccess(order ->
kafkaSender.send(
KafkaMessageBuilder.withPayload(new OrderCreated(order.getId()))
.build()
@Bean
public Disposable processOrderCreated() {
return KafkaReceiver.create(receiverOptions)
.receive()
.onBackpressureBuffer(100) // Максимум 100 сообщений в буфере
.delayElements(Duration.ofMillis(10)) // Искусственное замедление
.flatMap(record -> {
return checkAvailability(record.key())
.then(confirmOrRejectOrder(record.value()))
.then(record.receiverOffset().commit());
}, 5) // Ограничение: 5 параллельных обработок
.subscribe();
}
@Bean
public Disposable scheduleDelivery() {
return KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record ->
findAvailableCourier()
.delayElement(Duration.ofSeconds(1)) // Задержка для эмуляции
.flatMap(courier ->
kafkaSender.send(new DeliveryScheduled(record.value().orderId())))
.then(record.receiverOffset().commit()), 3 // Не более 3 одновременных обработок
)
.onBackpressureDrop(record ->
log.error("Delivery scheduling overload! Dropping record: {}", record)
)
.subscribe();
}
Композиция (Composition)
public Mono<Order> createOrder(OrderRequest request) {
return reactiveMongoTemplate.save(request.toOrder()) // Сохранить в MongoDB (Mono<Order>)
.flatMap(savedOrder ->
kafkaTemplate.send("order-created", savedOrder.toEvent()) // Отправить событие в Kafka
.thenReturn(savedOrder) // Вернуть сохраненный заказ после отправки
);
}
// В Restaurant Service при проверке доступности блюд
public Mono<Boolean> checkDishesAvailability(Order order) {
Mono<Boolean> checkDish1 = checkDishInStock(order.getDish1Id(), order.getQuantity());
Mono<Boolean> checkDish2 = checkDishInStock(order.getDish2Id(), order.getQuantity());
return Mono.zip(checkDish1, checkDish2)
.map(results -> results.getT1() && results.getT2()); // true, если оба блюда доступны
}
public Mono<DeliveryScheduledEvent> scheduleDelivery(OrderConfirmedEvent event) {
return findAvailableCourier(event.getRestaurantLocation())
.flatMap(courier -> calculateDeliveryTime( courier, event.getRestaurantLocation(),
event.getUserAddress())
.flatMap(deliveryTime ->
// Назначить курьеру заказ (Mono<Void>)
sendAssignmentToCourier(courier, event)
// Затем создать событие
.then(Mono.just(new DeliveryScheduledEvent(deliveryTime)))
)
)
.switchIfEmpty(Mono.defer(() ->
// Если курьера нет, запустить ручное назначение
escalateToManualAssignment(event)
));
}
// В Notification Service при отправке уведомления
public Mono<Void> sendNotification(NotificationEvent event) {
return webSocketClient.send(event.getUserId(), event.getMessage())
// Если WebSocket недоступен → отправить email.
onErrorResume(ex -> sendFallbackEmail(event.getUserId(), event.getMessage())
)
// Игнорировать ошибку, если не удалось то, и другое
.onErrorResume(ex -> Mono.empty());
}
Ленивые вычисления (Lazy Evaluation)
public Mono<Order> createOrder(Order order) {
return reactiveMongoRepository.save(order) // Запись в MongoDB (не выполняется сразу)
.doOnSuccess(savedOrder ->
kafkaTemplate.send("OrderCreated", savedOrder) // Отправка в Kafka (также ленивая)
);
}
@KafkaListener(topics = "OrderCreated")
public void handleOrderCreated(OrderCreatedEvent event) {
Flux.just(event)
.flatMap(this::checkAvailability) // Проверка доступности блюд (ленивая)
.subscribe(result -> {
if (result.isAvailable())
kafkaTemplate.send("OrderConfirmed", result);
else
kafkaTemplate.send("OrderRejected", result);
});
}
@KafkaListener(topics = {"OrderConfirmed", "DeliveryScheduled"})
public void sendNotification(Event event) {
Mono.fromCallable(() -> prepareMessage(event)) // Подготовка сообщения (ленивая)
.delayUntil(message -> webSocketClient.send(message)) // Отправка через WebSocket
.subscribe();
}
public interface OrderRepository extends ReactiveMongoRepository<Order, String> {
@Query("{ 'status': 'PENDING' }")
Flux<Order> findPendingOrders(); // Запрос выполняется при подписке
}
Масштабируемость (Scalability)
Функциональный стиль (Functional Style)
import java.util.concurrent.Flow.*;
public class ObserverPatternExample {
public static void main(String[] args) {
// Производитель
Publisher<Integer> publisher = new Publisher<>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
for (int i = 0; i < n; i++) {
subscriber.onNext(i); // Отправляем данные
}
subscriber.onComplete(); // Завершаем поток
}
@Override
public void cancel() {
System.out.println("Subscription cancelled");
}
});
}
};
// Потребитель
Subscriber<Integer> subscriber = new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(5); // Запрашиваем 5 элементов
}
@Override
public void onNext(Integer item) {
System.out.println("Received: " + item);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Stream completed");
}
};
publisher.subscribe(subscriber); // связываем производителя и потребителя
}
}
Subscription subscription = new Subscription() {
@Override
public void request(long n) {
// Отправляем не более n элементов
for (int i = 0; i < n; i++) {
subscriber.onNext(i);
}
}
@Override
public void cancel() {
System.out.println("Subscription cancelled");
}
};
Processor<Integer, String> processor = new Processor<>() {
private Subscriber<? super String> subscriber;
@Override
public void subscribe(Subscriber<? super String> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onNext(Integer item) {
// Преобразуем Integer в String
subscriber.onNext("Processed: " + item);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
};
Processor<Integer, String> processor1 = ...; // Первый обработчик
Processor<String, Double> processor2 = ...; // Второй обработчик
publisher.subscribe(processor1);
processor1.subscribe(processor2);
processor2.subscribe(subscriber);
Publisher<Integer> publisher = new Publisher<>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i); // Асинхронная отправка данных
}
subscriber.onComplete();
}).start();
}
};
Популярные фреймворки реализующие Reactive Stream API