diff --git a/README.md b/README.md index 13dca74..472e92e 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ + [Потоки ввода-вывода в Java](#Потоки-вводавывода-в-java) ![icon][done] + [Сериализация](#Сериализация) ![icon][done] + [Многопоточность](#Многопоточность) ![icon][done] ++ [Реактивное программирование](#реактивное-программирование) ![icon][done] + [Servlets, JSP, JSTL](#servlets-jsp-jstl) ![icon][done] + [Базы данных](#Базы-данных) ![icon][done] + [SQL](#sql) ![icon][done] @@ -23,6 +24,7 @@ + [Основы HTML](#Основы-html) ![icon][done] + [Основы CSS](#Основы-css) ![icon][done] + [Основы Web](#Основы-web) ![icon][done] ++ [Apache Kafka](#apache-kafka) ![icon][done] + [Дополнительные материалы](#Дополнительные-материалы) ![icon][done] [done]:done.png @@ -445,6 +447,20 @@ [к оглавлению](#Вопросы-для-собеседования-на-java-developer) +## Реактивное программирование + +* [Что такое реактивное программирование и чем оно отличается от процедурного программирования?](reactive.md#что-такое-реактивное-программирование-и-чем-оно-отличается-от-процедурного-программирования) +* [Объясните концепцию потоков данных в реактивном программировании](reactive.md#объясните-концепцию-потоков-данных-в-реактивном-программировании) +* [Что такое паттерн Observer и как он лежит в основе реактивного программирования?](reactive.md#что-такое-паттерн-observer-и-как-он-лежит-в-основе-реактивного-программирования) +* [Опишите роль Observable и Observer в реактивном программировании](reactive.md#опишите-роль-observable-и-observer-в-реактивном-программировании) +* [Что такое backpressure в контексте реактивного программирования?](reactive.md#что-такое-backpressure-в-контексте-реактивного-программирования) +* [Объясните разницу между Hot и Cold Observable](reactive.md#объясните-разницу-между-hot-и-cold-observable) +* [Какова роль Подписки в реактивном программировании?](reactive.md#какова-роль-подписки-в-реактивном-программировании) +* [Как отписаться от потока для предотвращения утечки памяти?](reactive.md#как-отписаться-от-потока-для-предотвращения-утечки-памяти) +* [Какие есть операторы в Project Reactor и для чего они используются?](reactive.md#какие-есть-операторы-в-project-reactor-и-для-чего-они-используются) + +[к оглавлению](#Вопросы-для-собеседования-на-java-developer) + ## Servlets, JSP, JSTL + [Что такое _«сервлет»_?](servlets.md#Что-такое-сервлет) + [В чем заключаются преимущества технологии сервлетов над CGI (Common Gateway Interface)?](servlets.md#В-чем-заключаются-преимущества-технологии-сервлетов-над-cgi-common-gateway-interface) @@ -745,6 +761,69 @@ [к оглавлению](#Вопросы-для-собеседования-на-java-developer) +## Apache Kafka + +* [Что такое Apache Kafka?](kafka.md#что-такое-apache-kafka) +* [Основные компоненты Kafka](kafka.md#основные-компоненты-kafka) + +**Архитектура компонентов** + +* Topic + * [Архитектура топика](kafka.md#архитектура-топика) + * [Настройки топика Kafka](kafka.md#настройки-топика-kafka) +* Broker + * [Архитектура брокера](kafka.md#архитектура-брокера) + * [Настройки брокера Kafka](kafka.md#настройки-брокера-kafka) +* Producer + * [Архитектура продюсера](kafka.md#архитектура-продюсера) + * [Настройки продюсера](kafka.md#настройки-продюсера) + * [Пример конфигурации Kafka Producer](kafka.md#пример-конфигурации-kafka-producer) +* Consumer + * [Архитектура консюмера](kafka.md#архитектура-консюмера) + * [Настройки консюмера](kafka.md#настройки-консюмера) + * [Пример конфигурации Kafka Consumer](kafka.md#пример-конфигурации-kafka-consumer) + +**Kafka API** + +* [Основные API Kafka](kafka.md#основные-api-kafka) +* [Какова роль Producer API?](kafka.md#какова-роль-producer-api) +* [Какова роль Consumer API?](kafka.md#какова-роль-consumer-api) +* [Какова роль Connector API?](kafka.md#какова-роль-connector-api) +* [Какова роль Streams API?](kafka.md#какова-роль-streams-api) +* [Какова роль Transactions API?](kafka.md#какова-роль-transactions-api) +* [Какова роль Quota API?](kafka.md#какова-роль-quota-api) +* [Какова роль AdminClient API?](kafka.md#какова-роль-AdminClient-api) + +**Kafka Consumer** + +* [Для чего нужен координатор группы?](kafka.md#для-чего-нужен-координатор-группы) +* [Для чего нужен Consumer heartbeat thread?](kafka.md#для-чего-нужен-consumer-heartbeat-thread) +* [Как Kafka обрабатывает сообщения?](kafka.md#как-kafka-обрабатывает-сообщения) +* [Как Kafka обрабатывает задержку консюмера?](kafka.md#как-kafka-обрабатывает-задержку-консюмера) +* [Для чего нужны методы subscribe() и poll()?](kafka.md#для-чего-нужны-методы-subscribe-и-poll) +* [Для чего нужен метод position()?](kafka.md#для-чего-нужен-метод-position) +* [Для чего нужны методы commitSync() и commitAsync()?](kafka.md#для-чего-нужны-методы-commitsync-и-commitasync) + +**Другие вопросы** + +* [Для чего нужен идемпотентный продюсер?](kafka.md#для-чего-нужен-идемпотентный-продюсер) +* [Для чего нужен интерфейс Partitioner?](kafka.md#для-чего-нужен-интерфейс-partitioner) +* [Для чего нужен Broker log cleaner thread?](kafka.md#для-чего-нужен-broker-log-cleaner-thread) +* [Для чего нужен Kafka Mirror Maker?](kafka.md#для-чего-нужен-kafka-mirror-maker) +* [Для чего нужна Schema Registry?](kafka.md#для-чего-нужна-schema-registry) +* [Для чего нужен Streams DSL?](kafka.md#для-чего-нужен-streams-dsl) +* [Как Kafka обеспечивает версионирование сообщений?](kafka.md#как-kafka-обеспечивает-версионирование-сообщений) +* [Как потребители получают сообщения от брокера?](kafka.md#как-потребители-получают-сообщения-от-брокера) + +**Сравнение с другими компонентами и системами** + +* [В чем разница между Kafka Consumer и Kafka Stream?](kafka.md#в-чем-разница-между-kafka-consumer-и-kafka-stream) +* [В чем разница между Kafka Streams и Apache Flink?](kafka.md#в-чем-разница-между-kafka-streams-и-apache-flink) +* [В чем разница между Kafka и Flume?](kafka.md#в-чем-разница-между-kafka-и-flume) +* [В чем разница между Kafka и RabbitMQ?](kafka.md#в-чем-разница-между-kafka-и-rabbitmq) + +[к оглавлению](#Вопросы-для-собеседования-на-java-developer) + ## Дополнительные материалы + [4 толковых канала на Youtube про технические собеседования](https://habr.com/ru/post/454264/) + [A list of fancy questions I've been asked during the interviews I had](https://github.com/d1mnewz/interviews) @@ -755,6 +834,7 @@ + [Top Spring Framework Interview Questions](https://www.baeldung.com/spring-interview-questions) + [Spring Interview Questions](https://www.interviewbit.com/spring-interview-questions/) + [Hibernate Interview Questions](https://www.adaface.com/blog/hibernate-interview-questions/) ++ [Java Interview Questions](https://labex.io/interview-questions/java) [к оглавлению](#Вопросы-для-собеседования-на-java-developer) diff --git a/core.md b/core.md index f20e28b..f7b6100 100644 --- a/core.md +++ b/core.md @@ -1273,8 +1273,7 @@ public native int hashCode(); ## Если у класса `Point{int x, y;}` реализовать метод `equals(Object that) {(return this.x == that.x && this.y == that.y)}`, но сделать хэш код в виде `int hashCode() {return x;}`, то будут ли корректно такие точки помещаться и извлекаться из `HashSet`? `HashSet` использует `HashMap` для хранения элементов. При добавлении элемента в `HashMap` вычисляется хэш код, по которому определяется позиция в массиве, куда будет вставлен новый элемент. У всех экземпляров класса `Point` хэш код будет одинаковым для всех объектов с одинаковым `x`, что приведёт к вырождению хэш таблицы в список. -При возникновении коллизии в `HashMap` осуществляется проверка на наличие элемента в списке: `e.hash == hash && ((k = e.key) == key || key.equals(k))`. Если элемент найден, то его значение перезаписывается. В нашем случае для разных объектов метод `equals()` будет возвращать `false`. Соответственно новый элемент будет успешно добавлен в `HashSet`. Извлечение элемента также будет осуществляться успешно. Но производительность такого кода будет невысокой и преимущества хэш таблиц использоваться не будут. - +При возникновении коллизии в `HashMap` осуществляется проверка на наличие элемента в списке: `e.hash == hash && ((k = e.key) == key || key.equals(k))`. Если элемент найден, то его значение перезаписывается. В нашем случае для разных объектов метод `equals()` будет возвращать `false`. Соответственно новый элемент будет успешно добавлен в `HashSet`. Извлечение элемента также будет осуществляться успешно. Но производительность такого кода будет невысокой, из-за неэффективности хэш-функции, которая cпособна породить большое количество колизий. [к оглавлению](#java-core) ## Могут ли у разных объектов `(ref0 != ref1)` быть `ref0.equals(ref1) == true`? diff --git a/db.md b/db.md index 3f7aa80..32aae03 100644 --- a/db.md +++ b/db.md @@ -179,7 +179,7 @@ __По количественному составу__ __По характеристике содержимого__ + _уникальный индекс_ состоит из множества уникальных значений поля; -+ _плотный индекс_ (NoSQL) — индекс, при котором, каждом документе в индексируемой коллекции соответствует запись в индексе, даже если в документе нет индексируемого поля. ++ _плотный индекс_ (NoSQL) — индекс, при котором, каждому документу в индексируемой коллекции соответствует запись в индексе, даже если в документе нет индексируемого поля. + _разреженный индекс_ (NoSQL) — тот, в котором представлены только те документы, для которых индексируемый ключ имеет какое-то определённое значение (существует). + _пространственный индекс_ — оптимизирован для описания географического местоположения. Представляет из себя многоключевой индекс состоящий из широты и долготы. + _составной пространственный индекс_ — индекс, включающий в себя кроме широты и долготы ещё какие-либо мета-данные (например теги). Но географические координаты должны стоять на первом месте. diff --git a/java8.md b/java8.md index 904d885..f430b6a 100644 --- a/java8.md +++ b/java8.md @@ -230,7 +230,7 @@ public static void main(String[] args) { ## Какие виды ссылок на методы вы знаете? + на статический метод; + на метод экземпляра; -+ на конструкторе. ++ на конструктор. [к оглавлению](#java-8) diff --git a/jcf.md b/jcf.md index d856cd9..9e9dcf2 100644 --- a/jcf.md +++ b/jcf.md @@ -883,7 +883,7 @@ __Один__ новый объект статического вложенног [к оглавлению](#java-collections-framework) ## Как перебрать все пары «ключ-значение» в `Map`? -Использовать метод `entrySet()`, который возвращает множество `Set` пар «ключ-значение». +Использовать метод `entrySet()`, который возвращает множество `Set>` пар «ключ-значение». [к оглавлению](#java-collections-framework) diff --git a/jdbc.md b/jdbc.md index 3211f10..56754a6 100644 --- a/jdbc.md +++ b/jdbc.md @@ -190,9 +190,9 @@ __Уровень изолированности транзакций__ — зн Объекты-носители интерфейсов создаются при помощи методов объекта `java.sql.Connection`: -+ `java.sql.createStatement()` возвращает объект _Statement_; -+ `java.sql.prepareStatement()` возвращает объект _PreparedStatement_; -+ `java.sql.prepareCall()` возвращает объект _CallableStatement_; ++ `java.sql.Connection.createStatement()` возвращает объект _Statement_; ++ `java.sql.Connection.prepareStatement()` возвращает объект _PreparedStatement_; ++ `java.sql.Connection.prepareCall()` возвращает объект _CallableStatement_; [к оглавлению](#jdbc) diff --git a/kafka.md b/kafka.md new file mode 100644 index 0000000..c579cbf --- /dev/null +++ b/kafka.md @@ -0,0 +1,1420 @@ +[Вопросы для собеседования](README.md) + +# Apache Kafka +* [Что такое Apache Kafka?](#что-такое-apache-kafka) +* [Основные компоненты Kafka](#основные-компоненты-kafka) + +**Архитектура компонентов** + +* Topic + * [Архитектура топика](#архитектура-топика) + * [Настройки топика Kafka](#настройки-топика-kafka) +* Broker + * [Архитектура брокера](#архитектура-брокера) + * [Настройки брокера Kafka](#настройки-брокера-kafka) +* Producer + * [Архитектура продюсера](#архитектура-продюсера) + * [Настройки продюсера](#настройки-продюсера) + * [Пример конфигурации Kafka Producer](#пример-конфигурации-kafka-producer) +* Consumer + * [Архитектура консюмера](#архитектура-консюмера) + * [Настройки консюмера](#настройки-консюмера) + * [Пример конфигурации Kafka Consumer](#пример-конфигурации-kafka-consumer) + +**Kafka API** + +* [Основные API Kafka](#основные-api-kafka) +* [Какова роль Producer API?](#какова-роль-producer-api) +* [Какова роль Consumer API?](#какова-роль-consumer-api) +* [Какова роль Connector API?](#какова-роль-connector-api) +* [Какова роль Streams API?](#какова-роль-streams-api) +* [Какова роль Transactions API?](#какова-роль-transactions-api) +* [Какова роль Quota API?](#какова-роль-quota-api) +* [Какова роль AdminClient API?](#какова-роль-AdminClient-api) + +**Kafka Consumer** + +* [Для чего нужен координатор группы?](#для-чего-нужен-координатор-группы) +* [Для чего нужен Consumer heartbeat thread?](#для-чего-нужен-consumer-heartbeat-thread) +* [Как Kafka обрабатывает сообщения?](#как-kafka-обрабатывает-сообщения) +* [Как Kafka обрабатывает задержку консюмера?](#как-kafka-обрабатывает-задержку-консюмера) +* [Для чего нужны методы subscribe() и poll()?](#для-чего-нужны-методы-subscribe-и-poll) +* [Для чего нужен метод position()?](#для-чего-нужен-метод-position) +* [Для чего нужны методы commitSync() и commitAsync()?](#для-чего-нужны-методы-commitsync-и-commitasync) + +**Другие вопросы** + +* [Для чего нужен идемпотентный продюсер?](#для-чего-нужен-идемпотентный-продюсер) +* [Для чего нужен интерфейс Partitioner?](#для-чего-нужен-интерфейс-partitioner) +* [Для чего нужен Broker log cleaner thread?](#для-чего-нужен-broker-log-cleaner-thread) +* [Для чего нужен Kafka Mirror Maker?](#для-чего-нужен-kafka-mirror-maker) +* [Для чего нужна Schema Registry?](#для-чего-нужна-schema-registry) +* [Для чего нужен Streams DSL?](#для-чего-нужен-streams-dsl) +* [Как Kafka обеспечивает версионирование сообщений?](#как-kafka-обеспечивает-версионирование-сообщений) +* [Как потребители получают сообщения от брокера?](#как-потребители-получают-сообщения-от-брокера) + +**Сравнение с другими компонентами и системами** + +* [В чем разница между Kafka Consumer и Kafka Stream?](#в-чем-разница-между-kafka-consumer-и-kafka-stream) +* [В чем разница между Kafka Streams и Apache Flink?](#в-чем-разница-между-kafka-streams-и-apache-flink) +* [В чем разница между Kafka и Flume?](#в-чем-разница-между-kafka-и-flume) +* [В чем разница между Kafka и RabbitMQ?](#в-чем-разница-между-kafka-и-rabbitmq) + + +## Что такое Apache Kafka? + +Это распределённая система с открытым исходным кодом, разработанная для высокоскоростной передачи больших объёмов данных +с минимальной задержкой. + +### Преимущества + +* Персистентность данных +* Высокая производительность +* Независимость пайплайнов обработки +* Возможность просмотреть историю записей заново +* Гибкость в использовании + +### Когда использовать + +* λ-архитектура или k-архитектура +* Стриминг больших данных +* Много клиентов (producer и consumer) +* Требуется кратное масштабирование + +### Чего в Kafka нет из коробки + +* Это не брокер сообщений +* Отложенные сообщения +* DLQ +* AMQP / MQTT +* TTL на сообщение +* Очереди с приоритетами + +[к оглавлению](#apache-kafka) + +## Основные компоненты Kafka + +* **Producer (Производитель)** — приложение, которое публикует сообщения в топики Kafka +* **Consumer (Потребитель)** — приложение, которое подписывается на топики и читает сообщения +* **Broker (Брокер)** — сервер Kafka, который принимает, хранит и распределяет сообщения. В кластере Kafka может быть несколько брокеров +* **Topic (Топик)** — логическое разделение, по которому организуются данные. Производители отправляют сообщения в топики, а потребители читают из них +* **Partition (Раздел)** — каждый топик разделён на партиции для параллельной обработки. Сообщения в партициях упорядочены +* **Zookeeper** — сервис, используемый Kafka для управления состоянием кластера и координации брокеров. +Однако в новых версиях Kafka отказывается от Zookeeper в пользу собственного механизма метаданных KRaft (Kafka Raft). +Это новая внутренняя архитектура метаданных Kafka, которая устраняет зависимость от Zookeeper. Она основана на Raft-консенсусе, +позволяя Kafka брокерам самостоятельно управлять метаданными и координировать взаимодействие между собой. + +[к оглавлению](#apache-kafka) + +## Архитектура топика + +* **Топик разбит на партиции** — сообщения в топике распределяются по партициям для более эффективной параллельной обработки и хранения +* **Партиции хранятся на диске** — Kafka сохраняет данные на диск, что позволяет долговременно хранить сообщения +* **Партиции делятся на сегменты** — сегмент представляет собой обычный файл на диске, сегменты делятся на пассивные и активный. + Запись происходит в активный сегмент +* **Данные удаляются либо по времени, либо по размеру**. Удаление происходит посегментно, с самого старого сегмента + * **retention.bytes** - по максимальному размеру + * **retention.ms** - по времени +* **Сообщение можно быстро найти по его Offset** — каждому сообщению в партиции присваивается уникальный смещающий индекс (offset), по которому можно легко найти сообщение + +[к оглавлению](#apache-kafka) + +## Настройки топика Kafka + +### Репликация + +* `replication.factor` + * **Описание**: Количество реплик для каждой партиции топика + * **Пример**: `replication.factor=3` +* `min.insync.replicas` + * **Описание**: Минимальное количество синхронизированных реплик + * **Пример**: `min.insync.replicas=2` + +### Хранение данных + +* `retention.ms` + * **Описание**: Время хранения сообщений в топике в миллисекундах + * **Пример**: `retention.ms=604800000` (7 дней) +* `retention.bytes` + * **Описание**: Максимальный объём данных в топике, после чего старые сообщения удаляются + * **Пример**: `retention.bytes=10737418240` (10 GB) +* `segment.bytes` + * **Описание**: Размер сегмента логов топика + * **Пример**: `segment.bytes=1073741824` (1 GB) + +### Политики очистки + +* `cleanup.policy` + * **Описание**: Как Kafka обрабатывает старые сообщения + * **Значения**: `delete`, `compact` + * **Пример**: `cleanup.policy=delete` + +### Партиции + +* `num.partitions` + * **Описание**: Количество партиций в топике + * **Пример**: `num.partitions=3` + +[к оглавлению](#apache-kafka) + +## Архитектура брокера + +* **У каждой партиции свой лидер** — в Kafka для каждой партиции в топике назначается лидер-брокер, который отвечает + за запись и чтение данных +* **Сообщения пишутся в лидера** — производители отправляют сообщения напрямую в брокер-лидер партиции +* **Данные реплицируются между брокерами** — для обеспечения отказоустойчивости Kafka реплицирует данные партиций на + другие брокеры, которые становятся репликами +* **Автоматический фейловер лидера** — в случае сбоя брокера-лидера Kafka автоматически назначает новый лидер из числа + реплик, обеспечивая бесшовную работу системы + +[к оглавлению](#apache-kafka) + +## Настройки брокера Kafka + +### Репликация и консистентность + +* `min.insync.replicas` + * **Описание**: Минимальное количество синхронизированных реплик для подтверждения записи + * **Пример**: `min.insync.replicas=2` +* `unclean.leader.election.enable` + * **Описание**: Разрешает выбор лидера из неактуальных реплик, если нет синхронизированных реплик + * **Пример**: `unclean.leader.election.enable=false` + +### Логирование и хранение данных + +* `log.dirs` + * **Описание**: Директория на диске, где хранятся логи партиций + * **Пример**: `log.dirs=/var/lib/kafka/logs` +* `log.retention.hours` + * **Описание**: Максимальное время хранения данных в логах + * **Пример**: `log.retention.hours=168` (7 дней) +* `log.segment.bytes` + * **Описание**: Максимальный размер сегмента лога, после чего создаётся новый + * **Пример**: `log.segment.bytes=1073741824` (1 GB) + +### Производительность и задержки + +* `num.network.threads` + * **Описание**: Количество потоков для обработки сетевых запросов + * **Пример**: `num.network.threads=3` +* `num.io.threads` + * **Описание**: Количество потоков для ввода-вывода + * **Пример**: `num.io.threads=8` +* `socket.send.buffer.bytes` + * **Описание**: Размер буфера для отправки данных по сети + * **Пример**: `socket.send.buffer.bytes=102400` + +### Управление сообщениями + +* `message.max.bytes` + * **Описание**: Максимальный размер сообщения, которое брокер может принять + * **Пример**: `message.max.bytes=1048576` (1 MB) +* `replica.fetch.max.bytes` + * **Описание**: Максимальный размер данных для запроса реплики + * **Пример**: `replica.fetch.max.bytes=1048576` (1 MB) + +### Безопасность + +* `ssl.keystore.location` + * **Описание**: Путь к хранилищу ключей SSL + * **Пример**: `ssl.keystore.location=/var/private/ssl/kafka.keystore.jks` +* `ssl.truststore.location` + * **Описание**: Путь к хранилищу доверенных сертификатов + * **Пример**: `ssl.truststore.location=/var/private/ssl/kafka.truststore.jks` + +[к оглавлению](#apache-kafka) + +## Архитектура продюсера + +* **Создание сообщения (Record)**: Продюсер формирует сообщение, содержащее ключ (необязательный), значение и метаданные, + такие как время отправки. Сообщение отправляется в топик (Topic), который состоит из одной или нескольких партиций +* **Выбор партиции**: Если ключ сообщения указан, Kafka использует его для хеширования и определения, в какую партицию + записать сообщение (сообщения с одинаковым ключом попадают в одну и ту же партицию). Если ключа нет, Kafka распределяет + сообщения по партициям с помощью round-robin или по другим правилам +* **Отправка сообщений в буфер (Batching)**: Для повышения производительности продюсер Kafka не отправляет каждое сообщение + по отдельности, а группирует несколько сообщений в пакеты (batching), прежде чем отправить их брокеру. Это снижает + сетевые задержки и нагрузку на брокера +* **Сжатие (Compression)**: Для уменьшения объёма передаваемых данных продюсер может сжимать сообщения с использованием + таких алгоритмов, как GZIP, Snappy или LZ4. Сжатие снижает нагрузку на сеть и хранение, но добавляет небольшие накладные + расходы на процессор +* **Асинхронная отправка**: Продюсер отправляет пакеты сообщений асинхронно. Это означает, что сообщения записываются в + буфер памяти и отправляются брокеру, не ожидая завершения предыдущих операций. Это повышает пропускную способность +* **Подтверждения (Acknowledgments)**: Kafka позволяет настраивать уровень подтверждений от брокеров +* **Ретрай и идемпотентность**: Если отправка сообщения не удалась, продюсер может повторить попытку отправки (ретрай). + Также можно включить идемпотентный режим продюсера, что предотвращает повторную отправку одного и того же сообщения в + случае сбоя, обеспечивая отправку уникального сообщения один раз +* **Error handling**: Продюсер обрабатывает ошибки при отправке сообщений. В зависимости от настроек продюсер может + попытаться переотправить сообщение или сообщить о проблеме через callback + +### Резюме + +* Продюсер выбирает партицию для сообщения +* Продюсер выбирает уровень гарантии доставки +* В продюсере можно тюнить производительность + +[к оглавлению](#apache-kafka) + +## Настройки продюсера + +### Bootstrap-серверы (`bootstrap.servers`) + +* **Описание**: Указывает адреса брокеров Kafka, к которым продюсер должен подключаться для отправки сообщений +* **Пример**: `bootstrap.servers: localhost:9092,localhost:9093` +* **Зачем это нужно**: Kafka продюсер использует эти брокеры для получения метаданных о кластере (например, информация о топиках и партициях). Эти брокеры служат точками входа в кластер Kafka. + +### Сериализация ключа и значения + +Продюсер должен преобразовывать (сериализовать) данные в байтовый формат перед отправкой в Kafka + +* **Ключевая настройка для сериализации ключа:** + * `key.serializer` + * Пример: `key.serializer: org.apache.kafka.common.serialization.StringSerializer` +* **Ключевая настройка для сериализации значения:** + * `value.serializer` + * Пример: `value.serializer: org.apache.kafka.common.serialization.StringSerializer` + +**Варианты сериализаторов:** +* `StringSerializer` для строк +* `ByteArraySerializer` для массива байтов +* `LongSerializer` для чисел +* Также можно реализовать свои собственные сериализаторы + +### Отправка сообщений в буфер + +Продюсер Kafka отправляет сообщения асинхронно, и для этого используется буферизация сообщений + +* **batch.size**: Размер одного пакета (batch), который продюсер отправляет брокеру + * **Описание**: Определяет количество байтов сообщений, которые могут быть буферизованы в одном пакете перед отправкой брокеру + * **Пример**: `"batch.size": 16384` (16 KB) + * **Зачем это нужно**: Большие пакеты могут повысить производительность, но могут увеличить задержки +* **linger.ms**: Максимальное время ожидания перед отправкой пакета + * **Описание**: Продюсер может немного подождать, пока буфер накопит сообщения, чтобы отправить больше данных за один раз + * **Пример**: `linger.ms: 5` (время ожидания 5 мс) + * **Зачем это нужно**: Позволяет продюсеру собирать больше сообщений в пакете перед отправкой, что может улучшить эффективность использования сети +* **buffer.memory**: Размер выделенной памяти для буферизации сообщений + * **Описание**: Общий объем памяти, который продюсер может использовать для хранения сообщений, ожидающих отправки + * **Пример**: `buffer.memory: 33554432` (32 MB) + * **Зачем это нужно**: Если буфер заполняется, продюсер приостанавливает отправку сообщений, пока буфер не освободится + +### Сжатие сообщений + +Продюсер может сжимать сообщения для уменьшения объема передаваемых данных + +* **compression.type** + * **Описание**: Указывает тип сжатия для сообщений + * **Пример**: `compression.type: gzip` (варианты: none, gzip, snappy, lz4, zstd) + * **Зачем это нужно**: Сжатие уменьшает объем данных, передаваемых по сети, что может снизить нагрузку на сеть и хранилище, + особенно при больших объемах сообщений. Однако это может потребовать дополнительных ресурсов на сжатие/разжатие + +### Распределение сообщений по партициям (партицирование) + +* **partitioner.class** + * **Описание**: определяет логику, по которой продюсер выбирает партицию для каждого сообщения + * **Примеры**: + * **если настройка не задана**, по умолчанию используется `DefaultPartitioner` , который может распределять сообщения по партициям + равномерно или на основе ключа сообщения + * `partitioner.class: o.a.k.clients.producer.RoundRobinPartitioner` использует метод Round Robin для распределения сообщений + * `partitioner.class: o.a.k.clients.producer.UniformStickyPartitioner` равномерно отправляет сообщения, привязываясь + к партиции на короткий промежуток времени, чтобы уменьшить нагрузку на брокеры + +### Подтверждения (acks) + +Настройка определяет, как много брокеров должны подтвердить получение сообщения перед тем, как продюсер будет считать его +успешно отправленным + +* **acks** + * **Описание**: Определяет количество подтверждений от брокеров + * **Значения**: + * `0`: Продюсер не ждёт подтверждений (самая быстрая отправка, но высокий риск потери сообщений) + * `1`: Продюсер ждёт подтверждения от лидера партиции + * `all` (или `-1`): Продюсер ждёт подтверждений от всех реплик (наибольшая надежность, но увеличенные задержки) + * **Пример**: `acks: all` + * **Зачем это нужно**: Позволяет выбрать баланс между скоростью и надежностью отправки данных. + +### Дополнительные важные настройки + +* **Количество повторных попыток (retries):** + * **Описание**: Определяет, сколько раз продюсер должен попытаться отправить сообщение при неудаче + * **Пример**: `retries: 3` + * **Зачем это нужно**: Если произошёл временный сбой, продюсер может попытаться повторить отправку сообщений, что + увеличивает шанс доставки +* **Идемпотентность продюсера (enable.idempotence):** + * **Описание**: Включение идемпотентного режима, что предотвращает дублирование сообщений при сбоях + * **Пример**: `enable.idempotence: true` + * **Зачем это нужно**: Гарантирует, что каждое сообщение будет доставлено ровно один раз +* **Максимальный размер сообщения (max.request.size):** + * **Описание**: Максимальный размер сообщения, которое продюсер может отправить брокеру + * **Пример**: `max.request.size: 1048576` (1 MB) + * **Зачем это нужно**: Ограничивает размер сообщений, которые могут быть отправлены, чтобы избежать перегрузки сети и брокеров. +* **Таймаут ожидания подтверждений (request.timeout.ms):** + * **Описание**: Максимальное время ожидания подтверждения от брокера + * **Пример**: `request.timeout.ms: 30000` (30 секунд) + * **Зачем это нужно**: Помогает избежать бесконечного ожидания ответа от брокера в случае его сбоя + +[к оглавлению](#apache-kafka) + +## Пример конфигурации Kafka Producer + +```java +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import java.util.Properties; + +public class KafkaStringArrayProducer { + + public static void main(String[] args) { + // Настройки Kafka Producer + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + // Создание Kafka Producer + KafkaProducer producer = new KafkaProducer<>(props); + + String key = "user123"; + String[] value = {"message1", "message2", "message3"}; + + // Создание записи и добавление заголовков + ProducerRecord record = new ProducerRecord<>("my_topic", key, value); + record.headers().add("traceId", "someTraceId"); + + // Отправка сообщения в Kafka + producer.send(record, (metadata, exception) -> { + if (exception != null) { + System.out.println("Ошибка при отправке сообщения: " + exception.getMessage()); + } else { + System.out.println("Сообщение отправлено в топик " + metadata.topic() + " с партицией " + metadata.partition()); + } + }); + + producer.close(); + } +} +``` + +```properties +acks=all +retries=3 +compression.type=gzip +``` + +### С использованием Spring Kafka + +```java +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.config.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.producer.Producer; +import org.springframework.kafka.producer.ProducerRecord; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaProducerConfig { + + @Autowired + private KafkaProperties kafkaProperties; + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getProducerId()); + props.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + "com.example.configuration.kafka.KafkaProducerLoggingInterceptor" + ); + + if ("SASL_SSL".equals(kafkaProperties.getSecurityProtocol())) { + props.put("ssl.truststore.location", kafkaProperties.getSslTrustStoreLocation()); + props.put("ssl.truststore.password", kafkaProperties.getSslTrustStorePassword()); + props.put("ssl.truststore.type", kafkaProperties.getSslTrustStoreType()); + props.put("ssl.keystore.type", kafkaProperties.getSslKeyStoreType()); + + props.put("sasl.mechanism", kafkaProperties.getSaslMechanism()); + props.put("security.protocol", kafkaProperties.getSecurityProtocol()); + props.put("sasl.jaas.config", kafkaProperties.getJaasConfigCompiled()); + } + + return props; + } + + @Bean + public ProducerFactory producerFactory() { + var stringSerializerKey = new StringSerializer(); + stringSerializerKey.configure(Map.of("key.serializer.encoding", "UTF-8"), true); + stringSerializerKey.configure(Map.of("serializer.encoding", "UTF-8"), true); + + var stringSerializerValue = new StringSerializer(); + stringSerializerValue.configure(Map.of("value.serializer.encoding", "UTF-8"), false); + stringSerializerValue.configure(Map.of("serializer.encoding", "UTF-8"), false); + + return new DefaultKafkaProducerFactory<>(producerConfigs(), stringSerializerKey, stringSerializerValue); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} +``` + +```java +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + public KafkaProducerService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void sendMessage(String message, String key, String topic) { + try { + log.info("Sending message {}", data); + kafkaTemplate.send(topic, key, message); + log.info("Successfully send message {}", data); + } catch (Exception ex) { + log.error("Failed send message to {} topic by key {}", key, topic); + throw ex; + } + } +} +``` + +```java +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/kafka") +public class KafkaController { + + @Autowired + private KafkaProducerService kafkaProducerService; + + @PostMapping("/send") + public String sendMessage(@RequestParam String message, @RequestParam String key, @RequestParam String topic) { + kafkaProducerService.sendMessage(message, key, topic); + return "Message sent to Kafka!"; + } +} +``` + +### С использованием Spring Cloud Stream + +```yaml +spring: + cloud: + stream: + bindings: + output: + destination: my_topic + kafka: + binder: + brokers: localhost:9092 +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Service; + +@Service +@EnableBinding(Source.class) // Подключение к каналу сообщений +public class KafkaStreamProducer { + + private final Source source; + + public KafkaStreamProducer(Source source) { + this.source = source; + } + + public void sendMessage(String message) { + Message msg = MessageBuilder.withPayload(message).build(); + source.output().send(msg); // Отправка сообщения в Kafka + } +} +``` + +```java +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/kafka-stream") +public class KafkaStreamController { + + @Autowired + private KafkaStreamProducer kafkaStreamProducer; + + @PostMapping("/send") + public String sendMessage(@RequestParam String message) { + kafkaStreamProducer.sendMessage(message); + return "Message sent to Kafka via Spring Cloud Stream!"; + } +} +``` + +[к оглавлению](#apache-kafka) + +## Архитектура консюмера + +Потребители используют **Kafka Consumer API** для взаимодействия с брокерами Kafka. Они получают сообщения и обрабатывают +их согласно своей логике. Потребители могут быть объединены в группы **Consumer Groups**. + +### Резюме + +* "Smart" консюмер +* Консюмер опрашивает кафку +* Консюмер отвечает за гарантию обработки +* Автоматические фейловер в консюмер-группе +* Независимая обработка разными консюмер-группе + +### Компоненты + +#### Consumer Group + +Kafka использует концепцию Consumer Groups, что позволяет нескольким потребителям работать вместе, чтобы параллельно +обрабатывать данные из топиков. Каждый потребитель в группе обрабатывает только часть данных из топика, обеспечивая масштабируемость и балансировку нагрузки. + +* Все сообщения из одного Kafka Topic делятся между всеми потребителями в группе +* Если в группе несколько потребителей, Kafka гарантирует, что каждая партиция топика будет обрабатываться только одним потребителем +* В случае если один из потребителей выходит из строя, его партиции автоматически перераспределяются между оставшимися активными потребителями + +#### Offset (Смещение) + +Потребитель отслеживает offset каждой партиции, чтобы понимать, с какого сообщения продолжать чтение. Смещение — это +уникальный идентификатор каждого сообщения в партиции. + +Потребители могут хранить offset в Kafka или вне её (например, в базе данных или файловой системе). Если потребитель +отключается, он может возобновить обработку с того места, где остановился, прочитав сохранённый offset. + +#### Poll (Опрос) + +Потребители используют метод poll() для опроса Kafka на наличие новых сообщений. Это асинхронный процесс, и Kafka будет +отправлять потребителю доступные сообщения по мере их поступления. + +* Потребитель может указывать тайм-аут, после которого метод poll() вернёт пустой результат, если сообщений нет. +* Потребитель должен обрабатывать сообщения, а затем снова опрашивать Kafka для получения новых данных. + +### Процесс работы + +1. **Инициализация**: Потребитель подключается к Kafka-брокерам и присоединяется к consumer group. Он получает информацию о партиции топика, который будет читать. +2. **Подписка на топик**: Потребитель подписывается на определённые топики с помощью метода `subscribe()`. +3. **Опрос**: Потребитель вызывает метод `poll()` для получения новых сообщений. Если в очереди есть сообщения, они передаются потребителю для обработки. +4. **Обработка сообщений**: Потребитель обрабатывает сообщения, извлекая полезную информацию из каждого. +5. **Подтверждение обработки**: После обработки сообщения потребитель подтверждает обработку с помощью `commit()`. + Это обновляет **offset**, позволяя потребителю продолжить чтение с места, на котором остановился. +6. **Обработка ошибок**: В случае ошибки потребитель может решить, как повторить обработку сообщения + (например, с использованием механизма повторных попыток). +7. **Завершение работы**: Когда потребитель завершает обработку, он выходит из consumer group и может закрыть соединение с Kafka. + +[к оглавлению](#apache-kafka) + +## Настройки консюмера + +* **bootstrap.servers** — список брокеров, к которым будет подключаться потребитель +* **group.id** — идентификатор группы потребителей +* **auto.offset.reset** — настройка поведения при отсутствии offset (`earliest` для чтения с самого начала или `latest` для чтения с конца) +* **enable.auto.commit** — указывает, должен ли потребитель автоматически коммитить offset. Если `false`, потребитель должен делать это вручную +* **auto.commit.interval.ms** — определяет интервал времени между автоматическими коммитами offset сообщений, если включена автоматическая фиксация +* **max.poll.records** — максимальное количество сообщений, которые потребитель будет получать за один вызов `poll()` +* **session.timeout.ms** — максимальное время без общения с Kafka перед тем, как потребитель считается недоступным +* **client.rack** — используется для указания серверной стойки или дата-центра. Это особенно важно в случае, если у вас + есть распределённая инфраструктура Kafka с несколькими стойками или дата-центрами, где сообщения могут быть реплицированы + между разными физическими местоположениями (например, несколькими дата-центрами). + +### Что такое Rack в контексте Kafka? + +**Rack** — это метка, которая идентифицирует физическое местоположение брокеров Kafka. В Kafka можно задать rack для каждого брокера +с помощью параметра `broker.rack`, чтобы управлять репликацией данных, предпочтительно размещая реплики на разных физических машинах или в разных дата-центрах. + +**Преимущества использования client.rack** + +* **Снижение задержек**: Kafka будет предпочитать, чтобы данные попадали в тот же rack, где находится клиент, что уменьшает время отклика +* **Повышенная отказоустойчивость**: С правильной настройкой client.rack и broker.rack можно улучшить отказоустойчивость + за счет размещения реплик в разных физически удаленных местах +* **Лучшее использование ресурсов**: Правильное распределение нагрузки по rack помогает избежать перегрузки одного физического местоположения + +[к оглавлению](#apache-kafka) + +## Пример конфигурации Kafka Consumer + +```java +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Collections; + +public class KafkaConsumerExample { + + public static void main(String[] args) { + String bootstrapServers = "localhost:9092"; + String groupId = "my-consumer-group"; + String topic = "my-topic"; + + // Настройки Consumer + Map consumerConfigs = new HashMap<>(); + consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // Создание Consumer + KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs); + + // Подписка на тему + consumer.subscribe(Collections.singletonList(topic)); + + try { + // Чтение сообщений из Kafka + while (true) { + var records = consumer.poll(Duration.ofSeconds(1)); + records.forEach(record -> System.out.println("Received message: " + record.value())); + } + } finally { + consumer.close(); + } + } +} +``` + +**At least once** + +Чтобы гарантировать обработку сообщений хотя бы один раз, нужно коммитить после обработки. + +```java +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Collections; + +public class KafkaConsumerAtLeastOnce { + + public static void main(String[] args) { + try { + // Чтение сообщений + while (true) { + var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений + process(records); + consumer.commitAsync(); // Commit после обработки + } + } finally { + consumer.close(); // Закрытие consumer + } + } +} +``` + +**At most once** + +Чтобы гарантировать обработку сообщений не более одного раза, нужно коммитить до обработки или включить авто-подтверждение смещений +`enable.auto.commit=true`. + +```java +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Collections; + +public class KafkaConsumerAtLeastOnce { + + public static void main(String[] args) { + try { + // Чтение сообщений + while (true) { + var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений + consumer.commitAsync(); // Commit перед обработкой + process(records); + } + } finally { + consumer.close(); // Закрытие consumer + } + } +} +``` + +### С использованием Spring Kafka + +```java +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Autowired + private KafkaProperties kafkaProperties; + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer()); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumerGroupId()); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return configs; + } + + @Bean + public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentMessageListenerContainerFactory factory = new ConcurrentMessageListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} +``` + +```java +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +@Service +public class KafkaConsumer { + + @KafkaListener(topics = "my_topic", groupId = "group_id") + public void listen(@Payload String message, + @Header("traceId") String traceId, + @Header("correlationId") String correlationId) { + System.out.println("Received message: " + message); + System.out.println("Trace ID: " + traceId); + System.out.println("Correlation ID: " + correlationId); + } +} +``` + +**At least once** + +```yaml +spring: + kafka: + consumer: + enable-auto-commit: false # Отключение авто-commit + auto-offset-reset: earliest # Начинать чтение с самого начала (если нет смещения) + group-id: my-consumer-group + max-poll-records: 500 # Максимальное количество сообщений для обработки за один раз + listener: + ack-mode: manual # Ручное подтверждение +``` + +```java +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.listener.config.DefaultMessageListenerContainer; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.listener.MessageListenerContainer; + +@EnableKafka +public class AtLeastOnceConsumer { + + @KafkaListener(topics = "my-topic", groupId = "my-consumer-group") + public void listen(String message, Acknowledgment acknowledgment) { + System.out.println("Received message: " + message); + // Обработка сообщения + // Подтверждение смещения вручную после успешной обработки + acknowledgment.acknowledge(); + } +} +``` + +**At most once** + +```yaml +spring: + kafka: + consumer: + enable-auto-commit: true # Включение авто-commit + group-id: my-consumer-group + auto-offset-reset: earliest # Начинать чтение с самого начала + max-poll-records: 100 # Максимальное количество сообщений для обработки за один раз +``` + +```java +import org.springframework.kafka.annotation.KafkaListener; + +public class AtMostOnceConsumer { + + @KafkaListener(topics = "my-topic", groupId = "my-consumer-group") + public void listen(String message) { + System.out.println("Received message: " + message); + // Обработка сообщения... + // Смещение будет автоматически зафиксировано после получения сообщения + } +} +``` + +### С использованием Spring Cloud Stream + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + kafka: + binder: + brokers: localhost:9092 + auto-create-topics: false +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +@Service +@EnableBinding(KafkaProcessor.class) // Указывает на интерфейс, с которым связывается этот сервис +public class KafkaConsumerService { + + // Метод будет слушать сообщения из указанного канала + @StreamListener("input") + public void handle(@Payload String message) { + System.out.println("Received message: " + message); + } +} +``` + +```java +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.messaging.SubscribableChannel; + +public interface KafkaProcessor { + + @Input("input") // Имя канала, которое мы используем в application.yml + SubscribableChannel input(); +} +``` + +**At least once** + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + consumer: + ackMode: manual # Ручное подтверждение + maxAttempts: 3 # Максимальное количество попыток +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +@Component +@EnableBinding(Sink.class) // Sink - это интерфейс, предоставляющий Binding для входных сообщений +public class AtLeastOnceConsumer { + + @StreamListener(Sink.INPUT) + public void handleMessage(Message message, @Header(name = "kafka_offset") String offset) { + // Обработка сообщения + System.out.println("Received message: " + message.getPayload()); + // После успешной обработки подтверждаем сообщение + // Spring Cloud Stream автоматически подтвердит сообщение после завершения метода + // благодаря ackMode=manual и настроенному acknowledgment + } +} +``` + +**At most once** + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + consumer: + ackMode: batch # Автоматическое подтверждение после пакета сообщений +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +@Component +@EnableBinding(Sink.class) +public class AtMostOnceConsumer { + + @StreamListener(Sink.INPUT) + public void handleMessage(Message message) { + // Обработка сообщения + System.out.println("Received message: " + message.getPayload()); + // Смещение будет автоматически зафиксировано после получения сообщения + } +} +``` + +**Mostly Once** + +Это гибридный режим, который стремится быть чем-то средним между At least once и At most once. Он предполагает, что сообщения +будут доставлены обычно один раз, но иногда, в случае сбоев, может быть обработано больше одного раза. Для реализации +такого режима в Spring Cloud Stream потребуется дополнительная логика, например, фильтрация дублированных сообщений или +использование уникальных идентификаторов сообщений. + +В рамках Spring Cloud Stream, можно обработать Mostly Once с использованием уникальных идентификаторов сообщений или +кеширования состояния, чтобы отфильтровать повторно обработанные сообщения. + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + consumer: + ackMode: manual # Ручное подтверждение + maxAttempts: 3 # Максимальное количество попыток +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Set; + +@Component +@EnableBinding(Sink.class) +public class MostlyOnceConsumer { + + private Set processedMessageIds = new HashSet<>(); + + @StreamListener(Sink.INPUT) + public void handleMessage(Message message, @Header("messageId") String messageId) { + if (processedMessageIds.contains(messageId)) { + System.out.println("Duplicate message: " + messageId); + return; // Пропускаем дублированное сообщение + } + // Обработка сообщения + System.out.println("Received message: " + message.getPayload()); + // Добавляем идентификатор в обработанные + processedMessageIds.add(messageId); + // После успешной обработки подтверждаем сообщение вручную + // Spring Cloud Stream подтвердит сообщение после выполнения метода + } +} +``` + +[к оглавлению](#apache-kafka) + +## Основные API Kafka + +* Producer API +* Consumer API +* Streams API +* Connector API + +[к оглавлению](#apache-kafka) + +## Какова роль Producer API? + +Используется для публикации потока сообщений в топики Kafka. Он управляет партицированием сообщений, сжатием и балансировкой +нагрузки между несколькими брокерами. Продюсер также отвечает за повторные неудачные попытки публикации и может быть +настроен на различные уровни гарантий доставки. + +[к оглавлению](#apache-kafka) + +## Какова роль Consumer API? + +Обеспечивает механизм для потребления сообщений топиков. Оно позволяет приложениям и микросервисам читать данные, +поступающие в Kafka, и обрабатывать их для дальнейшего использования, будь то хранение, анализ или реактивная обработка. + +[к оглавлению](#apache-kafka) + +## Какова роль Connector API? + +Connector API в Apache Kafka является частью Kafka Connect, которая представляет собой инфраструктуру для интеграции +внешних систем с Kafka. Connector API играет ключевую роль в упрощении процесса подключения различных источников данных +и систем-приемников к Kafka, предоставляя возможность автоматического перемещения данных между ними. + +[к оглавлению](#apache-kafka) + +## Какова роль Streams API? + +Это компонент Apache Kafka, предназначенный для создания приложений и микросервисов, которые обрабатывают потоки данных +в реальном времени. Его основная роль заключается в том, чтобы позволить разработчикам легко обрабатывать и анализировать +данные, поступающие в виде непрерывных потоков из топиков. Kafka Streams API предоставляет высокоуровневый интерфейс для +выполнения таких операций, как фильтрация, агрегация, объединение данных и вычисление оконных функций. + +[к оглавлению](#apache-kafka) + +## Какова роль Transactions API? + +Kafka Transactions API позволяет выполнять атомарные обновления для нескольких топиков. Он включает exactly-once +гарантию для приложений, которые читают данные из одного топика и пишут в другой. Это особенно полезно для приложений потоковой +обработки, которым необходимо гарантировать, что каждое входное событие влияет на выходные данные ровно один раз, даже в случае сбоев. + +[к оглавлению](#apache-kafka) + +## Какова роль Quota API? + +Quota API позволяет настраивать квоты для каждого клиента для ограничения скорости создания или потребления данных, чтобы +один клиент не потреблял слишком много ресурсов брокера. Это помогает обеспечить справедливое распределение ресурсов и +предотвратить сценарии отказа в обслуживании. + +[к оглавлению](#apache-kafka) + +## Какова роль AdminClient API? + +AdminClient API предоставляет операции для управления топиками, брокерами, конфигурацией и другими объектами Kafka. +Его можно использовать для создания, удаления и описания топиков, управления списками ACL, получения информации о кластере и +программного выполнения других административных задач. + +[к оглавлению](#apache-kafka) + +## Kafka Consumer + +## Для чего нужен координатор группы? + +Координатор группы отвечает за управление группами потребителей. Он управляет членством в группах потребителей, назначает +партиции потребителям внутри группы и управляет фиксацией смещения. Когда потребитель присоединяется к группе или покидает ее, +координатор группы запускает перебалансировку для переназначения партиций среди оставшихся потребителей. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Consumer heartbeat thread? + +Consumer Heartbeat Thread отвечает за отправку периодических сигналов брокеру Kafka (в частности, координатору группы). +Эти сигналы указывают на то, что потребитель жив и все еще является частью группы потребителей. Если потребитель не отправляет +данные сигналы в течение настроенного периода, он считается неживым, и координатор группы инициирует перебалансировку +для переназначения его партиций другим потребителям в группе. + +[к оглавлению](#apache-kafka) + +## Как Kafka обрабатывает сообщения? + +Kafka поддерживает два основных способа обработки сообщений: +* **Queue**: каждое сообщение обрабатывается одним потребителем в группе потребителей. Это достигается за счет наличия + в группе нескольких потребителей, каждый из которых считывает данные из отдельных партиций. +* **Publish-Subscribe**: все сообщения обрабатываются всеми потребителями. Это достигается за счет того, что каждый + потребитель находится в своей собственной группе потребителей, что позволяет всем потребителям читать все сообщения. + +[к оглавлению](#apache-kafka) + +## Как Kafka обрабатывает задержку консюмера? + +Задержка (лаг) консюмера в Kafka относится к разнице между оффсетом последнего созданного сообщения и оффсетом последнего +полученного сообщения. Kafka предоставляет инструменты и API для мониторинга задержек консюмеров такие, как инструмент +командной строки Kafka Consumer Groups и API AdminClient. Высокая задержка консюмеров может указывать на проблемы с +производительностью или недостаточную пропускную способность консюмеров. Kafka не обрабатывает задержки автоматически, +но предоставляет информацию, необходимую приложениям для принятия решений о масштабировании или оптимизации производительности. + +[к оглавлению](#apache-kafka) + +## Для чего нужны методы subscribe() и poll()? + +Метод subscribe() используется для подписки на один или несколько топиков. Фактически он не извлекает никаких данных. +Метод poll(), с другой стороны, используется для извлечения данных из топиков. Он возвращает записи, которые были опубликованы +с момента последнего запроса топиков и партиций. Метод poll() обычно вызывается в цикле для непрерывного получения данных. + +[к оглавлению](#apache-kafka) + +## Для чего нужен метод position()? + +Метод position() возвращает смещение следующей записи, которая будет извлечена для данной партиции. Это полезно для +отслеживания хода получения данных и может использоваться в сочетании с методом committed(), чтобы определить насколько +сильно потребитель отстал от своего последнего комита оффсета. Эта информация может быть ценной для мониторинга и +управления показателями потребителей. + +[к оглавлению](#apache-kafka) + +## Для чего нужны методы commitSync() и commitAsync()? + +Эти методы используются для фиксации смещений: +* **commitSync()**: синхронно фиксирует последнее смещение, возвращенное poll(). Он будет повторять попытку до тех пор, + пока не завершится успешно или не столкнется с непроверяемой ошибкой. +* **commitAsync()**: асинхронно фиксирует смещения. Он не повторяет попытку при сбое, что делает его более быстрым, + но менее надежным, чем commitSync(). Выбор между этими методами зависит от баланса между производительностью и надежностью, + требуемого приложением. + +[к оглавлению](#apache-kafka) + +## Другие вопросы + +## Для чего нужен идемпотентный продюсер? + +Идемпотентный продюсер гарантирует exactly-once гарантию доставки, предотвращая дублирование записей в Kafka в случае +повторных попыток отправки сообщений. Это важно для поддержания целостности данных и правильности их обработки в системе, +особенно в распределенных системах, где могут возникать ошибки связи или сбои. + +[к оглавлению](#apache-kafka) + +## Для чего нужен интерфейс Partitioner? + +Интерфейс Partitioner в Producer API определяет в какую партицию топика будет отправлено сообщение. Partitioner по-умолчанию +использует хэш ключа (если он присутствует) для выбора партиции, гарантируя, что сообщения с одним и тем же ключом всегда +отправляются в одну и ту же партицию. Могут быть реализованы пользовательские Partitioner для управления распределением +сообщений по партициям на основе определенной бизнес-логики или характеристик данных. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Broker log cleaner thread? + +Поток очистки журнала в Kafka отвечает за выполнение сжатия журнала. Сжатие журнала - это механизм, при котором Kafka +удаляет избыточные записи, сохраняя только последнее значение для каждого ключа. Это полезно в тех случаях, когда требуется +только последнее обновление для данного ключа, например, для обслуживания changelog или состояния БД. Программа очистки журналов +периодически запускается для сжатия соответствующих партиций. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Kafka Mirror Maker? + +Это инструмент, позволяющий реплицировать данные между кластерами Kafka, потенциально находящихся в разных дата-центрах. +Он работает, потребляя данные из одного кластера и передавая в другой. Можно использовать для создания резервной копии данных, +объединения данных из нескольких дата-центров в единое хранилище или для переноса данных между кластерами. + +[к оглавлению](#apache-kafka) + +## Для чего нужна Schema Registry? + +Kafka Schema Registry предоставляет RESTful интерфейс для хранения и извлечения схем Avro. Schema Registry используется +совместно с Kafka для обеспечения совместимости схем данных между производителями и потребителями. Это особенно полезно +при разработке моделей данных с течением времени, сохраняя обратную и прямую совместимость. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Streams DSL? + +Kafka Streams DSL предоставляет высокоуровневый API для операций потоковой обработки. Он позволяет разработчикам описывать +сложную логику обработки, такую как фильтрация, преобразование, агрегирование и объединение потоков данных. DSL абстрагирует +многие низкоуровневые детали потоковой обработки, упрощая создание и обслуживание приложений потоковой обработки. + +[к оглавлению](#apache-kafka) + +## Как Kafka обеспечивает версионирование сообщений? + +Сама по себе Kafka не обеспечивает версионирование сообщений напрямую, но предоставляет механизмы, позволяющие реализовывать +управление версиями. Одним из распространенных подходов является включение поля версии в схему сообщения. Для более сложных задач +управления версиями используются реестры схем (например, Confluent Schema Registry), которые могут управлять изменением схемы и совместимостью. + +[к оглавлению](#apache-kafka) + +## Как потребители получают сообщения от брокера? + +Kafka использует pull-модель для извлечения сообщений. Потребители запрашивают сообщения у брокеров, а не брокеры +отправляют сообщения потребителям. Это позволяет потребителям контролировать скорость, с которой они получают сообщения. +Потребители отправляют запросы на получение данных от брокера, указывая топик, партицию и начальное смещение для каждой партиции. +Брокер отвечает сообщениями с объемом до указанного максимального предела в байтах. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka Streams и Apache Flink? + +Kafka Streams и Apache Flink — это два мощных инструмента для обработки потоков данных в режиме реального времени, но +они различаются по архитектуре, возможностям и сценариям применения. + +### Сравнение Kafka Streams и Apache Flink + +| **Критерий** | **Kafka Streams** | **Apache Flink** | +|------------------------|---------------------------------------------|-----------------------------------------| +| **Архитектура** | Встроенная библиотека, работающая внутри приложения. Зависит от Kafka. | Независимая распределенная система потоковой обработки данных с возможностью интеграции с различными источниками и приемниками данных. | +| **Обработка данных** | Обрабатывает потоки событий непосредственно из Kafka. Подходит для обработки событий и транзакционных данных с минимальной задержкой. | Поддерживает как потоковую (streaming), так и пакетную (batch) обработку данных. Специализируется на сложной обработке событий с гибкими возможностями управления состоянием. | +| **Зависимость от Kafka**| Построена исключительно вокруг Kafka. Требует Kafka для получения и отправки данных. | Работает с широким спектром источников данных (Kafka, HDFS, базы данных и т. д.). Kafka — лишь один из многих источников. | +| **Установка** | Легко интегрируется в существующее Java/Scala-приложение как библиотека. Не требует развертывания кластеров. | Требует отдельного кластера для выполнения, что подходит для высокопроизводительных распределенных систем. | +| **Управление состоянием** | Встроенное состояние с использованием RocksDB, также поддержка репликации состояния. | Имеет развитую систему управления состоянием, поддерживает сложные функции восстановления состояния и обработки данных. | +| **Гарантия доставки** | Поддерживает "at-least-once" и "exactly-once" семантику, когда Kafka настроена соответствующим образом. | Имеет гибкие гарантии доставки: поддержка "exactly-once", "at-least-once" и "at-most-once". | +| **Масштабируемость** | Масштабируется автоматически вместе с Kafka-партициями. Каждая инстанция потребителя Kafka обрабатывает свою партицию. | Поддерживает масштабирование на уровне задач (task), с более гибкой моделью масштабирования и управления ресурсами. | +| **Обработка событий** | Подходит для обработки событий с низкой задержкой и транзакционными требованиями. | Специализируется на сложной обработке событий, таких как windowing, агрегирование и работа с изменяющимся состоянием. Поддерживает сложные аналитические операции. | +| **Инструменты и API** | Легковесная библиотека с простыми API для работы с потоками данных. Основные операции — фильтрация, маппинг, объединение потоков, windowing. | Продвинутая система с богатыми API для сложных вычислений, поддерживающая потоковую и пакетную обработку, обработку событий и контроль сложных бизнес-процессов. | +| **Требования к ресурсам**| Менее ресурсоемка, так как не требует отдельного кластера. Работает в рамках JVM-приложения. | Требует более высоких вычислительных ресурсов, так как выполняется на отдельном кластере и поддерживает высокую степень параллелизма. | + +### Когда выбрать Kafka Streams +- Если вы уже используете Kafka и вам нужна легковесная библиотека для обработки данных непосредственно внутри вашего приложения. +- Для сценариев с низкой задержкой, где данные приходят из Kafka и должны быть быстро обработаны с минимальными накладными расходами. +- Если вам нужно встроить обработку потоков данных в существующую Java/Scala программу без необходимости развертывания отдельных кластеров. + +### Когда выбрать Apache Flink +- Если вы работаете с потоковой и пакетной обработкой данных, где источники и приемники могут быть не только Kafka, но и другие системы (например, HDFS, базы данных). +- Для сложных задач обработки событий, требующих управления состоянием, временных окон, аналитики и восстановления после сбоев. +- Если ваш проект требует высокой производительности, гибкости, точных гарантий доставки и распределенной обработки в кластере. + +### Заключение +- **Kafka Streams** — это идеальный выбор, если ваша инфраструктура уже основана на Kafka, и вам нужна быстрая и легковесная обработка потоков данных. +- **Apache Flink** — это мощный инструмент для сложных аналитических задач, потоковой обработки данных в режиме реального + времени с поддержкой сложных схем обработки, который предоставляет больше возможностей для работы с разнообразными источниками данных. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka Consumer и Kafka Stream? + +**Kafka Consumer** - это клиент, который читает данные из топика и производит некоторую обработку. Обычно используется для +простых сценариев получения данных. **Kafka Stream**, с другой стороны, более подвинутый клиент, который может потреблять, +обрабатывать и класть данные обратно в Kafka. Он предоставляет DSL для сложных операций потоковой обработки, таких как +фильтрация, преобразование, агрегирование и объединение потоков. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka и Flume? + +**Apache Kafka** и **Apache Flume** — это два популярных инструмента для обработки и передачи данных, однако они имеют +разные цели и архитектуры. Вот основные различия между ними: + +### 1. **Назначение и использование** +- **Kafka**: Это распределенная платформа для потоковой передачи данных, которая обеспечивает высокую пропускную способность +и низкую задержку для обработки больших объемов данных. Kafka используется для создания стриминговых приложений и обработки +данных в реальном времени. Она может быть использована для передачи логов, событий, метрик и других данных, требующих +высокой доступности и масштабируемости. +- **Flume**: Это распределенная система для сбора, агрегации и передачи логов и событий. Flume обычно используется для +доставки логов с серверов в HDFS, HBase или другие системы хранения данных. Его основное предназначение — это сбор данных +из различных источников (например, лог-файлов) и передача их в системы хранения или аналитики. + +### 2. **Архитектура** +- **Kafka**: В Kafka данные отправляются в топики и партиции, которые могут быть независимо прочитаны несколькими потребителями. +Kafka ориентирована на высокую пропускную способность и масштабируемость. Это решает задачу обработки потоковых данных и +событий в реальном времени. +- **Flume**: Flume состоит из **источников (sources)**, **каналов (channels)** и **приемников (sinks)**. Источник получает +данные, канал их буферизует, а приемник отправляет их в конечную систему. Flume использует систему "event-based" и часто +применяется для сбора логов. + +### 3. **Хранение данных** +- **Kafka**: Kafka сохраняет сообщения на диске в течение длительного времени (по умолчанию — до 7 дней) в топиках. +Потребители могут читать данные в любой момент времени, и Kafka поддерживает концепцию **сохранения и ретрансляции данных**. +- **Flume**: Flume не имеет встроенного механизма долговременного хранения. Он просто передает данные в назначенные места +хранения (например, HDFS). Данные в Flume не сохраняются долго, и если система хранения не доступна, они теряются. + +### 4. **Производительность** +- **Kafka**: Kafka предназначен для работы с высокими объемами данных. Он поддерживает масштабируемость как по производителям, +так и по потребителям, и может обрабатывать миллионы сообщений в секунду с минимальной задержкой. +- **Flume**: Flume может быть менее масштабируемым по сравнению с Kafka и больше ориентирован на сбор логов и событий +с различных источников. Хотя Flume тоже может обрабатывать большие объемы данных, он не предназначен для работы с +такими большими потоками, как Kafka. + +### 5. **Использование и кейсы** +- **Kafka**: Используется для стриминга данных, аналитики в реальном времени, интеграции различных систем, работы с +большими данными и построения событийных приложений. +- **Flume**: Используется для сбора, агрегации и передачи логов и событий в системы хранения, такие как HDFS, HBase, +или внешние системы. Это идеальный выбор для организации потоков логирования и мониторинга. + +### 6. **Поддержка и интеграция** +- **Kafka**: Kafka поддерживает широкий спектр интеграций и может быть использован с различными системами для построения +распределенных приложений и аналитических решений. +- **Flume**: Flume ориентирован на интеграцию с Hadoop-экосистемой, и основное его использование — это интеграция с HDFS, +HBase и другими хранилищами данных в этой экосистеме. + +### 7. **Потребительская модель** +- **Kafka**: Kafka поддерживает много потребителей, которые могут читать из одного и того же топика независимо, а также +возможность **повторного прочтения данных**. +- **Flume**: Flume имеет фиксированную схему доставки данных и не поддерживает такую гибкость, как Kafka в части потребителей и обработки. + +### 8. **Гарантии доставки** +- **Kafka**: Kafka поддерживает **гарантии доставки** с различными уровнями подтверждения (acknowledgment), а также может +обеспечивать **доставку сообщений точно один раз** (exactly-once semantics). +- **Flume**: Flume обеспечивает базовые гарантии доставки, но они менее строгие, чем у Kafka, и больше ориентированы на +устойчивость к сбоям, а не на гарантированную доставку. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka и RabbitMQ? + +**RabbitMQ** и **Apache Kafka** — это две популярные системы обмена сообщениями, каждая из которых имеет свои особенности +и используется для разных типов приложений. Вот основные различия между ними: + +### 1. **Архитектура** +- **RabbitMQ** использует **очереди сообщений**. Сообщения отправляются в очередь, и один потребитель извлекает сообщение +из очереди для обработки. +- **Apache Kafka** использует **топики и партиции**. Сообщения отправляются в топики, которые могут быть разделены на +партиции, и несколько потребителей могут читать эти сообщения в любом порядке. Kafka ориентирован на большие потоки данных и масштабируемость. + +### 2. **Модель доставки сообщений** +- **RabbitMQ**: Сообщения передаются в очереди, и каждый потребитель получает одно сообщение. Сообщения могут быть +подтверждены (acknowledged) или отклонены (rejected). RabbitMQ гарантирует, что сообщение будет доставлено хотя бы одному потребителю. +- **Kafka**: Сообщения сохраняются в топиках на длительный срок, и потребители могут читать их в любом порядке. Kafka +гарантирует доставку сообщений всем потребителям, если они подписаны на топик, и может позволить многократное чтение старых сообщений. + +### 3. **Гарантии доставки** +- **RabbitMQ**: Предоставляет подтверждения доставки и может повторно отправить сообщение, если потребитель не подтвердил +его получение. Можно настроить разные уровни надежности (например, за счет использования подтверждений или транзакций). +- **Kafka**: Сообщения сохраняются на диске, что позволяет потребителям считывать их в любое время. Kafka гарантирует +доставку сообщений при определенной конфигурации репликации и сохранения. + +### 4. **Производительность и масштабируемость** +- **RabbitMQ**: Лучше подходит для небольших и средних систем, где требуется высокая надежность и гарантированная доставка. +Он поддерживает **горизонтальное масштабирование**, но требует дополнительных усилий для настройки и управления. +- **Kafka**: Отличается высокой **производительностью** и возможностью обработки больших объемов данных. Kafka легко +масштабируется за счет **партиционирования** и репликации данных. + +### 5. **Потребительская модель** +- **RabbitMQ**: Один потребитель получает одно сообщение. Если потребитель не успевает обработать сообщение, оно может быть повторно отправлено. +- **Kafka**: Потребители могут читать сообщения независимо друг от друга. Kafka сохраняет все сообщения в топиках, +и потребители могут читать их в любое время. Kafka также поддерживает концепцию **групп потребителей**, где каждый +потребитель группы обрабатывает разные партиции. + +### 6. **Использование и кейсы** +- **RabbitMQ**: Идеален для обработки запросов и ответов, распределенных приложений, микросервисов с гарантией доставки, +бизнес-процессов с очередями задач. +- **Kafka**: Используется для обработки потоков данных, интеграции с большими данными, записи журналов, мониторинга, +обработки событий в реальном времени и сохранения больших объемов данных для последующего анализа. + +### 7. **Производители и потребители** +- **RabbitMQ**: Один производитель отправляет сообщения в очередь, и несколько потребителей могут обрабатывать эти сообщения. +- **Kafka**: Множество производителей могут отправлять сообщения в топики, и несколько потребителей могут читать их +одновременно, поддерживая масштабируемость. + +### 8. **Сообщения и хранение** +- **RabbitMQ**: Сообщения удаляются из очереди после их обработки потребителем. Хранение сообщений обычно краткосрочное. +- **Kafka**: Сообщения сохраняются на диске в топиках до тех пор, пока не истечет срок хранения (по конфигурации). +Это позволяет повторно читать данные. + +[к оглавлению](#apache-kafka) diff --git a/oop.md b/oop.md index baa3e7b..8464512 100644 --- a/oop.md +++ b/oop.md @@ -50,11 +50,11 @@ __Инкапсуляция__ – это свойство системы, поз Пример: ```java -public class SomePhone { +public class AbstractPhone { private int year; private String company; - public SomePhone(int year, String company) { + public AbstractPhone (int year, String company) { this.year = year; this.company = company; } diff --git a/reactive.md b/reactive.md new file mode 100644 index 0000000..17f0ef8 --- /dev/null +++ b/reactive.md @@ -0,0 +1,584 @@ +[Вопросы для собеседования](README.md) + +# Реактивное программирование +* [Что такое реактивное программирование и чем оно отличается от процедурного программирования?](#что-такое-реактивное-программирование-и-чем-оно-отличается-от-процедурного-программирования) +* [Объясните концепцию потоков данных в реактивном программировании](#объясните-концепцию-потоков-данных-в-реактивном-программировании) +* [Что такое паттерн Observer и как он лежит в основе реактивного программирования?](#что-такое-паттерн-observer-и-как-он-лежит-в-основе-реактивного-программирования) +* [Опишите роль Observable и Observer в реактивном программировании](#опишите-роль-observable-и-observer-в-реактивном-программировании) +* [Что такое backpressure в контексте реактивного программирования?](#что-такое-backpressure-в-контексте-реактивного-программирования) +* [Объясните разницу между Hot и Cold Observable](#объясните-разницу-между-hot-и-cold-observable) +* [Какова роль Подписки в реактивном программировании?](#какова-роль-подписки-в-реактивном-программировании) +* [Как отписаться от потока для предотвращения утечки памяти?](#как-отписаться-от-потока-для-предотвращения-утечки-памяти) +* [Какие есть операторы в Project Reactor и для чего они используются?](#какие-есть-операторы-в-project-reactor-и-для-чего-они-используются) + +## Что такое реактивное программирование и чем оно отличается от процедурного программирования? + +### Основные принципы + +**Процедурное программирование** - это подход, при котором программа состоит из последовательности инструкций, выполняемых одна за другой. +В процедурном программировании акцент делается на определении функций и процедур, которые выполняют определенные задачи. +Этот подход хорошо подходит для решения задач, требующих четкого алгоритма действий. + +**Реактивное программирование**, с другой стороны, фокусируется на обработке потоков данных и событий. +В реактивном программировании программа реагирует на изменения в данных или событиях, происходящих в реальном времени. +Реактивное программирование позволяет создавать более гибкие и эффективные системы, способные адаптироваться к изменениям +в данных и событиях без необходимости явного управления асинхронными задачами. + +Процедурное программирование представляет данные в виде единственного значения, хранящегося в переменной. +Реактивное программирование представляет собой непрерывный поток данных, на который могут подписаться несколько наблюдателей. + +[к оглавлению](#реактивное-программирование) + +## Объясните концепцию потоков данных в реактивном программировании + +Концепция потоков данных в реактивном программировании заключается в представлении данных как непрерывно изменяющегося потока, +который автоматически распространяется через систему. Это отличается от традиционного императивного программирования, где данные +обычно обрабатываются как отдельные элементы и изменения должны явно инициироваться кодом. + +В реактивном программировании данные представлены как поток событий, каждое из которых может содержать новое значение или изменение состояния. +Система автоматически реагирует на эти изменения, обновляя свое состояние соответствующим образом. Это позволяет создавать +приложения, которые эффективно обрабатывают большие объемы данных и быстро реагируют на изменения, происходящие в реальном времени. + +Примером может служить система управления данными, которая автоматически обновляет пользовательский интерфейс при изменении данных в базе. +В таком случае, пользовательский интерфейс будет автоматически обновляться каждый раз, когда происходит изменение данных, +без необходимости явного запроса на обновление со стороны пользователя. + +Реактивное программирование основано на шаблоне Наблюдатель (Observer) + +### Ключевые компоненты +* **Наблюдаемый (Observable)** представляет источник данных. При изменении его состояния (или при создании новых данных) изменения передаются наблюдателям +* **Наблюдатель (Observer)** подписывается на Observable и получает уведомления о любых изменениях состояния или новых данных +* **Подписка (Subscription)** устанавливает взаимосвязь между наблюдаемым и наблюдателем. Подписка может быть "один к одному" или "один ко многим" +* **Операторы (Operators)** часто называемые функциями преобразования, они позволяют изменять или адаптировать данные из наблюдаемого объекта до того, как они попадут к наблюдателю +* **Планировщики (Schedulers)** помогают управлять временем и порядком выполнения операций в таких сценариях, как фоновая работа и обновления пользовательского интерфейса +* **Subjects** совмещают роли объекта наблюдения и наблюдателя. Это могут быть как источники данных, так и потребители данных + +### Процесс передачи данных +* **Эмиссия (Emission)** - данные создаются в наблюдаемом объекте и отправляются его наблюдателям +* **Фильтрация (Filtering)** - операторы могут просматривать входящие данные, пересылая только те, которые соответствуют определенным критериям +* **Трансформация (Transformation)** - данные изменяются — например, путем их мапинга перед передачей наблюдателю +* **Нотификация (Notification)** - информирование наблюдателей при поступлении новых данных + +### Основные характеристики потоков +* **Непрерывность (Continuous)** - поток данных сохраняется, что позволяет осуществлять взаимодействие в режиме реального времени +* **Асинхронность (Asynchronous)** - не гарантируется, что события будут происходить в определенном порядке, что позволяет выполнять неблокирующие операции +* **Однонаправленность (One-directional)** - данные передаются от Observable к его подписчикам, обеспечивая однонаправленный поток + +### Типы потоков по количеству подписчиков +* **Unicast Streams** - у каждого наблюдателя есть эксклюзивное подключение к наблюдаемому источнику +* **Broadcast Streams** - позволяет нескольким наблюдателям подписаться на один объект наблюдения. Каждый наблюдатель +получает полный набор данных, что может быть проблематично, если речь идет о конфиденциальности данных + +### Типы потоков по поведению +* **Hot Observable** - эти последовательности передают данные независимо от присутствия наблюдателя. +Если подписывается новый наблюдатель, он начинает получать данные с точки подписки +* **Cold Observable** - здесь передача данных начинается только после подписки. Любой новый наблюдатель получит данные с самого начала + +### Backpressure +Данный механизм регулируют скорость, с которой данные публикуются в поток. Это необходимо, чтобы справиться с потенциальным +переполнением или узкими местами из-за различий в скоростях обработки данных. + +Например, в RxJava интерфейсы `Observable` и `Flowable` отличаются тем, что последний включает поддержку backpressure. +С `Flowable` можно использовать настраиваемую стратегию backpressure, чтобы контролировать скорость публикации Observable относительно скорости потребления Subscriber + +### Практическое применение +Будь то обработка асинхронных вызовов API или управление вводом данных пользователем, потоки данных обеспечивают надежную +и гибкую основу для многих повседневных задач программирования. + +Можно использовать ряд операторов, таких как `map`, `filter`, `debounce`, и `throttle`, для преобразования данных +и манипулирования ими, исходя из конкретных требований. + +Широкое внедрение реактивных библиотек, таких как RxJava для Android или RxJS для Web, подчеркивает +полезность потоков данных в современной разработке программного обеспечения. + +[к оглавлению](#реактивное-программирование) + +## Что такое паттерн Observer и как он лежит в основе реактивного программирования? + +Паттерн Observer, также известный как Publish/Subscribe или Event Listener, является поведенческим шаблоном проектирования, +который позволяет объектам следить за изменениями в других объектах и реагировать на них. Это один из самых популярных и +полезных шаблонов, который упрощает код и повышает его гибкость. В основе реактивного программирования лежит идея о том, +что система должна реагировать на изменения внешних источников данных или событий. Паттерн Observer идеально подходит для +реализации этой концепции, поскольку он позволяет объектам автоматически получать уведомления об изменениях в других объектах +и соответствующим образом обновлять свое состояние. + +Пример использования паттерна Observer в веб-разработке может включать систему уведомлений для пользователей. Когда новая +статья добавляется на сайт, все подписчики должны быть уведомлены об этом. В этом случае, класс Subject будет отвечать +за управление подписчиками и их уведомление об изменениях, а класс Observer будет представлять собой простого наблюдателя, +который выводит сообщение о новой статье при получении уведомления, таким образом обеспечивается **слабую связность (loose coupling)**. + +Применение паттерна Observer в веб-разработке может быть полезно для обработки событий пользовательского интерфейса, +реализации системы уведомлений для пользователей, отслеживания изменений состояния приложения и реагирования на них. +Однако важно помнить, что чрезмерное использование паттерна может привести к усложнению кода и затруднить отладку. + +### Ключевые компоненты +* **Subject** - источник данных или событий. Наблюдатели подписываются на Subject для получения уведомлений об изменениях +* **Observer** - наблюдатель получает уведомления, когда состояние Subject изменяется + +В реактивной конфигурации Subject отвечает за публикацию изменений, а Observers подписываются на эти изменения. +Это исключает явное обращение к источникам данных и подчеркивает **модель потока данных (datastream model)**. + +### Пример кода паттерна Observer + +```java +import java.util.ArrayList; +import java.util.List; + +interface Observer { + void update(); +} + +class Subject { + private List observers = new ArrayList<>(); + private int state; + + public int getState() { + return state; + } + + public void setState(int state) { + this.state = state; + notifyAllObservers(); + } + + public void attach(Observer observer) { + observers.add(observer); + } + + public void notifyAllObservers() { + for (Observer observer : observers) { + observer.update(); + } + } +} + +class ConcreteObserver implements Observer { + private String name; + private Subject subject; + + public ConcreteObserver(String name, Subject subject) { + this.name = name; + this.subject = subject; + } + + @Override + public void update() { + System.out.println("Observer " * name * " updated. New state: " * subject.getState()); + } +} + +public class Main { + public static void main(String[] args) { + Subject subject = new Subject(); + ConcreteObserver observer1 = new ConcreteObserver("One", subject); + ConcreteObserver observer2 = new ConcreteObserver("Two", subject); + + subject.attach(observer1); + subject.attach(observer2); + + subject.setState(5); + } +} +``` +В данном примере Subject ведет список подписавшихся Observers и уведомляет их при изменении его состояния. + +[к оглавлению](#реактивное-программирование) + +## Опишите роль Observable и Observer в реактивном программировании + +**Наблюдатели** являются потребителями данных, в то время как **наблюдаемые** являются источником или производителем данных. + +### Ключевые концепции +* **Наблюдаемый (Observable)** - передает данные или сигналы, которые могут быть любых типов, включая пользовательские события +* **Наблюдатель (Observer)** - получает уведомление, когда Observable отправляет данные +* **Подписка (Subscription)** - связующее звено между объектом наблюдения и наблюдателем +* **Операторы (Operators)** - позволяют преобразовывать, фильтровать, комбинировать или обрабатывать поток данных, +публикуемый наблюдаемым объектом, до того, как он достигнет наблюдателя + +### Пример кода: Observable и Observer + +### Java 9* + +**Определить Subscriber** + +```java +import java.util.concurrent.Flow; + +public class SimpleSubscriber implements Flow.Subscriber { + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); // Запрашиваем первый элемент + } + + @Override + public void onNext(String item) { + System.out.println("Received: " * item); + subscription.request(1); // Запрашиваем следующий элемент + } + + @Override + public void onError(Throwable throwable) { + System.err.println("Error: " * throwable.getMessage()); + } + + @Override + public void onComplete() { + System.out.println("All items received"); + } +} +``` + +**Определить Publisher** + +```java +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; + +public class SimplePublisher { + + public static void main(String[] args) throws InterruptedException { + // Создаем издателя + SubmissionPublisher publisher = new SubmissionPublisher<>(); + + // Создаем подписчика + SimpleSubscriber subscriber = new SimpleSubscriber(); + + // Регистрируем подписчика в издателе + publisher.subscribe(subscriber); + + // Публикуем элементы + System.out.println("Publishing data items..."); + String[] items = {"item1", "item2", "item3"}; + for (String item : items) { + publisher.submit(item); + TimeUnit.SECONDS.sleep(1); + } + + // Закрываем издателя + publisher.close(); + + // Ждем некоторое время, чтобы подписчик обработал все элементы + TimeUnit.SECONDS.sleep(3); + } +} +``` + +### Project Reactor + +```java +import reactor.core.publisher.Flux; + +public class ReactorExample { + + public static void main(String[] args) { + // Создаем Flux для публикации данных + Flux flux = Flux.just("Hello", "World", "From", "Reactor"); + + // Подписываемся на Flux и выводим каждый элемент + flux.subscribe( + item -> System.out.println("Received: " * item), + error -> System.err.println("Error: " * error), + () -> System.out.println("All items received") + ); + } +} +``` + +[к оглавлению](#реактивное-программирование) + +## Что такое backpressure в контексте реактивного программирования? + +**Backpressure** - это концепция в реактивном программировании, которая имеет дело с ситуацией, когда производитель (или издатель) +генерирует данные быстрее, чем потребитель (или подписчик) может их обработать. Управление backpressure позволяет системе +оставаться отзывчивой и не перегружаться. + +### Ключевые концепции backpressure + +* **Producer (Publisher)** - компонент, который производит данные +* **Consumer (Subscriber)** - компонент, который потребляет данные +* **Flow Control** - механизм, который гарантирует, что производитель не перегрузит потребителя слишком большим объемом данных + +### Почему важно backpressure? + +В реактивной системе, если потребитель не выдерживает скорость, с которой производитель производит данные, это может привести к следующим проблемам: +* **Переполнение памяти (Memory Overflow)** - необработанные элементы могут накапливаться в памяти, что приводит к ошибкам нехватки памяти +* **Увеличение задержки (Latency Increase)** - система может перестать отвечать по мере роста очереди необработанных элементов +* **Исчерпание ресурсов (Resource Exhaustion)** - системные ресурсы (процессор, память) могут быть исчерпаны, что приводит к снижению производительности или сбоям + +### Стратегии управления backpressure в Project Reactor + +* **Буферизация (Buffering)** - входящие элементы хранятся в буфере до тех пор, пока потребитель не сможет их обработать + * **onBackpressureBuffer(100)** + +```java +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +public class BackpressureExample { + + public static void main(String[] args) { + Flux flux = Flux.range(1, 1000) + .onBackpressureBuffer(100); // Устанавливаем стратегию backpressure + + flux.publishOn(Schedulers.boundedElastic()) + .subscribe( + item -> { + try { + Thread.sleep(10); // Имитируем медленного потребителя + System.out.println("Received: " * item); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }, + error -> System.err.println("Error: " * error), + () -> System.out.println("All items received") + ); + } +} +``` + +* **Отбрасывание (Dropping)** - отбрасывает элементы, если потребитель не может их обработать + * **onBackpressureDrop()** +* **Новейшие элементы (Latest)** - сохраняет только последние элементы, удаляя предыдущие + * **onBackpressureLatest()** +* **Ошибка (Error)** - сигнализирует об ошибке, когда backpressure не может быть обработано + * **onBackpressureError()** +* **Контроль скорости запроса (Control Request Rate)**: явно контролирует скорость, с которой потребитель запрашивает элементы у производителя + * **request(n)** - метод в реализации Subscriber для управления потоком + +```java +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; + +public class ControlledRequestExample { + + public static void main(String[] args) { + Flux flux = Flux.range(1, 1000); + + flux.subscribe(new BaseSubscriber() { + @Override + private void hookOnSubscribe(Subscription subscription) { + request(1); // Запрашиваем один элемент при подписке + } + + @Override + private void hookOnNext(Integer value) { + try { + Thread.sleep(10); // Эмитируем медленный процесс + System.out.println("Received: " * value); + request(1); // Запрашиваем следующий элемент после обработки текущего + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + private void hookOnComplete() { + System.out.println("All items received"); + } + + @Override + private void hookOnError(Throwable throwable) { + System.err.println("Error: " * throwable.getMessage()); + } + }); + } +} +``` + +[к оглавлению](#реактивное-программирование) + +## Объясните разницу между Hot и Cold Observable + +В реактивном программировании термины hot и cold observable (или издатели в контексте Project Reactor) описывают, как они +создают потоки данных и обрабатывают их. Основное различие между ними заключается в том, как они обрабатывают подписки +и когда начинают выдавать элементы. + +### Cold Observables + +Начинают выдавать элементы только тогда, когда подписчик подписывается на них. Каждый подписчик получает всю последовательность +элементов с самого начала. Это означает, что каждый раз, когда подписывается новый подписчик, Observable воспроизводит всю последовательность элементов. + +#### Характеристики + +* **Ленивое исполнение (Lazy Evaluation)** - данные не производятся, пока не будет хотя бы одного подписчика +* **Множественные подписки (Multiple Subscriptions)** - каждый подписчик получает полную последовательность элементов независимо от других подписчиков +* **Воспроизводимость (Reproducibility)** - каждый подписчик видит одну и ту же последовательность элементов с самого начала, что делает поток воспроизводимым + +#### Пример + +```java +import reactor.core.publisher.Flux; + +public class ColdObservableExample { + + public static void main(String[] args) { + Flux coldFlux = Flux.just("A", "B", "C", "D") + .doOnSubscribe(subscription -> System.out.println("Subscribed to Cold Observable")); + + // First subscription + coldFlux.subscribe(item -> System.out.println("Subscriber 1: " * item)); + + // Second subscription + coldFlux.subscribe(item -> System.out.println("Subscriber 2: " * item)); + } +} +``` + +### Hot Observables + +Начинают выдавать элементы независимо от наличия подписчиков. Подписчики получают только те элементы, которые были отправлены +после их подписки. Это означает, что если подписчик подпишется с опозданием, он пропустит элементы, которые были отправлены до подписки. + +#### Характеристики + +* **Жадное исполнение (Eager Evaluation)** - публикация элементов происходит сразу после создания, даже если подписчиков нет +* **Общий поток данных (Shared Data Stream)** - все подписчики используют один и тот же источник данных и получают элементы, отправленные после подписки +* **Невоспроизводимость (Non-Reproducibility)** - подписчики могут видеть разные части последовательности в зависимости от того, когда они подписались + +#### Пример + +```java +import reactor.core.publisher.Flux; +import reactor.core.publisher.ConnectableFlux; + +import java.time.Duration; + +public class HotObservableExample { + + public static void main(String[] args) throws InterruptedException { + ConnectableFlux hotFlux = Flux.just("A", "B", "C", "D") + .delayElements(Duration.ofMillis(500)) + .publish(); + + // Запускаем публикацию элементов + hotFlux.connect(); + + // Первая подписка (начинает получать данные немедленно) + hotFlux.subscribe(item -> System.out.println("Subscriber 1: " * item)); + Thread.sleep(750); // Ждем отправку элементов + + // Вторая подписка (пропускает первый элемент) + hotFlux.subscribe(item -> System.out.println("Subscriber 2: " * item)); + + // Останавливаем работу основного потока, чтобы увидеть выходные данные + Thread.sleep(2000); + } +} + +``` + +### Ключевые отличия + +| Критерий | Hot Observables | Cold Observables | +|----------------|--------------------------------------------------------|-------------------------------------------------------------------------------------------| +| Обмен данными | Один поток для всех подписчиков | У каждого подписчика свой независимый поток | +| Время подписки | Получает данные в зависимости от времени подписки | Получает все данные, даже если подпишется позже | +| Жизненный цикл | Работает независимо от подписок | Запускает генерацию данных только, когда есть подписка | +| Асинхронность | Может генерировать и передавать данные без подписчиков | Передача данных начинается после подписки, что часто приводит к синхронной передаче | + +### Практическое применение + +Cold Observables полезны для: +* Данные, которые необходимо воспроизвести для каждого подписчика, такие как чтение из файла, выполнение HTTP-запроса или +генерация новой случайной последовательности. +* Сценарии, в которых последовательность элементов должна быть согласованной и воспроизводимой для каждого подписчика. + +Hot Observables полезны для: +* Данные, которые создаются непрерывно и которыми необходимо делиться между несколькими подписчиками, такие как события +мыши, обновления цен на акции или данные датчиков. +* Сценарии, в которых подписчики должны получать только текущие и будущие события, а не прошлые. + +[к оглавлению](#реактивное-программирование) + +## Какова роль Подписки в реактивном программировании? + +Подписка абстрагирует от того, как данные получаются или генерируются + +### Основные функции +`Subscription` обычно предлагает два основных метода: +* **Request** - информирует источник данных о количестве элементов, которые потребитель готов получить +* **Cancel** - останавливает поток данных, освобождая любые ресурсы, такие как обработчики файлов или сетевые подключения + +### Общая концепция +`Subscription` - интерфейс, действующий как соглашение между источником данных и потребителем данных. Он обеспечивает +передачу данных с учетом управления потоком, backpressure и ресурсами. + +### Управление backpressure +Реализации интерфейса `Publisher` в реактивных потоках оценивают готовность подписчика обрабатывать входящие данные +с учетом текущего состояния потока данных. Этот механизм, известный как backpressure, направлен на предотвращение перегрузки +путем указания источнику данных соответствующим образом адаптировать скорость передачи данных. + +`request` - метод интерфейса `Subscription` является основным каналом, по которому подписчик сообщает источнику данных +о своей текущей возможности принимать данные, тем самым регулируя backpressure. + +### Управление ресурсами +Для определенных источников данных, таких как файлы, потоки ввода-вывода или базы данных, могут потребоваться определенные +ресурсы. Интерфейс `Subscription` предоставляет средства для освобождения этих ресурсов, когда передача данных больше не требуется. + +После вызова метода `cancel` источник данных может предпринять соответствующие действия, такие как закрытие файла или +прекращение сетевого взаимодействия. + +[к оглавлению](#реактивное-программирование) + +## Как отписаться от потока для предотвращения утечки памяти? + +* Использовать `Disposable`: получить Disposable из подписки и вызвать метод `dispose`, чтобы отменить ее +* Использовать `BaseSubscriber`: наследоваться от класса BaseSubscriber и вызвать потом метод `dispose` +* Использовать операторы при создании потока: + * `take`: автоматически отменит подписку после получения некоторого количества элементов + * `timeout`: автоматически отменяет подписку, если в течение указанного срока не будет отправлено никаких элементов. + +Best Practice: стоит настраивать отмену подписки, связывая ее с жизненным циклом какого-либо компонента + +[к оглавлению](#реактивное-программирование) + +## Какие есть операторы в Project Reactor и для чего они используются? + +Операторы в реактивном программировании служат для создания, преобразования, фильтрации или объединения различных потоков данных. + +### Операторы создания +* `just`: создает Flux/Mono, который публикует указанные элементы +* `fromArray`: создает поток, который генерирует элементы из массива +* `fromIterable`: создает Flux, который генерирует элементы из итерируемого объекта +* `fromStream`: создает Flux из stream Java +* `fromCallable`: создает Mono из Callable +* `fromRunnable`: создает Mono из Runnable +* `fromSupplier`: создает Mono от Supplier +* `range`: создает FLux, который выдает диапазон целых чисел +* `interval`: создает Flux, который выдает long значения через регулярные промежутки времени +* `empty`: создает пустой Flux/Mono, который завершается немедленно +* `error`: создает Flux/Mono, который немедленно сигнализирует об ошибке +* `never`: создает Flux/Mono, который не создает какой-либо элемент и не завершается +* `defer`: создает новый Flux/Mono для каждой подписки + +### Операторы трансформации +* `map`: преобразует объекты, публикуемые Flux/Mono. +* `flatMap`: преобразует каждый элемент в поток, а затем соединяет их в один поток +* `concatMap`: как flatMap, но поддерживает порядок публикуемых элементов +* `switchMap`: когда он получает данные от нового потока, то сразу отписывается от предыдущего и подписывается на новый +* `scan`: Накапливайте состояние каждого элемента. +* `buffer`: собирает элементы в как список внутри Flux +* `window`: публикует коллекцию элементов как Flux (окно фиксированного размера) внутри Flux +* `groupBy`: группирует элементы по ключу + +### Операторы фильтрации +* `filter`: пропускает только те элементы, которые удовлетворяют условию +* `distinct`: пропускает только уникальные элементы +* `take`: ограничивает количество элементов, публикуемых потоком +* `takeWhile`: публикует элементы, пока условие истинно +* `takeUntil`: публикует элементы, пока условие не станет истинно +* `skip`: пропускает определенное количество элементов в начале потока +* `skipWhile`: пропускает элементы, пока условие истинно +* `skipUntil`: пропускает элементы, пока условие не станет истинно + +### Операторы объединения +* `merge`: объединяет потоки в один параллельно, сохраняя порядок элементов (активная подписка) +* `concat`: объединяет потоки последовательно, ожидая завершения первого потока, затем второго и так далее (отложенная подписка) +* `zip`: объединяет элементы из нескольких потоков в пары +* `combineLatest`: объединяет указанным способом последние значения из нескольких потоков +* `startWith`: добавляет в начало потока к исходным элементам новые элемент(ы)