From 9a0df73a9bdab1fbe2a6785b0d2f1dae0a6bfa8d Mon Sep 17 00:00:00 2001 From: Danil Pudovkin <99204984+dapudovkin@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:04:28 +0500 Subject: [PATCH 1/7] =?UTF-8?q?=D0=92=D0=BE=D0=BF=D1=80=D0=BE=D1=81=D1=8B?= =?UTF-8?q?=20=D0=BF=D0=BE=20=D1=80=D0=B5=D0=B0=D0=BA=D1=82=D0=B8=D0=B2?= =?UTF-8?q?=D0=BD=D0=BE=D0=BC=D1=83=20=D0=BF=D1=80=D0=BE=D0=B3=D1=80=D0=B0?= =?UTF-8?q?=D0=BC=D0=BC=D0=B8=D1=80=D0=BE=D0=B2=D0=B0=D0=BD=D0=B8=D1=8E=20?= =?UTF-8?q?(#162)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * reactive programming & reactive streams concept * backpressure, observables, subscription, operators * fix style * update README.md * Add kafka questions * Add kafka broker, producer architecture + add topic settings * Add kafka producer example config * Add kafka consumer * Update table of contents * fix style * Add kafka interview questions * Update README.md * Update Kafka questions style * Update Kafka --------- Co-authored-by: Danil Pudovkin --- README.md | 79 +++ kafka.md | 1420 +++++++++++++++++++++++++++++++++++++++++++++++++++ reactive.md | 584 +++++++++++++++++++++ 3 files changed, 2083 insertions(+) create mode 100644 kafka.md create mode 100644 reactive.md diff --git a/README.md b/README.md index 13dca74..dc9ccc8 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ + [Потоки ввода-вывода в Java](#Потоки-вводавывода-в-java) ![icon][done] + [Сериализация](#Сериализация) ![icon][done] + [Многопоточность](#Многопоточность) ![icon][done] ++ [Реактивное программирование](#реактивное-программирование) ![icon][done] + [Servlets, JSP, JSTL](#servlets-jsp-jstl) ![icon][done] + [Базы данных](#Базы-данных) ![icon][done] + [SQL](#sql) ![icon][done] @@ -24,6 +25,7 @@ + [Основы CSS](#Основы-css) ![icon][done] + [Основы Web](#Основы-web) ![icon][done] + [Дополнительные материалы](#Дополнительные-материалы) ![icon][done] ++ [Apache Kafka](#apache-kafka) ![icon][done] [done]:done.png @@ -445,6 +447,20 @@ [к оглавлению](#Вопросы-для-собеседования-на-java-developer) +## Реактивное программирование + +* [Что такое реактивное программирование и чем оно отличается от процедурного программирования?](reactive.md#что-такое-реактивное-программирование-и-чем-оно-отличается-от-процедурного-программирования) +* [Объясните концепцию потоков данных в реактивном программировании](reactive.md#объясните-концепцию-потоков-данных-в-реактивном-программировании) +* [Что такое паттерн Observer и как он лежит в основе реактивного программирования?](reactive.md#что-такое-паттерн-observer-и-как-он-лежит-в-основе-реактивного-программирования) +* [Опишите роль Observable и Observer в реактивном программировании](reactive.md#опишите-роль-observable-и-observer-в-реактивном-программировании) +* [Что такое backpressure в контексте реактивного программирования?](reactive.md#что-такое-backpressure-в-контексте-реактивного-программирования) +* [Объясните разницу между Hot и Cold Observable](reactive.md#объясните-разницу-между-hot-и-cold-observable) +* [Какова роль Подписки в реактивном программировании?](reactive.md#какова-роль-подписки-в-реактивном-программировании) +* [Как отписаться от потока для предотвращения утечки памяти?](reactive.md#как-отписаться-от-потока-для-предотвращения-утечки-памяти) +* [Какие есть операторы в Project Reactor и для чего они используются?](reactive.md#какие-есть-операторы-в-project-reactor-и-для-чего-они-используются) + +[к оглавлению](#Вопросы-для-собеседования-на-java-developer) + ## Servlets, JSP, JSTL + [Что такое _«сервлет»_?](servlets.md#Что-такое-сервлет) + [В чем заключаются преимущества технологии сервлетов над CGI (Common Gateway Interface)?](servlets.md#В-чем-заключаются-преимущества-технологии-сервлетов-над-cgi-common-gateway-interface) @@ -758,5 +774,68 @@ [к оглавлению](#Вопросы-для-собеседования-на-java-developer) +## Apache Kafka + +* [Что такое Apache Kafka?](kafka.md#что-такое-apache-kafka) +* [Основные компоненты Kafka](kafka.md#основные-компоненты-kafka) + +**Архитектура компонентов** + +* Topic + * [Архитектура топика](kafka.md#архитектура-топика) + * [Настройки топика Kafka](kafka.md#настройки-топика-kafka) +* Broker + * [Архитектура брокера](kafka.md#архитектура-брокера) + * [Настройки брокера Kafka](kafka.md#настройки-брокера-kafka) +* Producer + * [Архитектура продюсера](kafka.md#архитектура-продюсера) + * [Настройки продюсера](kafka.md#настройки-продюсера) + * [Пример конфигурации Kafka Producer](kafka.md#пример-конфигурации-kafka-producer) +* Consumer + * [Архитектура консюмера](kafka.md#архитектура-консюмера) + * [Настройки консюмера](kafka.md#настройки-консюмера) + * [Пример конфигурации Kafka Consumer](kafka.md#пример-конфигурации-kafka-consumer) + +**Kafka API** + +* [Основные API Kafka](kafka.md#основные-api-kafka) +* [Какова роль Producer API?](kafka.md#какова-роль-producer-api) +* [Какова роль Consumer API?](kafka.md#какова-роль-consumer-api) +* [Какова роль Connector API?](kafka.md#какова-роль-connector-api) +* [Какова роль Streams API?](kafka.md#какова-роль-streams-api) +* [Какова роль Transactions API?](kafka.md#какова-роль-transactions-api) +* [Какова роль Quota API?](kafka.md#какова-роль-quota-api) +* [Какова роль AdminClient API?](kafka.md#какова-роль-AdminClient-api) + +**Kafka Consumer** + +* [Для чего нужен координатор группы?](kafka.md#для-чего-нужен-координатор-группы) +* [Для чего нужен Consumer heartbeat thread?](kafka.md#для-чего-нужен-consumer-heartbeat-thread) +* [Как Kafka обрабатывает сообщения?](kafka.md#как-kafka-обрабатывает-сообщения) +* [Как Kafka обрабатывает задержку консюмера?](kafka.md#как-kafka-обрабатывает-задержку-консюмера) +* [Для чего нужны методы subscribe() и poll()?](kafka.md#для-чего-нужны-методы-subscribe-и-poll) +* [Для чего нужен метод position()?](kafka.md#для-чего-нужен-метод-position) +* [Для чего нужны методы commitSync() и commitAsync()?](kafka.md#для-чего-нужны-методы-commitsync-и-commitasync) + +**Другие вопросы** + +* [Для чего нужен идемпотентный продюсер?](kafka.md#для-чего-нужен-идемпотентный-продюсер) +* [Для чего нужен интерфейс Partitioner?](kafka.md#для-чего-нужен-интерфейс-partitioner) +* [Для чего нужен Broker log cleaner thread?](kafka.md#для-чего-нужен-broker-log-cleaner-thread) +* [Для чего нужен Kafka Mirror Maker?](kafka.md#для-чего-нужен-kafka-mirror-maker) +* [Для чего нужна Schema Registry?](kafka.md#для-чего-нужна-schema-registry) +* [Для чего нужен Streams DSL?](kafka.md#для-чего-нужен-streams-dsl) +* [Как Kafka обеспечивает версионирование сообщений?](kafka.md#как-kafka-обеспечивает-версионирование-сообщений) +* [Как потребители получают сообщения от брокера?](kafka.md#как-потребители-получают-сообщения-от-брокера) + +**Сравнение с другими компонентами и системами** + +* [В чем разница между Kafka Consumer и Kafka Stream?](kafka.md#в-чем-разница-между-kafka-consumer-и-kafka-stream) +* [В чем разница между Kafka Streams и Apache Flink?](kafka.md#в-чем-разница-между-kafka-streams-и-apache-flink) +* [В чем разница между Kafka и Flume?](kafka.md#в-чем-разница-между-kafka-и-flume) +* [В чем разница между Kafka и RabbitMQ?](kafka.md#в-чем-разница-между-kafka-и-rabbitmq) + +[к оглавлению](#Вопросы-для-собеседования-на-java-developer) + ## Источники + [Вопросы на собеседование Junior Java Developer](https://jsehelper.blogspot.ru) diff --git a/kafka.md b/kafka.md new file mode 100644 index 0000000..c579cbf --- /dev/null +++ b/kafka.md @@ -0,0 +1,1420 @@ +[Вопросы для собеседования](README.md) + +# Apache Kafka +* [Что такое Apache Kafka?](#что-такое-apache-kafka) +* [Основные компоненты Kafka](#основные-компоненты-kafka) + +**Архитектура компонентов** + +* Topic + * [Архитектура топика](#архитектура-топика) + * [Настройки топика Kafka](#настройки-топика-kafka) +* Broker + * [Архитектура брокера](#архитектура-брокера) + * [Настройки брокера Kafka](#настройки-брокера-kafka) +* Producer + * [Архитектура продюсера](#архитектура-продюсера) + * [Настройки продюсера](#настройки-продюсера) + * [Пример конфигурации Kafka Producer](#пример-конфигурации-kafka-producer) +* Consumer + * [Архитектура консюмера](#архитектура-консюмера) + * [Настройки консюмера](#настройки-консюмера) + * [Пример конфигурации Kafka Consumer](#пример-конфигурации-kafka-consumer) + +**Kafka API** + +* [Основные API Kafka](#основные-api-kafka) +* [Какова роль Producer API?](#какова-роль-producer-api) +* [Какова роль Consumer API?](#какова-роль-consumer-api) +* [Какова роль Connector API?](#какова-роль-connector-api) +* [Какова роль Streams API?](#какова-роль-streams-api) +* [Какова роль Transactions API?](#какова-роль-transactions-api) +* [Какова роль Quota API?](#какова-роль-quota-api) +* [Какова роль AdminClient API?](#какова-роль-AdminClient-api) + +**Kafka Consumer** + +* [Для чего нужен координатор группы?](#для-чего-нужен-координатор-группы) +* [Для чего нужен Consumer heartbeat thread?](#для-чего-нужен-consumer-heartbeat-thread) +* [Как Kafka обрабатывает сообщения?](#как-kafka-обрабатывает-сообщения) +* [Как Kafka обрабатывает задержку консюмера?](#как-kafka-обрабатывает-задержку-консюмера) +* [Для чего нужны методы subscribe() и poll()?](#для-чего-нужны-методы-subscribe-и-poll) +* [Для чего нужен метод position()?](#для-чего-нужен-метод-position) +* [Для чего нужны методы commitSync() и commitAsync()?](#для-чего-нужны-методы-commitsync-и-commitasync) + +**Другие вопросы** + +* [Для чего нужен идемпотентный продюсер?](#для-чего-нужен-идемпотентный-продюсер) +* [Для чего нужен интерфейс Partitioner?](#для-чего-нужен-интерфейс-partitioner) +* [Для чего нужен Broker log cleaner thread?](#для-чего-нужен-broker-log-cleaner-thread) +* [Для чего нужен Kafka Mirror Maker?](#для-чего-нужен-kafka-mirror-maker) +* [Для чего нужна Schema Registry?](#для-чего-нужна-schema-registry) +* [Для чего нужен Streams DSL?](#для-чего-нужен-streams-dsl) +* [Как Kafka обеспечивает версионирование сообщений?](#как-kafka-обеспечивает-версионирование-сообщений) +* [Как потребители получают сообщения от брокера?](#как-потребители-получают-сообщения-от-брокера) + +**Сравнение с другими компонентами и системами** + +* [В чем разница между Kafka Consumer и Kafka Stream?](#в-чем-разница-между-kafka-consumer-и-kafka-stream) +* [В чем разница между Kafka Streams и Apache Flink?](#в-чем-разница-между-kafka-streams-и-apache-flink) +* [В чем разница между Kafka и Flume?](#в-чем-разница-между-kafka-и-flume) +* [В чем разница между Kafka и RabbitMQ?](#в-чем-разница-между-kafka-и-rabbitmq) + + +## Что такое Apache Kafka? + +Это распределённая система с открытым исходным кодом, разработанная для высокоскоростной передачи больших объёмов данных +с минимальной задержкой. + +### Преимущества + +* Персистентность данных +* Высокая производительность +* Независимость пайплайнов обработки +* Возможность просмотреть историю записей заново +* Гибкость в использовании + +### Когда использовать + +* λ-архитектура или k-архитектура +* Стриминг больших данных +* Много клиентов (producer и consumer) +* Требуется кратное масштабирование + +### Чего в Kafka нет из коробки + +* Это не брокер сообщений +* Отложенные сообщения +* DLQ +* AMQP / MQTT +* TTL на сообщение +* Очереди с приоритетами + +[к оглавлению](#apache-kafka) + +## Основные компоненты Kafka + +* **Producer (Производитель)** — приложение, которое публикует сообщения в топики Kafka +* **Consumer (Потребитель)** — приложение, которое подписывается на топики и читает сообщения +* **Broker (Брокер)** — сервер Kafka, который принимает, хранит и распределяет сообщения. В кластере Kafka может быть несколько брокеров +* **Topic (Топик)** — логическое разделение, по которому организуются данные. Производители отправляют сообщения в топики, а потребители читают из них +* **Partition (Раздел)** — каждый топик разделён на партиции для параллельной обработки. Сообщения в партициях упорядочены +* **Zookeeper** — сервис, используемый Kafka для управления состоянием кластера и координации брокеров. +Однако в новых версиях Kafka отказывается от Zookeeper в пользу собственного механизма метаданных KRaft (Kafka Raft). +Это новая внутренняя архитектура метаданных Kafka, которая устраняет зависимость от Zookeeper. Она основана на Raft-консенсусе, +позволяя Kafka брокерам самостоятельно управлять метаданными и координировать взаимодействие между собой. + +[к оглавлению](#apache-kafka) + +## Архитектура топика + +* **Топик разбит на партиции** — сообщения в топике распределяются по партициям для более эффективной параллельной обработки и хранения +* **Партиции хранятся на диске** — Kafka сохраняет данные на диск, что позволяет долговременно хранить сообщения +* **Партиции делятся на сегменты** — сегмент представляет собой обычный файл на диске, сегменты делятся на пассивные и активный. + Запись происходит в активный сегмент +* **Данные удаляются либо по времени, либо по размеру**. Удаление происходит посегментно, с самого старого сегмента + * **retention.bytes** - по максимальному размеру + * **retention.ms** - по времени +* **Сообщение можно быстро найти по его Offset** — каждому сообщению в партиции присваивается уникальный смещающий индекс (offset), по которому можно легко найти сообщение + +[к оглавлению](#apache-kafka) + +## Настройки топика Kafka + +### Репликация + +* `replication.factor` + * **Описание**: Количество реплик для каждой партиции топика + * **Пример**: `replication.factor=3` +* `min.insync.replicas` + * **Описание**: Минимальное количество синхронизированных реплик + * **Пример**: `min.insync.replicas=2` + +### Хранение данных + +* `retention.ms` + * **Описание**: Время хранения сообщений в топике в миллисекундах + * **Пример**: `retention.ms=604800000` (7 дней) +* `retention.bytes` + * **Описание**: Максимальный объём данных в топике, после чего старые сообщения удаляются + * **Пример**: `retention.bytes=10737418240` (10 GB) +* `segment.bytes` + * **Описание**: Размер сегмента логов топика + * **Пример**: `segment.bytes=1073741824` (1 GB) + +### Политики очистки + +* `cleanup.policy` + * **Описание**: Как Kafka обрабатывает старые сообщения + * **Значения**: `delete`, `compact` + * **Пример**: `cleanup.policy=delete` + +### Партиции + +* `num.partitions` + * **Описание**: Количество партиций в топике + * **Пример**: `num.partitions=3` + +[к оглавлению](#apache-kafka) + +## Архитектура брокера + +* **У каждой партиции свой лидер** — в Kafka для каждой партиции в топике назначается лидер-брокер, который отвечает + за запись и чтение данных +* **Сообщения пишутся в лидера** — производители отправляют сообщения напрямую в брокер-лидер партиции +* **Данные реплицируются между брокерами** — для обеспечения отказоустойчивости Kafka реплицирует данные партиций на + другие брокеры, которые становятся репликами +* **Автоматический фейловер лидера** — в случае сбоя брокера-лидера Kafka автоматически назначает новый лидер из числа + реплик, обеспечивая бесшовную работу системы + +[к оглавлению](#apache-kafka) + +## Настройки брокера Kafka + +### Репликация и консистентность + +* `min.insync.replicas` + * **Описание**: Минимальное количество синхронизированных реплик для подтверждения записи + * **Пример**: `min.insync.replicas=2` +* `unclean.leader.election.enable` + * **Описание**: Разрешает выбор лидера из неактуальных реплик, если нет синхронизированных реплик + * **Пример**: `unclean.leader.election.enable=false` + +### Логирование и хранение данных + +* `log.dirs` + * **Описание**: Директория на диске, где хранятся логи партиций + * **Пример**: `log.dirs=/var/lib/kafka/logs` +* `log.retention.hours` + * **Описание**: Максимальное время хранения данных в логах + * **Пример**: `log.retention.hours=168` (7 дней) +* `log.segment.bytes` + * **Описание**: Максимальный размер сегмента лога, после чего создаётся новый + * **Пример**: `log.segment.bytes=1073741824` (1 GB) + +### Производительность и задержки + +* `num.network.threads` + * **Описание**: Количество потоков для обработки сетевых запросов + * **Пример**: `num.network.threads=3` +* `num.io.threads` + * **Описание**: Количество потоков для ввода-вывода + * **Пример**: `num.io.threads=8` +* `socket.send.buffer.bytes` + * **Описание**: Размер буфера для отправки данных по сети + * **Пример**: `socket.send.buffer.bytes=102400` + +### Управление сообщениями + +* `message.max.bytes` + * **Описание**: Максимальный размер сообщения, которое брокер может принять + * **Пример**: `message.max.bytes=1048576` (1 MB) +* `replica.fetch.max.bytes` + * **Описание**: Максимальный размер данных для запроса реплики + * **Пример**: `replica.fetch.max.bytes=1048576` (1 MB) + +### Безопасность + +* `ssl.keystore.location` + * **Описание**: Путь к хранилищу ключей SSL + * **Пример**: `ssl.keystore.location=/var/private/ssl/kafka.keystore.jks` +* `ssl.truststore.location` + * **Описание**: Путь к хранилищу доверенных сертификатов + * **Пример**: `ssl.truststore.location=/var/private/ssl/kafka.truststore.jks` + +[к оглавлению](#apache-kafka) + +## Архитектура продюсера + +* **Создание сообщения (Record)**: Продюсер формирует сообщение, содержащее ключ (необязательный), значение и метаданные, + такие как время отправки. Сообщение отправляется в топик (Topic), который состоит из одной или нескольких партиций +* **Выбор партиции**: Если ключ сообщения указан, Kafka использует его для хеширования и определения, в какую партицию + записать сообщение (сообщения с одинаковым ключом попадают в одну и ту же партицию). Если ключа нет, Kafka распределяет + сообщения по партициям с помощью round-robin или по другим правилам +* **Отправка сообщений в буфер (Batching)**: Для повышения производительности продюсер Kafka не отправляет каждое сообщение + по отдельности, а группирует несколько сообщений в пакеты (batching), прежде чем отправить их брокеру. Это снижает + сетевые задержки и нагрузку на брокера +* **Сжатие (Compression)**: Для уменьшения объёма передаваемых данных продюсер может сжимать сообщения с использованием + таких алгоритмов, как GZIP, Snappy или LZ4. Сжатие снижает нагрузку на сеть и хранение, но добавляет небольшие накладные + расходы на процессор +* **Асинхронная отправка**: Продюсер отправляет пакеты сообщений асинхронно. Это означает, что сообщения записываются в + буфер памяти и отправляются брокеру, не ожидая завершения предыдущих операций. Это повышает пропускную способность +* **Подтверждения (Acknowledgments)**: Kafka позволяет настраивать уровень подтверждений от брокеров +* **Ретрай и идемпотентность**: Если отправка сообщения не удалась, продюсер может повторить попытку отправки (ретрай). + Также можно включить идемпотентный режим продюсера, что предотвращает повторную отправку одного и того же сообщения в + случае сбоя, обеспечивая отправку уникального сообщения один раз +* **Error handling**: Продюсер обрабатывает ошибки при отправке сообщений. В зависимости от настроек продюсер может + попытаться переотправить сообщение или сообщить о проблеме через callback + +### Резюме + +* Продюсер выбирает партицию для сообщения +* Продюсер выбирает уровень гарантии доставки +* В продюсере можно тюнить производительность + +[к оглавлению](#apache-kafka) + +## Настройки продюсера + +### Bootstrap-серверы (`bootstrap.servers`) + +* **Описание**: Указывает адреса брокеров Kafka, к которым продюсер должен подключаться для отправки сообщений +* **Пример**: `bootstrap.servers: localhost:9092,localhost:9093` +* **Зачем это нужно**: Kafka продюсер использует эти брокеры для получения метаданных о кластере (например, информация о топиках и партициях). Эти брокеры служат точками входа в кластер Kafka. + +### Сериализация ключа и значения + +Продюсер должен преобразовывать (сериализовать) данные в байтовый формат перед отправкой в Kafka + +* **Ключевая настройка для сериализации ключа:** + * `key.serializer` + * Пример: `key.serializer: org.apache.kafka.common.serialization.StringSerializer` +* **Ключевая настройка для сериализации значения:** + * `value.serializer` + * Пример: `value.serializer: org.apache.kafka.common.serialization.StringSerializer` + +**Варианты сериализаторов:** +* `StringSerializer` для строк +* `ByteArraySerializer` для массива байтов +* `LongSerializer` для чисел +* Также можно реализовать свои собственные сериализаторы + +### Отправка сообщений в буфер + +Продюсер Kafka отправляет сообщения асинхронно, и для этого используется буферизация сообщений + +* **batch.size**: Размер одного пакета (batch), который продюсер отправляет брокеру + * **Описание**: Определяет количество байтов сообщений, которые могут быть буферизованы в одном пакете перед отправкой брокеру + * **Пример**: `"batch.size": 16384` (16 KB) + * **Зачем это нужно**: Большие пакеты могут повысить производительность, но могут увеличить задержки +* **linger.ms**: Максимальное время ожидания перед отправкой пакета + * **Описание**: Продюсер может немного подождать, пока буфер накопит сообщения, чтобы отправить больше данных за один раз + * **Пример**: `linger.ms: 5` (время ожидания 5 мс) + * **Зачем это нужно**: Позволяет продюсеру собирать больше сообщений в пакете перед отправкой, что может улучшить эффективность использования сети +* **buffer.memory**: Размер выделенной памяти для буферизации сообщений + * **Описание**: Общий объем памяти, который продюсер может использовать для хранения сообщений, ожидающих отправки + * **Пример**: `buffer.memory: 33554432` (32 MB) + * **Зачем это нужно**: Если буфер заполняется, продюсер приостанавливает отправку сообщений, пока буфер не освободится + +### Сжатие сообщений + +Продюсер может сжимать сообщения для уменьшения объема передаваемых данных + +* **compression.type** + * **Описание**: Указывает тип сжатия для сообщений + * **Пример**: `compression.type: gzip` (варианты: none, gzip, snappy, lz4, zstd) + * **Зачем это нужно**: Сжатие уменьшает объем данных, передаваемых по сети, что может снизить нагрузку на сеть и хранилище, + особенно при больших объемах сообщений. Однако это может потребовать дополнительных ресурсов на сжатие/разжатие + +### Распределение сообщений по партициям (партицирование) + +* **partitioner.class** + * **Описание**: определяет логику, по которой продюсер выбирает партицию для каждого сообщения + * **Примеры**: + * **если настройка не задана**, по умолчанию используется `DefaultPartitioner` , который может распределять сообщения по партициям + равномерно или на основе ключа сообщения + * `partitioner.class: o.a.k.clients.producer.RoundRobinPartitioner` использует метод Round Robin для распределения сообщений + * `partitioner.class: o.a.k.clients.producer.UniformStickyPartitioner` равномерно отправляет сообщения, привязываясь + к партиции на короткий промежуток времени, чтобы уменьшить нагрузку на брокеры + +### Подтверждения (acks) + +Настройка определяет, как много брокеров должны подтвердить получение сообщения перед тем, как продюсер будет считать его +успешно отправленным + +* **acks** + * **Описание**: Определяет количество подтверждений от брокеров + * **Значения**: + * `0`: Продюсер не ждёт подтверждений (самая быстрая отправка, но высокий риск потери сообщений) + * `1`: Продюсер ждёт подтверждения от лидера партиции + * `all` (или `-1`): Продюсер ждёт подтверждений от всех реплик (наибольшая надежность, но увеличенные задержки) + * **Пример**: `acks: all` + * **Зачем это нужно**: Позволяет выбрать баланс между скоростью и надежностью отправки данных. + +### Дополнительные важные настройки + +* **Количество повторных попыток (retries):** + * **Описание**: Определяет, сколько раз продюсер должен попытаться отправить сообщение при неудаче + * **Пример**: `retries: 3` + * **Зачем это нужно**: Если произошёл временный сбой, продюсер может попытаться повторить отправку сообщений, что + увеличивает шанс доставки +* **Идемпотентность продюсера (enable.idempotence):** + * **Описание**: Включение идемпотентного режима, что предотвращает дублирование сообщений при сбоях + * **Пример**: `enable.idempotence: true` + * **Зачем это нужно**: Гарантирует, что каждое сообщение будет доставлено ровно один раз +* **Максимальный размер сообщения (max.request.size):** + * **Описание**: Максимальный размер сообщения, которое продюсер может отправить брокеру + * **Пример**: `max.request.size: 1048576` (1 MB) + * **Зачем это нужно**: Ограничивает размер сообщений, которые могут быть отправлены, чтобы избежать перегрузки сети и брокеров. +* **Таймаут ожидания подтверждений (request.timeout.ms):** + * **Описание**: Максимальное время ожидания подтверждения от брокера + * **Пример**: `request.timeout.ms: 30000` (30 секунд) + * **Зачем это нужно**: Помогает избежать бесконечного ожидания ответа от брокера в случае его сбоя + +[к оглавлению](#apache-kafka) + +## Пример конфигурации Kafka Producer + +```java +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import java.util.Properties; + +public class KafkaStringArrayProducer { + + public static void main(String[] args) { + // Настройки Kafka Producer + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + // Создание Kafka Producer + KafkaProducer producer = new KafkaProducer<>(props); + + String key = "user123"; + String[] value = {"message1", "message2", "message3"}; + + // Создание записи и добавление заголовков + ProducerRecord record = new ProducerRecord<>("my_topic", key, value); + record.headers().add("traceId", "someTraceId"); + + // Отправка сообщения в Kafka + producer.send(record, (metadata, exception) -> { + if (exception != null) { + System.out.println("Ошибка при отправке сообщения: " + exception.getMessage()); + } else { + System.out.println("Сообщение отправлено в топик " + metadata.topic() + " с партицией " + metadata.partition()); + } + }); + + producer.close(); + } +} +``` + +```properties +acks=all +retries=3 +compression.type=gzip +``` + +### С использованием Spring Kafka + +```java +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.config.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.producer.Producer; +import org.springframework.kafka.producer.ProducerRecord; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaProducerConfig { + + @Autowired + private KafkaProperties kafkaProperties; + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getProducerId()); + props.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + "com.example.configuration.kafka.KafkaProducerLoggingInterceptor" + ); + + if ("SASL_SSL".equals(kafkaProperties.getSecurityProtocol())) { + props.put("ssl.truststore.location", kafkaProperties.getSslTrustStoreLocation()); + props.put("ssl.truststore.password", kafkaProperties.getSslTrustStorePassword()); + props.put("ssl.truststore.type", kafkaProperties.getSslTrustStoreType()); + props.put("ssl.keystore.type", kafkaProperties.getSslKeyStoreType()); + + props.put("sasl.mechanism", kafkaProperties.getSaslMechanism()); + props.put("security.protocol", kafkaProperties.getSecurityProtocol()); + props.put("sasl.jaas.config", kafkaProperties.getJaasConfigCompiled()); + } + + return props; + } + + @Bean + public ProducerFactory producerFactory() { + var stringSerializerKey = new StringSerializer(); + stringSerializerKey.configure(Map.of("key.serializer.encoding", "UTF-8"), true); + stringSerializerKey.configure(Map.of("serializer.encoding", "UTF-8"), true); + + var stringSerializerValue = new StringSerializer(); + stringSerializerValue.configure(Map.of("value.serializer.encoding", "UTF-8"), false); + stringSerializerValue.configure(Map.of("serializer.encoding", "UTF-8"), false); + + return new DefaultKafkaProducerFactory<>(producerConfigs(), stringSerializerKey, stringSerializerValue); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} +``` + +```java +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + public KafkaProducerService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void sendMessage(String message, String key, String topic) { + try { + log.info("Sending message {}", data); + kafkaTemplate.send(topic, key, message); + log.info("Successfully send message {}", data); + } catch (Exception ex) { + log.error("Failed send message to {} topic by key {}", key, topic); + throw ex; + } + } +} +``` + +```java +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/kafka") +public class KafkaController { + + @Autowired + private KafkaProducerService kafkaProducerService; + + @PostMapping("/send") + public String sendMessage(@RequestParam String message, @RequestParam String key, @RequestParam String topic) { + kafkaProducerService.sendMessage(message, key, topic); + return "Message sent to Kafka!"; + } +} +``` + +### С использованием Spring Cloud Stream + +```yaml +spring: + cloud: + stream: + bindings: + output: + destination: my_topic + kafka: + binder: + brokers: localhost:9092 +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Service; + +@Service +@EnableBinding(Source.class) // Подключение к каналу сообщений +public class KafkaStreamProducer { + + private final Source source; + + public KafkaStreamProducer(Source source) { + this.source = source; + } + + public void sendMessage(String message) { + Message msg = MessageBuilder.withPayload(message).build(); + source.output().send(msg); // Отправка сообщения в Kafka + } +} +``` + +```java +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/kafka-stream") +public class KafkaStreamController { + + @Autowired + private KafkaStreamProducer kafkaStreamProducer; + + @PostMapping("/send") + public String sendMessage(@RequestParam String message) { + kafkaStreamProducer.sendMessage(message); + return "Message sent to Kafka via Spring Cloud Stream!"; + } +} +``` + +[к оглавлению](#apache-kafka) + +## Архитектура консюмера + +Потребители используют **Kafka Consumer API** для взаимодействия с брокерами Kafka. Они получают сообщения и обрабатывают +их согласно своей логике. Потребители могут быть объединены в группы **Consumer Groups**. + +### Резюме + +* "Smart" консюмер +* Консюмер опрашивает кафку +* Консюмер отвечает за гарантию обработки +* Автоматические фейловер в консюмер-группе +* Независимая обработка разными консюмер-группе + +### Компоненты + +#### Consumer Group + +Kafka использует концепцию Consumer Groups, что позволяет нескольким потребителям работать вместе, чтобы параллельно +обрабатывать данные из топиков. Каждый потребитель в группе обрабатывает только часть данных из топика, обеспечивая масштабируемость и балансировку нагрузки. + +* Все сообщения из одного Kafka Topic делятся между всеми потребителями в группе +* Если в группе несколько потребителей, Kafka гарантирует, что каждая партиция топика будет обрабатываться только одним потребителем +* В случае если один из потребителей выходит из строя, его партиции автоматически перераспределяются между оставшимися активными потребителями + +#### Offset (Смещение) + +Потребитель отслеживает offset каждой партиции, чтобы понимать, с какого сообщения продолжать чтение. Смещение — это +уникальный идентификатор каждого сообщения в партиции. + +Потребители могут хранить offset в Kafka или вне её (например, в базе данных или файловой системе). Если потребитель +отключается, он может возобновить обработку с того места, где остановился, прочитав сохранённый offset. + +#### Poll (Опрос) + +Потребители используют метод poll() для опроса Kafka на наличие новых сообщений. Это асинхронный процесс, и Kafka будет +отправлять потребителю доступные сообщения по мере их поступления. + +* Потребитель может указывать тайм-аут, после которого метод poll() вернёт пустой результат, если сообщений нет. +* Потребитель должен обрабатывать сообщения, а затем снова опрашивать Kafka для получения новых данных. + +### Процесс работы + +1. **Инициализация**: Потребитель подключается к Kafka-брокерам и присоединяется к consumer group. Он получает информацию о партиции топика, который будет читать. +2. **Подписка на топик**: Потребитель подписывается на определённые топики с помощью метода `subscribe()`. +3. **Опрос**: Потребитель вызывает метод `poll()` для получения новых сообщений. Если в очереди есть сообщения, они передаются потребителю для обработки. +4. **Обработка сообщений**: Потребитель обрабатывает сообщения, извлекая полезную информацию из каждого. +5. **Подтверждение обработки**: После обработки сообщения потребитель подтверждает обработку с помощью `commit()`. + Это обновляет **offset**, позволяя потребителю продолжить чтение с места, на котором остановился. +6. **Обработка ошибок**: В случае ошибки потребитель может решить, как повторить обработку сообщения + (например, с использованием механизма повторных попыток). +7. **Завершение работы**: Когда потребитель завершает обработку, он выходит из consumer group и может закрыть соединение с Kafka. + +[к оглавлению](#apache-kafka) + +## Настройки консюмера + +* **bootstrap.servers** — список брокеров, к которым будет подключаться потребитель +* **group.id** — идентификатор группы потребителей +* **auto.offset.reset** — настройка поведения при отсутствии offset (`earliest` для чтения с самого начала или `latest` для чтения с конца) +* **enable.auto.commit** — указывает, должен ли потребитель автоматически коммитить offset. Если `false`, потребитель должен делать это вручную +* **auto.commit.interval.ms** — определяет интервал времени между автоматическими коммитами offset сообщений, если включена автоматическая фиксация +* **max.poll.records** — максимальное количество сообщений, которые потребитель будет получать за один вызов `poll()` +* **session.timeout.ms** — максимальное время без общения с Kafka перед тем, как потребитель считается недоступным +* **client.rack** — используется для указания серверной стойки или дата-центра. Это особенно важно в случае, если у вас + есть распределённая инфраструктура Kafka с несколькими стойками или дата-центрами, где сообщения могут быть реплицированы + между разными физическими местоположениями (например, несколькими дата-центрами). + +### Что такое Rack в контексте Kafka? + +**Rack** — это метка, которая идентифицирует физическое местоположение брокеров Kafka. В Kafka можно задать rack для каждого брокера +с помощью параметра `broker.rack`, чтобы управлять репликацией данных, предпочтительно размещая реплики на разных физических машинах или в разных дата-центрах. + +**Преимущества использования client.rack** + +* **Снижение задержек**: Kafka будет предпочитать, чтобы данные попадали в тот же rack, где находится клиент, что уменьшает время отклика +* **Повышенная отказоустойчивость**: С правильной настройкой client.rack и broker.rack можно улучшить отказоустойчивость + за счет размещения реплик в разных физически удаленных местах +* **Лучшее использование ресурсов**: Правильное распределение нагрузки по rack помогает избежать перегрузки одного физического местоположения + +[к оглавлению](#apache-kafka) + +## Пример конфигурации Kafka Consumer + +```java +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Collections; + +public class KafkaConsumerExample { + + public static void main(String[] args) { + String bootstrapServers = "localhost:9092"; + String groupId = "my-consumer-group"; + String topic = "my-topic"; + + // Настройки Consumer + Map consumerConfigs = new HashMap<>(); + consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // Создание Consumer + KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs); + + // Подписка на тему + consumer.subscribe(Collections.singletonList(topic)); + + try { + // Чтение сообщений из Kafka + while (true) { + var records = consumer.poll(Duration.ofSeconds(1)); + records.forEach(record -> System.out.println("Received message: " + record.value())); + } + } finally { + consumer.close(); + } + } +} +``` + +**At least once** + +Чтобы гарантировать обработку сообщений хотя бы один раз, нужно коммитить после обработки. + +```java +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Collections; + +public class KafkaConsumerAtLeastOnce { + + public static void main(String[] args) { + try { + // Чтение сообщений + while (true) { + var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений + process(records); + consumer.commitAsync(); // Commit после обработки + } + } finally { + consumer.close(); // Закрытие consumer + } + } +} +``` + +**At most once** + +Чтобы гарантировать обработку сообщений не более одного раза, нужно коммитить до обработки или включить авто-подтверждение смещений +`enable.auto.commit=true`. + +```java +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Collections; + +public class KafkaConsumerAtLeastOnce { + + public static void main(String[] args) { + try { + // Чтение сообщений + while (true) { + var records = consumer.poll(Duration.ofSeconds(1)); // Ожидание 1 секунду для получения сообщений + consumer.commitAsync(); // Commit перед обработкой + process(records); + } + } finally { + consumer.close(); // Закрытие consumer + } + } +} +``` + +### С использованием Spring Kafka + +```java +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Autowired + private KafkaProperties kafkaProperties; + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer()); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumerGroupId()); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return configs; + } + + @Bean + public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentMessageListenerContainerFactory factory = new ConcurrentMessageListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} +``` + +```java +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +@Service +public class KafkaConsumer { + + @KafkaListener(topics = "my_topic", groupId = "group_id") + public void listen(@Payload String message, + @Header("traceId") String traceId, + @Header("correlationId") String correlationId) { + System.out.println("Received message: " + message); + System.out.println("Trace ID: " + traceId); + System.out.println("Correlation ID: " + correlationId); + } +} +``` + +**At least once** + +```yaml +spring: + kafka: + consumer: + enable-auto-commit: false # Отключение авто-commit + auto-offset-reset: earliest # Начинать чтение с самого начала (если нет смещения) + group-id: my-consumer-group + max-poll-records: 500 # Максимальное количество сообщений для обработки за один раз + listener: + ack-mode: manual # Ручное подтверждение +``` + +```java +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.listener.config.DefaultMessageListenerContainer; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.listener.MessageListenerContainer; + +@EnableKafka +public class AtLeastOnceConsumer { + + @KafkaListener(topics = "my-topic", groupId = "my-consumer-group") + public void listen(String message, Acknowledgment acknowledgment) { + System.out.println("Received message: " + message); + // Обработка сообщения + // Подтверждение смещения вручную после успешной обработки + acknowledgment.acknowledge(); + } +} +``` + +**At most once** + +```yaml +spring: + kafka: + consumer: + enable-auto-commit: true # Включение авто-commit + group-id: my-consumer-group + auto-offset-reset: earliest # Начинать чтение с самого начала + max-poll-records: 100 # Максимальное количество сообщений для обработки за один раз +``` + +```java +import org.springframework.kafka.annotation.KafkaListener; + +public class AtMostOnceConsumer { + + @KafkaListener(topics = "my-topic", groupId = "my-consumer-group") + public void listen(String message) { + System.out.println("Received message: " + message); + // Обработка сообщения... + // Смещение будет автоматически зафиксировано после получения сообщения + } +} +``` + +### С использованием Spring Cloud Stream + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + kafka: + binder: + brokers: localhost:9092 + auto-create-topics: false +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +@Service +@EnableBinding(KafkaProcessor.class) // Указывает на интерфейс, с которым связывается этот сервис +public class KafkaConsumerService { + + // Метод будет слушать сообщения из указанного канала + @StreamListener("input") + public void handle(@Payload String message) { + System.out.println("Received message: " + message); + } +} +``` + +```java +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.messaging.SubscribableChannel; + +public interface KafkaProcessor { + + @Input("input") // Имя канала, которое мы используем в application.yml + SubscribableChannel input(); +} +``` + +**At least once** + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + consumer: + ackMode: manual # Ручное подтверждение + maxAttempts: 3 # Максимальное количество попыток +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +@Component +@EnableBinding(Sink.class) // Sink - это интерфейс, предоставляющий Binding для входных сообщений +public class AtLeastOnceConsumer { + + @StreamListener(Sink.INPUT) + public void handleMessage(Message message, @Header(name = "kafka_offset") String offset) { + // Обработка сообщения + System.out.println("Received message: " + message.getPayload()); + // После успешной обработки подтверждаем сообщение + // Spring Cloud Stream автоматически подтвердит сообщение после завершения метода + // благодаря ackMode=manual и настроенному acknowledgment + } +} +``` + +**At most once** + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + consumer: + ackMode: batch # Автоматическое подтверждение после пакета сообщений +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +@Component +@EnableBinding(Sink.class) +public class AtMostOnceConsumer { + + @StreamListener(Sink.INPUT) + public void handleMessage(Message message) { + // Обработка сообщения + System.out.println("Received message: " + message.getPayload()); + // Смещение будет автоматически зафиксировано после получения сообщения + } +} +``` + +**Mostly Once** + +Это гибридный режим, который стремится быть чем-то средним между At least once и At most once. Он предполагает, что сообщения +будут доставлены обычно один раз, но иногда, в случае сбоев, может быть обработано больше одного раза. Для реализации +такого режима в Spring Cloud Stream потребуется дополнительная логика, например, фильтрация дублированных сообщений или +использование уникальных идентификаторов сообщений. + +В рамках Spring Cloud Stream, можно обработать Mostly Once с использованием уникальных идентификаторов сообщений или +кеширования состояния, чтобы отфильтровать повторно обработанные сообщения. + +```yaml +spring: + cloud: + stream: + bindings: + input: + destination: my-topic + group: my-consumer-group + content-type: application/json + consumer: + ackMode: manual # Ручное подтверждение + maxAttempts: 3 # Максимальное количество попыток +``` + +```java +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Set; + +@Component +@EnableBinding(Sink.class) +public class MostlyOnceConsumer { + + private Set processedMessageIds = new HashSet<>(); + + @StreamListener(Sink.INPUT) + public void handleMessage(Message message, @Header("messageId") String messageId) { + if (processedMessageIds.contains(messageId)) { + System.out.println("Duplicate message: " + messageId); + return; // Пропускаем дублированное сообщение + } + // Обработка сообщения + System.out.println("Received message: " + message.getPayload()); + // Добавляем идентификатор в обработанные + processedMessageIds.add(messageId); + // После успешной обработки подтверждаем сообщение вручную + // Spring Cloud Stream подтвердит сообщение после выполнения метода + } +} +``` + +[к оглавлению](#apache-kafka) + +## Основные API Kafka + +* Producer API +* Consumer API +* Streams API +* Connector API + +[к оглавлению](#apache-kafka) + +## Какова роль Producer API? + +Используется для публикации потока сообщений в топики Kafka. Он управляет партицированием сообщений, сжатием и балансировкой +нагрузки между несколькими брокерами. Продюсер также отвечает за повторные неудачные попытки публикации и может быть +настроен на различные уровни гарантий доставки. + +[к оглавлению](#apache-kafka) + +## Какова роль Consumer API? + +Обеспечивает механизм для потребления сообщений топиков. Оно позволяет приложениям и микросервисам читать данные, +поступающие в Kafka, и обрабатывать их для дальнейшего использования, будь то хранение, анализ или реактивная обработка. + +[к оглавлению](#apache-kafka) + +## Какова роль Connector API? + +Connector API в Apache Kafka является частью Kafka Connect, которая представляет собой инфраструктуру для интеграции +внешних систем с Kafka. Connector API играет ключевую роль в упрощении процесса подключения различных источников данных +и систем-приемников к Kafka, предоставляя возможность автоматического перемещения данных между ними. + +[к оглавлению](#apache-kafka) + +## Какова роль Streams API? + +Это компонент Apache Kafka, предназначенный для создания приложений и микросервисов, которые обрабатывают потоки данных +в реальном времени. Его основная роль заключается в том, чтобы позволить разработчикам легко обрабатывать и анализировать +данные, поступающие в виде непрерывных потоков из топиков. Kafka Streams API предоставляет высокоуровневый интерфейс для +выполнения таких операций, как фильтрация, агрегация, объединение данных и вычисление оконных функций. + +[к оглавлению](#apache-kafka) + +## Какова роль Transactions API? + +Kafka Transactions API позволяет выполнять атомарные обновления для нескольких топиков. Он включает exactly-once +гарантию для приложений, которые читают данные из одного топика и пишут в другой. Это особенно полезно для приложений потоковой +обработки, которым необходимо гарантировать, что каждое входное событие влияет на выходные данные ровно один раз, даже в случае сбоев. + +[к оглавлению](#apache-kafka) + +## Какова роль Quota API? + +Quota API позволяет настраивать квоты для каждого клиента для ограничения скорости создания или потребления данных, чтобы +один клиент не потреблял слишком много ресурсов брокера. Это помогает обеспечить справедливое распределение ресурсов и +предотвратить сценарии отказа в обслуживании. + +[к оглавлению](#apache-kafka) + +## Какова роль AdminClient API? + +AdminClient API предоставляет операции для управления топиками, брокерами, конфигурацией и другими объектами Kafka. +Его можно использовать для создания, удаления и описания топиков, управления списками ACL, получения информации о кластере и +программного выполнения других административных задач. + +[к оглавлению](#apache-kafka) + +## Kafka Consumer + +## Для чего нужен координатор группы? + +Координатор группы отвечает за управление группами потребителей. Он управляет членством в группах потребителей, назначает +партиции потребителям внутри группы и управляет фиксацией смещения. Когда потребитель присоединяется к группе или покидает ее, +координатор группы запускает перебалансировку для переназначения партиций среди оставшихся потребителей. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Consumer heartbeat thread? + +Consumer Heartbeat Thread отвечает за отправку периодических сигналов брокеру Kafka (в частности, координатору группы). +Эти сигналы указывают на то, что потребитель жив и все еще является частью группы потребителей. Если потребитель не отправляет +данные сигналы в течение настроенного периода, он считается неживым, и координатор группы инициирует перебалансировку +для переназначения его партиций другим потребителям в группе. + +[к оглавлению](#apache-kafka) + +## Как Kafka обрабатывает сообщения? + +Kafka поддерживает два основных способа обработки сообщений: +* **Queue**: каждое сообщение обрабатывается одним потребителем в группе потребителей. Это достигается за счет наличия + в группе нескольких потребителей, каждый из которых считывает данные из отдельных партиций. +* **Publish-Subscribe**: все сообщения обрабатываются всеми потребителями. Это достигается за счет того, что каждый + потребитель находится в своей собственной группе потребителей, что позволяет всем потребителям читать все сообщения. + +[к оглавлению](#apache-kafka) + +## Как Kafka обрабатывает задержку консюмера? + +Задержка (лаг) консюмера в Kafka относится к разнице между оффсетом последнего созданного сообщения и оффсетом последнего +полученного сообщения. Kafka предоставляет инструменты и API для мониторинга задержек консюмеров такие, как инструмент +командной строки Kafka Consumer Groups и API AdminClient. Высокая задержка консюмеров может указывать на проблемы с +производительностью или недостаточную пропускную способность консюмеров. Kafka не обрабатывает задержки автоматически, +но предоставляет информацию, необходимую приложениям для принятия решений о масштабировании или оптимизации производительности. + +[к оглавлению](#apache-kafka) + +## Для чего нужны методы subscribe() и poll()? + +Метод subscribe() используется для подписки на один или несколько топиков. Фактически он не извлекает никаких данных. +Метод poll(), с другой стороны, используется для извлечения данных из топиков. Он возвращает записи, которые были опубликованы +с момента последнего запроса топиков и партиций. Метод poll() обычно вызывается в цикле для непрерывного получения данных. + +[к оглавлению](#apache-kafka) + +## Для чего нужен метод position()? + +Метод position() возвращает смещение следующей записи, которая будет извлечена для данной партиции. Это полезно для +отслеживания хода получения данных и может использоваться в сочетании с методом committed(), чтобы определить насколько +сильно потребитель отстал от своего последнего комита оффсета. Эта информация может быть ценной для мониторинга и +управления показателями потребителей. + +[к оглавлению](#apache-kafka) + +## Для чего нужны методы commitSync() и commitAsync()? + +Эти методы используются для фиксации смещений: +* **commitSync()**: синхронно фиксирует последнее смещение, возвращенное poll(). Он будет повторять попытку до тех пор, + пока не завершится успешно или не столкнется с непроверяемой ошибкой. +* **commitAsync()**: асинхронно фиксирует смещения. Он не повторяет попытку при сбое, что делает его более быстрым, + но менее надежным, чем commitSync(). Выбор между этими методами зависит от баланса между производительностью и надежностью, + требуемого приложением. + +[к оглавлению](#apache-kafka) + +## Другие вопросы + +## Для чего нужен идемпотентный продюсер? + +Идемпотентный продюсер гарантирует exactly-once гарантию доставки, предотвращая дублирование записей в Kafka в случае +повторных попыток отправки сообщений. Это важно для поддержания целостности данных и правильности их обработки в системе, +особенно в распределенных системах, где могут возникать ошибки связи или сбои. + +[к оглавлению](#apache-kafka) + +## Для чего нужен интерфейс Partitioner? + +Интерфейс Partitioner в Producer API определяет в какую партицию топика будет отправлено сообщение. Partitioner по-умолчанию +использует хэш ключа (если он присутствует) для выбора партиции, гарантируя, что сообщения с одним и тем же ключом всегда +отправляются в одну и ту же партицию. Могут быть реализованы пользовательские Partitioner для управления распределением +сообщений по партициям на основе определенной бизнес-логики или характеристик данных. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Broker log cleaner thread? + +Поток очистки журнала в Kafka отвечает за выполнение сжатия журнала. Сжатие журнала - это механизм, при котором Kafka +удаляет избыточные записи, сохраняя только последнее значение для каждого ключа. Это полезно в тех случаях, когда требуется +только последнее обновление для данного ключа, например, для обслуживания changelog или состояния БД. Программа очистки журналов +периодически запускается для сжатия соответствующих партиций. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Kafka Mirror Maker? + +Это инструмент, позволяющий реплицировать данные между кластерами Kafka, потенциально находящихся в разных дата-центрах. +Он работает, потребляя данные из одного кластера и передавая в другой. Можно использовать для создания резервной копии данных, +объединения данных из нескольких дата-центров в единое хранилище или для переноса данных между кластерами. + +[к оглавлению](#apache-kafka) + +## Для чего нужна Schema Registry? + +Kafka Schema Registry предоставляет RESTful интерфейс для хранения и извлечения схем Avro. Schema Registry используется +совместно с Kafka для обеспечения совместимости схем данных между производителями и потребителями. Это особенно полезно +при разработке моделей данных с течением времени, сохраняя обратную и прямую совместимость. + +[к оглавлению](#apache-kafka) + +## Для чего нужен Streams DSL? + +Kafka Streams DSL предоставляет высокоуровневый API для операций потоковой обработки. Он позволяет разработчикам описывать +сложную логику обработки, такую как фильтрация, преобразование, агрегирование и объединение потоков данных. DSL абстрагирует +многие низкоуровневые детали потоковой обработки, упрощая создание и обслуживание приложений потоковой обработки. + +[к оглавлению](#apache-kafka) + +## Как Kafka обеспечивает версионирование сообщений? + +Сама по себе Kafka не обеспечивает версионирование сообщений напрямую, но предоставляет механизмы, позволяющие реализовывать +управление версиями. Одним из распространенных подходов является включение поля версии в схему сообщения. Для более сложных задач +управления версиями используются реестры схем (например, Confluent Schema Registry), которые могут управлять изменением схемы и совместимостью. + +[к оглавлению](#apache-kafka) + +## Как потребители получают сообщения от брокера? + +Kafka использует pull-модель для извлечения сообщений. Потребители запрашивают сообщения у брокеров, а не брокеры +отправляют сообщения потребителям. Это позволяет потребителям контролировать скорость, с которой они получают сообщения. +Потребители отправляют запросы на получение данных от брокера, указывая топик, партицию и начальное смещение для каждой партиции. +Брокер отвечает сообщениями с объемом до указанного максимального предела в байтах. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka Streams и Apache Flink? + +Kafka Streams и Apache Flink — это два мощных инструмента для обработки потоков данных в режиме реального времени, но +они различаются по архитектуре, возможностям и сценариям применения. + +### Сравнение Kafka Streams и Apache Flink + +| **Критерий** | **Kafka Streams** | **Apache Flink** | +|------------------------|---------------------------------------------|-----------------------------------------| +| **Архитектура** | Встроенная библиотека, работающая внутри приложения. Зависит от Kafka. | Независимая распределенная система потоковой обработки данных с возможностью интеграции с различными источниками и приемниками данных. | +| **Обработка данных** | Обрабатывает потоки событий непосредственно из Kafka. Подходит для обработки событий и транзакционных данных с минимальной задержкой. | Поддерживает как потоковую (streaming), так и пакетную (batch) обработку данных. Специализируется на сложной обработке событий с гибкими возможностями управления состоянием. | +| **Зависимость от Kafka**| Построена исключительно вокруг Kafka. Требует Kafka для получения и отправки данных. | Работает с широким спектром источников данных (Kafka, HDFS, базы данных и т. д.). Kafka — лишь один из многих источников. | +| **Установка** | Легко интегрируется в существующее Java/Scala-приложение как библиотека. Не требует развертывания кластеров. | Требует отдельного кластера для выполнения, что подходит для высокопроизводительных распределенных систем. | +| **Управление состоянием** | Встроенное состояние с использованием RocksDB, также поддержка репликации состояния. | Имеет развитую систему управления состоянием, поддерживает сложные функции восстановления состояния и обработки данных. | +| **Гарантия доставки** | Поддерживает "at-least-once" и "exactly-once" семантику, когда Kafka настроена соответствующим образом. | Имеет гибкие гарантии доставки: поддержка "exactly-once", "at-least-once" и "at-most-once". | +| **Масштабируемость** | Масштабируется автоматически вместе с Kafka-партициями. Каждая инстанция потребителя Kafka обрабатывает свою партицию. | Поддерживает масштабирование на уровне задач (task), с более гибкой моделью масштабирования и управления ресурсами. | +| **Обработка событий** | Подходит для обработки событий с низкой задержкой и транзакционными требованиями. | Специализируется на сложной обработке событий, таких как windowing, агрегирование и работа с изменяющимся состоянием. Поддерживает сложные аналитические операции. | +| **Инструменты и API** | Легковесная библиотека с простыми API для работы с потоками данных. Основные операции — фильтрация, маппинг, объединение потоков, windowing. | Продвинутая система с богатыми API для сложных вычислений, поддерживающая потоковую и пакетную обработку, обработку событий и контроль сложных бизнес-процессов. | +| **Требования к ресурсам**| Менее ресурсоемка, так как не требует отдельного кластера. Работает в рамках JVM-приложения. | Требует более высоких вычислительных ресурсов, так как выполняется на отдельном кластере и поддерживает высокую степень параллелизма. | + +### Когда выбрать Kafka Streams +- Если вы уже используете Kafka и вам нужна легковесная библиотека для обработки данных непосредственно внутри вашего приложения. +- Для сценариев с низкой задержкой, где данные приходят из Kafka и должны быть быстро обработаны с минимальными накладными расходами. +- Если вам нужно встроить обработку потоков данных в существующую Java/Scala программу без необходимости развертывания отдельных кластеров. + +### Когда выбрать Apache Flink +- Если вы работаете с потоковой и пакетной обработкой данных, где источники и приемники могут быть не только Kafka, но и другие системы (например, HDFS, базы данных). +- Для сложных задач обработки событий, требующих управления состоянием, временных окон, аналитики и восстановления после сбоев. +- Если ваш проект требует высокой производительности, гибкости, точных гарантий доставки и распределенной обработки в кластере. + +### Заключение +- **Kafka Streams** — это идеальный выбор, если ваша инфраструктура уже основана на Kafka, и вам нужна быстрая и легковесная обработка потоков данных. +- **Apache Flink** — это мощный инструмент для сложных аналитических задач, потоковой обработки данных в режиме реального + времени с поддержкой сложных схем обработки, который предоставляет больше возможностей для работы с разнообразными источниками данных. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka Consumer и Kafka Stream? + +**Kafka Consumer** - это клиент, который читает данные из топика и производит некоторую обработку. Обычно используется для +простых сценариев получения данных. **Kafka Stream**, с другой стороны, более подвинутый клиент, который может потреблять, +обрабатывать и класть данные обратно в Kafka. Он предоставляет DSL для сложных операций потоковой обработки, таких как +фильтрация, преобразование, агрегирование и объединение потоков. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka и Flume? + +**Apache Kafka** и **Apache Flume** — это два популярных инструмента для обработки и передачи данных, однако они имеют +разные цели и архитектуры. Вот основные различия между ними: + +### 1. **Назначение и использование** +- **Kafka**: Это распределенная платформа для потоковой передачи данных, которая обеспечивает высокую пропускную способность +и низкую задержку для обработки больших объемов данных. Kafka используется для создания стриминговых приложений и обработки +данных в реальном времени. Она может быть использована для передачи логов, событий, метрик и других данных, требующих +высокой доступности и масштабируемости. +- **Flume**: Это распределенная система для сбора, агрегации и передачи логов и событий. Flume обычно используется для +доставки логов с серверов в HDFS, HBase или другие системы хранения данных. Его основное предназначение — это сбор данных +из различных источников (например, лог-файлов) и передача их в системы хранения или аналитики. + +### 2. **Архитектура** +- **Kafka**: В Kafka данные отправляются в топики и партиции, которые могут быть независимо прочитаны несколькими потребителями. +Kafka ориентирована на высокую пропускную способность и масштабируемость. Это решает задачу обработки потоковых данных и +событий в реальном времени. +- **Flume**: Flume состоит из **источников (sources)**, **каналов (channels)** и **приемников (sinks)**. Источник получает +данные, канал их буферизует, а приемник отправляет их в конечную систему. Flume использует систему "event-based" и часто +применяется для сбора логов. + +### 3. **Хранение данных** +- **Kafka**: Kafka сохраняет сообщения на диске в течение длительного времени (по умолчанию — до 7 дней) в топиках. +Потребители могут читать данные в любой момент времени, и Kafka поддерживает концепцию **сохранения и ретрансляции данных**. +- **Flume**: Flume не имеет встроенного механизма долговременного хранения. Он просто передает данные в назначенные места +хранения (например, HDFS). Данные в Flume не сохраняются долго, и если система хранения не доступна, они теряются. + +### 4. **Производительность** +- **Kafka**: Kafka предназначен для работы с высокими объемами данных. Он поддерживает масштабируемость как по производителям, +так и по потребителям, и может обрабатывать миллионы сообщений в секунду с минимальной задержкой. +- **Flume**: Flume может быть менее масштабируемым по сравнению с Kafka и больше ориентирован на сбор логов и событий +с различных источников. Хотя Flume тоже может обрабатывать большие объемы данных, он не предназначен для работы с +такими большими потоками, как Kafka. + +### 5. **Использование и кейсы** +- **Kafka**: Используется для стриминга данных, аналитики в реальном времени, интеграции различных систем, работы с +большими данными и построения событийных приложений. +- **Flume**: Используется для сбора, агрегации и передачи логов и событий в системы хранения, такие как HDFS, HBase, +или внешние системы. Это идеальный выбор для организации потоков логирования и мониторинга. + +### 6. **Поддержка и интеграция** +- **Kafka**: Kafka поддерживает широкий спектр интеграций и может быть использован с различными системами для построения +распределенных приложений и аналитических решений. +- **Flume**: Flume ориентирован на интеграцию с Hadoop-экосистемой, и основное его использование — это интеграция с HDFS, +HBase и другими хранилищами данных в этой экосистеме. + +### 7. **Потребительская модель** +- **Kafka**: Kafka поддерживает много потребителей, которые могут читать из одного и того же топика независимо, а также +возможность **повторного прочтения данных**. +- **Flume**: Flume имеет фиксированную схему доставки данных и не поддерживает такую гибкость, как Kafka в части потребителей и обработки. + +### 8. **Гарантии доставки** +- **Kafka**: Kafka поддерживает **гарантии доставки** с различными уровнями подтверждения (acknowledgment), а также может +обеспечивать **доставку сообщений точно один раз** (exactly-once semantics). +- **Flume**: Flume обеспечивает базовые гарантии доставки, но они менее строгие, чем у Kafka, и больше ориентированы на +устойчивость к сбоям, а не на гарантированную доставку. + +[к оглавлению](#apache-kafka) + +## В чем разница между Kafka и RabbitMQ? + +**RabbitMQ** и **Apache Kafka** — это две популярные системы обмена сообщениями, каждая из которых имеет свои особенности +и используется для разных типов приложений. Вот основные различия между ними: + +### 1. **Архитектура** +- **RabbitMQ** использует **очереди сообщений**. Сообщения отправляются в очередь, и один потребитель извлекает сообщение +из очереди для обработки. +- **Apache Kafka** использует **топики и партиции**. Сообщения отправляются в топики, которые могут быть разделены на +партиции, и несколько потребителей могут читать эти сообщения в любом порядке. Kafka ориентирован на большие потоки данных и масштабируемость. + +### 2. **Модель доставки сообщений** +- **RabbitMQ**: Сообщения передаются в очереди, и каждый потребитель получает одно сообщение. Сообщения могут быть +подтверждены (acknowledged) или отклонены (rejected). RabbitMQ гарантирует, что сообщение будет доставлено хотя бы одному потребителю. +- **Kafka**: Сообщения сохраняются в топиках на длительный срок, и потребители могут читать их в любом порядке. Kafka +гарантирует доставку сообщений всем потребителям, если они подписаны на топик, и может позволить многократное чтение старых сообщений. + +### 3. **Гарантии доставки** +- **RabbitMQ**: Предоставляет подтверждения доставки и может повторно отправить сообщение, если потребитель не подтвердил +его получение. Можно настроить разные уровни надежности (например, за счет использования подтверждений или транзакций). +- **Kafka**: Сообщения сохраняются на диске, что позволяет потребителям считывать их в любое время. Kafka гарантирует +доставку сообщений при определенной конфигурации репликации и сохранения. + +### 4. **Производительность и масштабируемость** +- **RabbitMQ**: Лучше подходит для небольших и средних систем, где требуется высокая надежность и гарантированная доставка. +Он поддерживает **горизонтальное масштабирование**, но требует дополнительных усилий для настройки и управления. +- **Kafka**: Отличается высокой **производительностью** и возможностью обработки больших объемов данных. Kafka легко +масштабируется за счет **партиционирования** и репликации данных. + +### 5. **Потребительская модель** +- **RabbitMQ**: Один потребитель получает одно сообщение. Если потребитель не успевает обработать сообщение, оно может быть повторно отправлено. +- **Kafka**: Потребители могут читать сообщения независимо друг от друга. Kafka сохраняет все сообщения в топиках, +и потребители могут читать их в любое время. Kafka также поддерживает концепцию **групп потребителей**, где каждый +потребитель группы обрабатывает разные партиции. + +### 6. **Использование и кейсы** +- **RabbitMQ**: Идеален для обработки запросов и ответов, распределенных приложений, микросервисов с гарантией доставки, +бизнес-процессов с очередями задач. +- **Kafka**: Используется для обработки потоков данных, интеграции с большими данными, записи журналов, мониторинга, +обработки событий в реальном времени и сохранения больших объемов данных для последующего анализа. + +### 7. **Производители и потребители** +- **RabbitMQ**: Один производитель отправляет сообщения в очередь, и несколько потребителей могут обрабатывать эти сообщения. +- **Kafka**: Множество производителей могут отправлять сообщения в топики, и несколько потребителей могут читать их +одновременно, поддерживая масштабируемость. + +### 8. **Сообщения и хранение** +- **RabbitMQ**: Сообщения удаляются из очереди после их обработки потребителем. Хранение сообщений обычно краткосрочное. +- **Kafka**: Сообщения сохраняются на диске в топиках до тех пор, пока не истечет срок хранения (по конфигурации). +Это позволяет повторно читать данные. + +[к оглавлению](#apache-kafka) diff --git a/reactive.md b/reactive.md new file mode 100644 index 0000000..17f0ef8 --- /dev/null +++ b/reactive.md @@ -0,0 +1,584 @@ +[Вопросы для собеседования](README.md) + +# Реактивное программирование +* [Что такое реактивное программирование и чем оно отличается от процедурного программирования?](#что-такое-реактивное-программирование-и-чем-оно-отличается-от-процедурного-программирования) +* [Объясните концепцию потоков данных в реактивном программировании](#объясните-концепцию-потоков-данных-в-реактивном-программировании) +* [Что такое паттерн Observer и как он лежит в основе реактивного программирования?](#что-такое-паттерн-observer-и-как-он-лежит-в-основе-реактивного-программирования) +* [Опишите роль Observable и Observer в реактивном программировании](#опишите-роль-observable-и-observer-в-реактивном-программировании) +* [Что такое backpressure в контексте реактивного программирования?](#что-такое-backpressure-в-контексте-реактивного-программирования) +* [Объясните разницу между Hot и Cold Observable](#объясните-разницу-между-hot-и-cold-observable) +* [Какова роль Подписки в реактивном программировании?](#какова-роль-подписки-в-реактивном-программировании) +* [Как отписаться от потока для предотвращения утечки памяти?](#как-отписаться-от-потока-для-предотвращения-утечки-памяти) +* [Какие есть операторы в Project Reactor и для чего они используются?](#какие-есть-операторы-в-project-reactor-и-для-чего-они-используются) + +## Что такое реактивное программирование и чем оно отличается от процедурного программирования? + +### Основные принципы + +**Процедурное программирование** - это подход, при котором программа состоит из последовательности инструкций, выполняемых одна за другой. +В процедурном программировании акцент делается на определении функций и процедур, которые выполняют определенные задачи. +Этот подход хорошо подходит для решения задач, требующих четкого алгоритма действий. + +**Реактивное программирование**, с другой стороны, фокусируется на обработке потоков данных и событий. +В реактивном программировании программа реагирует на изменения в данных или событиях, происходящих в реальном времени. +Реактивное программирование позволяет создавать более гибкие и эффективные системы, способные адаптироваться к изменениям +в данных и событиях без необходимости явного управления асинхронными задачами. + +Процедурное программирование представляет данные в виде единственного значения, хранящегося в переменной. +Реактивное программирование представляет собой непрерывный поток данных, на который могут подписаться несколько наблюдателей. + +[к оглавлению](#реактивное-программирование) + +## Объясните концепцию потоков данных в реактивном программировании + +Концепция потоков данных в реактивном программировании заключается в представлении данных как непрерывно изменяющегося потока, +который автоматически распространяется через систему. Это отличается от традиционного императивного программирования, где данные +обычно обрабатываются как отдельные элементы и изменения должны явно инициироваться кодом. + +В реактивном программировании данные представлены как поток событий, каждое из которых может содержать новое значение или изменение состояния. +Система автоматически реагирует на эти изменения, обновляя свое состояние соответствующим образом. Это позволяет создавать +приложения, которые эффективно обрабатывают большие объемы данных и быстро реагируют на изменения, происходящие в реальном времени. + +Примером может служить система управления данными, которая автоматически обновляет пользовательский интерфейс при изменении данных в базе. +В таком случае, пользовательский интерфейс будет автоматически обновляться каждый раз, когда происходит изменение данных, +без необходимости явного запроса на обновление со стороны пользователя. + +Реактивное программирование основано на шаблоне Наблюдатель (Observer) + +### Ключевые компоненты +* **Наблюдаемый (Observable)** представляет источник данных. При изменении его состояния (или при создании новых данных) изменения передаются наблюдателям +* **Наблюдатель (Observer)** подписывается на Observable и получает уведомления о любых изменениях состояния или новых данных +* **Подписка (Subscription)** устанавливает взаимосвязь между наблюдаемым и наблюдателем. Подписка может быть "один к одному" или "один ко многим" +* **Операторы (Operators)** часто называемые функциями преобразования, они позволяют изменять или адаптировать данные из наблюдаемого объекта до того, как они попадут к наблюдателю +* **Планировщики (Schedulers)** помогают управлять временем и порядком выполнения операций в таких сценариях, как фоновая работа и обновления пользовательского интерфейса +* **Subjects** совмещают роли объекта наблюдения и наблюдателя. Это могут быть как источники данных, так и потребители данных + +### Процесс передачи данных +* **Эмиссия (Emission)** - данные создаются в наблюдаемом объекте и отправляются его наблюдателям +* **Фильтрация (Filtering)** - операторы могут просматривать входящие данные, пересылая только те, которые соответствуют определенным критериям +* **Трансформация (Transformation)** - данные изменяются — например, путем их мапинга перед передачей наблюдателю +* **Нотификация (Notification)** - информирование наблюдателей при поступлении новых данных + +### Основные характеристики потоков +* **Непрерывность (Continuous)** - поток данных сохраняется, что позволяет осуществлять взаимодействие в режиме реального времени +* **Асинхронность (Asynchronous)** - не гарантируется, что события будут происходить в определенном порядке, что позволяет выполнять неблокирующие операции +* **Однонаправленность (One-directional)** - данные передаются от Observable к его подписчикам, обеспечивая однонаправленный поток + +### Типы потоков по количеству подписчиков +* **Unicast Streams** - у каждого наблюдателя есть эксклюзивное подключение к наблюдаемому источнику +* **Broadcast Streams** - позволяет нескольким наблюдателям подписаться на один объект наблюдения. Каждый наблюдатель +получает полный набор данных, что может быть проблематично, если речь идет о конфиденциальности данных + +### Типы потоков по поведению +* **Hot Observable** - эти последовательности передают данные независимо от присутствия наблюдателя. +Если подписывается новый наблюдатель, он начинает получать данные с точки подписки +* **Cold Observable** - здесь передача данных начинается только после подписки. Любой новый наблюдатель получит данные с самого начала + +### Backpressure +Данный механизм регулируют скорость, с которой данные публикуются в поток. Это необходимо, чтобы справиться с потенциальным +переполнением или узкими местами из-за различий в скоростях обработки данных. + +Например, в RxJava интерфейсы `Observable` и `Flowable` отличаются тем, что последний включает поддержку backpressure. +С `Flowable` можно использовать настраиваемую стратегию backpressure, чтобы контролировать скорость публикации Observable относительно скорости потребления Subscriber + +### Практическое применение +Будь то обработка асинхронных вызовов API или управление вводом данных пользователем, потоки данных обеспечивают надежную +и гибкую основу для многих повседневных задач программирования. + +Можно использовать ряд операторов, таких как `map`, `filter`, `debounce`, и `throttle`, для преобразования данных +и манипулирования ими, исходя из конкретных требований. + +Широкое внедрение реактивных библиотек, таких как RxJava для Android или RxJS для Web, подчеркивает +полезность потоков данных в современной разработке программного обеспечения. + +[к оглавлению](#реактивное-программирование) + +## Что такое паттерн Observer и как он лежит в основе реактивного программирования? + +Паттерн Observer, также известный как Publish/Subscribe или Event Listener, является поведенческим шаблоном проектирования, +который позволяет объектам следить за изменениями в других объектах и реагировать на них. Это один из самых популярных и +полезных шаблонов, который упрощает код и повышает его гибкость. В основе реактивного программирования лежит идея о том, +что система должна реагировать на изменения внешних источников данных или событий. Паттерн Observer идеально подходит для +реализации этой концепции, поскольку он позволяет объектам автоматически получать уведомления об изменениях в других объектах +и соответствующим образом обновлять свое состояние. + +Пример использования паттерна Observer в веб-разработке может включать систему уведомлений для пользователей. Когда новая +статья добавляется на сайт, все подписчики должны быть уведомлены об этом. В этом случае, класс Subject будет отвечать +за управление подписчиками и их уведомление об изменениях, а класс Observer будет представлять собой простого наблюдателя, +который выводит сообщение о новой статье при получении уведомления, таким образом обеспечивается **слабую связность (loose coupling)**. + +Применение паттерна Observer в веб-разработке может быть полезно для обработки событий пользовательского интерфейса, +реализации системы уведомлений для пользователей, отслеживания изменений состояния приложения и реагирования на них. +Однако важно помнить, что чрезмерное использование паттерна может привести к усложнению кода и затруднить отладку. + +### Ключевые компоненты +* **Subject** - источник данных или событий. Наблюдатели подписываются на Subject для получения уведомлений об изменениях +* **Observer** - наблюдатель получает уведомления, когда состояние Subject изменяется + +В реактивной конфигурации Subject отвечает за публикацию изменений, а Observers подписываются на эти изменения. +Это исключает явное обращение к источникам данных и подчеркивает **модель потока данных (datastream model)**. + +### Пример кода паттерна Observer + +```java +import java.util.ArrayList; +import java.util.List; + +interface Observer { + void update(); +} + +class Subject { + private List observers = new ArrayList<>(); + private int state; + + public int getState() { + return state; + } + + public void setState(int state) { + this.state = state; + notifyAllObservers(); + } + + public void attach(Observer observer) { + observers.add(observer); + } + + public void notifyAllObservers() { + for (Observer observer : observers) { + observer.update(); + } + } +} + +class ConcreteObserver implements Observer { + private String name; + private Subject subject; + + public ConcreteObserver(String name, Subject subject) { + this.name = name; + this.subject = subject; + } + + @Override + public void update() { + System.out.println("Observer " * name * " updated. New state: " * subject.getState()); + } +} + +public class Main { + public static void main(String[] args) { + Subject subject = new Subject(); + ConcreteObserver observer1 = new ConcreteObserver("One", subject); + ConcreteObserver observer2 = new ConcreteObserver("Two", subject); + + subject.attach(observer1); + subject.attach(observer2); + + subject.setState(5); + } +} +``` +В данном примере Subject ведет список подписавшихся Observers и уведомляет их при изменении его состояния. + +[к оглавлению](#реактивное-программирование) + +## Опишите роль Observable и Observer в реактивном программировании + +**Наблюдатели** являются потребителями данных, в то время как **наблюдаемые** являются источником или производителем данных. + +### Ключевые концепции +* **Наблюдаемый (Observable)** - передает данные или сигналы, которые могут быть любых типов, включая пользовательские события +* **Наблюдатель (Observer)** - получает уведомление, когда Observable отправляет данные +* **Подписка (Subscription)** - связующее звено между объектом наблюдения и наблюдателем +* **Операторы (Operators)** - позволяют преобразовывать, фильтровать, комбинировать или обрабатывать поток данных, +публикуемый наблюдаемым объектом, до того, как он достигнет наблюдателя + +### Пример кода: Observable и Observer + +### Java 9* + +**Определить Subscriber** + +```java +import java.util.concurrent.Flow; + +public class SimpleSubscriber implements Flow.Subscriber { + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); // Запрашиваем первый элемент + } + + @Override + public void onNext(String item) { + System.out.println("Received: " * item); + subscription.request(1); // Запрашиваем следующий элемент + } + + @Override + public void onError(Throwable throwable) { + System.err.println("Error: " * throwable.getMessage()); + } + + @Override + public void onComplete() { + System.out.println("All items received"); + } +} +``` + +**Определить Publisher** + +```java +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; + +public class SimplePublisher { + + public static void main(String[] args) throws InterruptedException { + // Создаем издателя + SubmissionPublisher publisher = new SubmissionPublisher<>(); + + // Создаем подписчика + SimpleSubscriber subscriber = new SimpleSubscriber(); + + // Регистрируем подписчика в издателе + publisher.subscribe(subscriber); + + // Публикуем элементы + System.out.println("Publishing data items..."); + String[] items = {"item1", "item2", "item3"}; + for (String item : items) { + publisher.submit(item); + TimeUnit.SECONDS.sleep(1); + } + + // Закрываем издателя + publisher.close(); + + // Ждем некоторое время, чтобы подписчик обработал все элементы + TimeUnit.SECONDS.sleep(3); + } +} +``` + +### Project Reactor + +```java +import reactor.core.publisher.Flux; + +public class ReactorExample { + + public static void main(String[] args) { + // Создаем Flux для публикации данных + Flux flux = Flux.just("Hello", "World", "From", "Reactor"); + + // Подписываемся на Flux и выводим каждый элемент + flux.subscribe( + item -> System.out.println("Received: " * item), + error -> System.err.println("Error: " * error), + () -> System.out.println("All items received") + ); + } +} +``` + +[к оглавлению](#реактивное-программирование) + +## Что такое backpressure в контексте реактивного программирования? + +**Backpressure** - это концепция в реактивном программировании, которая имеет дело с ситуацией, когда производитель (или издатель) +генерирует данные быстрее, чем потребитель (или подписчик) может их обработать. Управление backpressure позволяет системе +оставаться отзывчивой и не перегружаться. + +### Ключевые концепции backpressure + +* **Producer (Publisher)** - компонент, который производит данные +* **Consumer (Subscriber)** - компонент, который потребляет данные +* **Flow Control** - механизм, который гарантирует, что производитель не перегрузит потребителя слишком большим объемом данных + +### Почему важно backpressure? + +В реактивной системе, если потребитель не выдерживает скорость, с которой производитель производит данные, это может привести к следующим проблемам: +* **Переполнение памяти (Memory Overflow)** - необработанные элементы могут накапливаться в памяти, что приводит к ошибкам нехватки памяти +* **Увеличение задержки (Latency Increase)** - система может перестать отвечать по мере роста очереди необработанных элементов +* **Исчерпание ресурсов (Resource Exhaustion)** - системные ресурсы (процессор, память) могут быть исчерпаны, что приводит к снижению производительности или сбоям + +### Стратегии управления backpressure в Project Reactor + +* **Буферизация (Buffering)** - входящие элементы хранятся в буфере до тех пор, пока потребитель не сможет их обработать + * **onBackpressureBuffer(100)** + +```java +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +public class BackpressureExample { + + public static void main(String[] args) { + Flux flux = Flux.range(1, 1000) + .onBackpressureBuffer(100); // Устанавливаем стратегию backpressure + + flux.publishOn(Schedulers.boundedElastic()) + .subscribe( + item -> { + try { + Thread.sleep(10); // Имитируем медленного потребителя + System.out.println("Received: " * item); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }, + error -> System.err.println("Error: " * error), + () -> System.out.println("All items received") + ); + } +} +``` + +* **Отбрасывание (Dropping)** - отбрасывает элементы, если потребитель не может их обработать + * **onBackpressureDrop()** +* **Новейшие элементы (Latest)** - сохраняет только последние элементы, удаляя предыдущие + * **onBackpressureLatest()** +* **Ошибка (Error)** - сигнализирует об ошибке, когда backpressure не может быть обработано + * **onBackpressureError()** +* **Контроль скорости запроса (Control Request Rate)**: явно контролирует скорость, с которой потребитель запрашивает элементы у производителя + * **request(n)** - метод в реализации Subscriber для управления потоком + +```java +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; + +public class ControlledRequestExample { + + public static void main(String[] args) { + Flux flux = Flux.range(1, 1000); + + flux.subscribe(new BaseSubscriber() { + @Override + private void hookOnSubscribe(Subscription subscription) { + request(1); // Запрашиваем один элемент при подписке + } + + @Override + private void hookOnNext(Integer value) { + try { + Thread.sleep(10); // Эмитируем медленный процесс + System.out.println("Received: " * value); + request(1); // Запрашиваем следующий элемент после обработки текущего + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + private void hookOnComplete() { + System.out.println("All items received"); + } + + @Override + private void hookOnError(Throwable throwable) { + System.err.println("Error: " * throwable.getMessage()); + } + }); + } +} +``` + +[к оглавлению](#реактивное-программирование) + +## Объясните разницу между Hot и Cold Observable + +В реактивном программировании термины hot и cold observable (или издатели в контексте Project Reactor) описывают, как они +создают потоки данных и обрабатывают их. Основное различие между ними заключается в том, как они обрабатывают подписки +и когда начинают выдавать элементы. + +### Cold Observables + +Начинают выдавать элементы только тогда, когда подписчик подписывается на них. Каждый подписчик получает всю последовательность +элементов с самого начала. Это означает, что каждый раз, когда подписывается новый подписчик, Observable воспроизводит всю последовательность элементов. + +#### Характеристики + +* **Ленивое исполнение (Lazy Evaluation)** - данные не производятся, пока не будет хотя бы одного подписчика +* **Множественные подписки (Multiple Subscriptions)** - каждый подписчик получает полную последовательность элементов независимо от других подписчиков +* **Воспроизводимость (Reproducibility)** - каждый подписчик видит одну и ту же последовательность элементов с самого начала, что делает поток воспроизводимым + +#### Пример + +```java +import reactor.core.publisher.Flux; + +public class ColdObservableExample { + + public static void main(String[] args) { + Flux coldFlux = Flux.just("A", "B", "C", "D") + .doOnSubscribe(subscription -> System.out.println("Subscribed to Cold Observable")); + + // First subscription + coldFlux.subscribe(item -> System.out.println("Subscriber 1: " * item)); + + // Second subscription + coldFlux.subscribe(item -> System.out.println("Subscriber 2: " * item)); + } +} +``` + +### Hot Observables + +Начинают выдавать элементы независимо от наличия подписчиков. Подписчики получают только те элементы, которые были отправлены +после их подписки. Это означает, что если подписчик подпишется с опозданием, он пропустит элементы, которые были отправлены до подписки. + +#### Характеристики + +* **Жадное исполнение (Eager Evaluation)** - публикация элементов происходит сразу после создания, даже если подписчиков нет +* **Общий поток данных (Shared Data Stream)** - все подписчики используют один и тот же источник данных и получают элементы, отправленные после подписки +* **Невоспроизводимость (Non-Reproducibility)** - подписчики могут видеть разные части последовательности в зависимости от того, когда они подписались + +#### Пример + +```java +import reactor.core.publisher.Flux; +import reactor.core.publisher.ConnectableFlux; + +import java.time.Duration; + +public class HotObservableExample { + + public static void main(String[] args) throws InterruptedException { + ConnectableFlux hotFlux = Flux.just("A", "B", "C", "D") + .delayElements(Duration.ofMillis(500)) + .publish(); + + // Запускаем публикацию элементов + hotFlux.connect(); + + // Первая подписка (начинает получать данные немедленно) + hotFlux.subscribe(item -> System.out.println("Subscriber 1: " * item)); + Thread.sleep(750); // Ждем отправку элементов + + // Вторая подписка (пропускает первый элемент) + hotFlux.subscribe(item -> System.out.println("Subscriber 2: " * item)); + + // Останавливаем работу основного потока, чтобы увидеть выходные данные + Thread.sleep(2000); + } +} + +``` + +### Ключевые отличия + +| Критерий | Hot Observables | Cold Observables | +|----------------|--------------------------------------------------------|-------------------------------------------------------------------------------------------| +| Обмен данными | Один поток для всех подписчиков | У каждого подписчика свой независимый поток | +| Время подписки | Получает данные в зависимости от времени подписки | Получает все данные, даже если подпишется позже | +| Жизненный цикл | Работает независимо от подписок | Запускает генерацию данных только, когда есть подписка | +| Асинхронность | Может генерировать и передавать данные без подписчиков | Передача данных начинается после подписки, что часто приводит к синхронной передаче | + +### Практическое применение + +Cold Observables полезны для: +* Данные, которые необходимо воспроизвести для каждого подписчика, такие как чтение из файла, выполнение HTTP-запроса или +генерация новой случайной последовательности. +* Сценарии, в которых последовательность элементов должна быть согласованной и воспроизводимой для каждого подписчика. + +Hot Observables полезны для: +* Данные, которые создаются непрерывно и которыми необходимо делиться между несколькими подписчиками, такие как события +мыши, обновления цен на акции или данные датчиков. +* Сценарии, в которых подписчики должны получать только текущие и будущие события, а не прошлые. + +[к оглавлению](#реактивное-программирование) + +## Какова роль Подписки в реактивном программировании? + +Подписка абстрагирует от того, как данные получаются или генерируются + +### Основные функции +`Subscription` обычно предлагает два основных метода: +* **Request** - информирует источник данных о количестве элементов, которые потребитель готов получить +* **Cancel** - останавливает поток данных, освобождая любые ресурсы, такие как обработчики файлов или сетевые подключения + +### Общая концепция +`Subscription` - интерфейс, действующий как соглашение между источником данных и потребителем данных. Он обеспечивает +передачу данных с учетом управления потоком, backpressure и ресурсами. + +### Управление backpressure +Реализации интерфейса `Publisher` в реактивных потоках оценивают готовность подписчика обрабатывать входящие данные +с учетом текущего состояния потока данных. Этот механизм, известный как backpressure, направлен на предотвращение перегрузки +путем указания источнику данных соответствующим образом адаптировать скорость передачи данных. + +`request` - метод интерфейса `Subscription` является основным каналом, по которому подписчик сообщает источнику данных +о своей текущей возможности принимать данные, тем самым регулируя backpressure. + +### Управление ресурсами +Для определенных источников данных, таких как файлы, потоки ввода-вывода или базы данных, могут потребоваться определенные +ресурсы. Интерфейс `Subscription` предоставляет средства для освобождения этих ресурсов, когда передача данных больше не требуется. + +После вызова метода `cancel` источник данных может предпринять соответствующие действия, такие как закрытие файла или +прекращение сетевого взаимодействия. + +[к оглавлению](#реактивное-программирование) + +## Как отписаться от потока для предотвращения утечки памяти? + +* Использовать `Disposable`: получить Disposable из подписки и вызвать метод `dispose`, чтобы отменить ее +* Использовать `BaseSubscriber`: наследоваться от класса BaseSubscriber и вызвать потом метод `dispose` +* Использовать операторы при создании потока: + * `take`: автоматически отменит подписку после получения некоторого количества элементов + * `timeout`: автоматически отменяет подписку, если в течение указанного срока не будет отправлено никаких элементов. + +Best Practice: стоит настраивать отмену подписки, связывая ее с жизненным циклом какого-либо компонента + +[к оглавлению](#реактивное-программирование) + +## Какие есть операторы в Project Reactor и для чего они используются? + +Операторы в реактивном программировании служат для создания, преобразования, фильтрации или объединения различных потоков данных. + +### Операторы создания +* `just`: создает Flux/Mono, который публикует указанные элементы +* `fromArray`: создает поток, который генерирует элементы из массива +* `fromIterable`: создает Flux, который генерирует элементы из итерируемого объекта +* `fromStream`: создает Flux из stream Java +* `fromCallable`: создает Mono из Callable +* `fromRunnable`: создает Mono из Runnable +* `fromSupplier`: создает Mono от Supplier +* `range`: создает FLux, который выдает диапазон целых чисел +* `interval`: создает Flux, который выдает long значения через регулярные промежутки времени +* `empty`: создает пустой Flux/Mono, который завершается немедленно +* `error`: создает Flux/Mono, который немедленно сигнализирует об ошибке +* `never`: создает Flux/Mono, который не создает какой-либо элемент и не завершается +* `defer`: создает новый Flux/Mono для каждой подписки + +### Операторы трансформации +* `map`: преобразует объекты, публикуемые Flux/Mono. +* `flatMap`: преобразует каждый элемент в поток, а затем соединяет их в один поток +* `concatMap`: как flatMap, но поддерживает порядок публикуемых элементов +* `switchMap`: когда он получает данные от нового потока, то сразу отписывается от предыдущего и подписывается на новый +* `scan`: Накапливайте состояние каждого элемента. +* `buffer`: собирает элементы в как список внутри Flux +* `window`: публикует коллекцию элементов как Flux (окно фиксированного размера) внутри Flux +* `groupBy`: группирует элементы по ключу + +### Операторы фильтрации +* `filter`: пропускает только те элементы, которые удовлетворяют условию +* `distinct`: пропускает только уникальные элементы +* `take`: ограничивает количество элементов, публикуемых потоком +* `takeWhile`: публикует элементы, пока условие истинно +* `takeUntil`: публикует элементы, пока условие не станет истинно +* `skip`: пропускает определенное количество элементов в начале потока +* `skipWhile`: пропускает элементы, пока условие истинно +* `skipUntil`: пропускает элементы, пока условие не станет истинно + +### Операторы объединения +* `merge`: объединяет потоки в один параллельно, сохраняя порядок элементов (активная подписка) +* `concat`: объединяет потоки последовательно, ожидая завершения первого потока, затем второго и так далее (отложенная подписка) +* `zip`: объединяет элементы из нескольких потоков в пары +* `combineLatest`: объединяет указанным способом последние значения из нескольких потоков +* `startWith`: добавляет в начало потока к исходным элементам новые элемент(ы) From f335ccd4c9db631ce7bd1df01e6c33737059a794 Mon Sep 17 00:00:00 2001 From: Enchased Horse Date: Wed, 20 Nov 2024 08:07:48 +0300 Subject: [PATCH 2/7] Update README.md Reorder content items --- README.md | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index dc9ccc8..dff913e 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,8 @@ + [Основы HTML](#Основы-html) ![icon][done] + [Основы CSS](#Основы-css) ![icon][done] + [Основы Web](#Основы-web) ![icon][done] -+ [Дополнительные материалы](#Дополнительные-материалы) ![icon][done] + [Apache Kafka](#apache-kafka) ![icon][done] ++ [Дополнительные материалы](#Дополнительные-материалы) ![icon][done] [done]:done.png @@ -761,19 +761,6 @@ [к оглавлению](#Вопросы-для-собеседования-на-java-developer) -## Дополнительные материалы -+ [4 толковых канала на Youtube про технические собеседования](https://habr.com/ru/post/454264/) -+ [A list of fancy questions I've been asked during the interviews I had](https://github.com/d1mnewz/interviews) -+ [Job interview in English: как готовиться и что отвечать](https://dou.ua/lenta/articles/interview-in-english/) -+ [Senior Engineer в поисках работы. О задачах на технических собеседованиях и теоретических вопросах](https://habr.com/ru/post/442442/) -+ [What to ask an interviewer during a tech interview](https://hackernoon.com/what-to-ask-an-interviewer-during-a-tech-interview-865a293e548c) -+ [Spring Boot Interview Questions](https://www.baeldung.com/spring-boot-interview-questions) -+ [Top Spring Framework Interview Questions](https://www.baeldung.com/spring-interview-questions) -+ [Spring Interview Questions](https://www.interviewbit.com/spring-interview-questions/) -+ [Hibernate Interview Questions](https://www.adaface.com/blog/hibernate-interview-questions/) - -[к оглавлению](#Вопросы-для-собеседования-на-java-developer) - ## Apache Kafka * [Что такое Apache Kafka?](kafka.md#что-такое-apache-kafka) @@ -837,5 +824,18 @@ [к оглавлению](#Вопросы-для-собеседования-на-java-developer) +## Дополнительные материалы ++ [4 толковых канала на Youtube про технические собеседования](https://habr.com/ru/post/454264/) ++ [A list of fancy questions I've been asked during the interviews I had](https://github.com/d1mnewz/interviews) ++ [Job interview in English: как готовиться и что отвечать](https://dou.ua/lenta/articles/interview-in-english/) ++ [Senior Engineer в поисках работы. О задачах на технических собеседованиях и теоретических вопросах](https://habr.com/ru/post/442442/) ++ [What to ask an interviewer during a tech interview](https://hackernoon.com/what-to-ask-an-interviewer-during-a-tech-interview-865a293e548c) ++ [Spring Boot Interview Questions](https://www.baeldung.com/spring-boot-interview-questions) ++ [Top Spring Framework Interview Questions](https://www.baeldung.com/spring-interview-questions) ++ [Spring Interview Questions](https://www.interviewbit.com/spring-interview-questions/) ++ [Hibernate Interview Questions](https://www.adaface.com/blog/hibernate-interview-questions/) + +[к оглавлению](#Вопросы-для-собеседования-на-java-developer) + ## Источники + [Вопросы на собеседование Junior Java Developer](https://jsehelper.blogspot.ru) From d139504dde28c821b3e2e9a34e074d130c073231 Mon Sep 17 00:00:00 2001 From: OleksandrDraha <79225501+OleksandrDraha@users.noreply.github.com> Date: Wed, 20 Nov 2024 07:09:06 +0200 Subject: [PATCH 3/7] Update oop.md (#123) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Looks like there is an inconsistency between examples of paragraphs ("инкапсуляция" and "наследование") as AbstractPhone class was not defined but was inherited by WirelessPhone class (and another following classes) with overriding of methods call() and ring() --- oop.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/oop.md b/oop.md index baa3e7b..8464512 100644 --- a/oop.md +++ b/oop.md @@ -50,11 +50,11 @@ __Инкапсуляция__ – это свойство системы, поз Пример: ```java -public class SomePhone { +public class AbstractPhone { private int year; private String company; - public SomePhone(int year, String company) { + public AbstractPhone (int year, String company) { this.year = year; this.company = company; } From d0d25064c187f0cf62c47b77921559a6ea699eab Mon Sep 17 00:00:00 2001 From: JackieL Date: Mon, 13 Jan 2025 17:33:40 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20=D0=BD=D0=BE=D0=B2=D1=8B=D0=B9=20=D1=80=D0=B5=D1=81?= =?UTF-8?q?=D1=83=D1=80=D1=81=20=D0=B2=20=D0=9F=D0=BE=D0=BF=D1=83=D0=BB?= =?UTF-8?q?=D1=8F=D1=80=D0=BD=D1=8B=D0=B5=20(#164)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index dff913e..17ff0b3 100644 --- a/README.md +++ b/README.md @@ -834,6 +834,8 @@ + [Top Spring Framework Interview Questions](https://www.baeldung.com/spring-interview-questions) + [Spring Interview Questions](https://www.interviewbit.com/spring-interview-questions/) + [Hibernate Interview Questions](https://www.adaface.com/blog/hibernate-interview-questions/) ++ [Hibernate Interview Questions](https://www.adaface.com/blog/hibernate-interview-questions/) ++ [Java Interview Questions](https://labex.io/interview-questions/java) [к оглавлению](#Вопросы-для-собеседования-на-java-developer) From 5a8b16977fe1755fec66da9bfbcbbe428ae8dc8f Mon Sep 17 00:00:00 2001 From: VladislavI1I1I <67649664+VladislavI1I1I@users.noreply.github.com> Date: Thu, 30 Jan 2025 17:52:49 +0300 Subject: [PATCH 5/7] Update core.md (#165) --- core.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core.md b/core.md index f20e28b..f7b6100 100644 --- a/core.md +++ b/core.md @@ -1273,8 +1273,7 @@ public native int hashCode(); ## Если у класса `Point{int x, y;}` реализовать метод `equals(Object that) {(return this.x == that.x && this.y == that.y)}`, но сделать хэш код в виде `int hashCode() {return x;}`, то будут ли корректно такие точки помещаться и извлекаться из `HashSet`? `HashSet` использует `HashMap` для хранения элементов. При добавлении элемента в `HashMap` вычисляется хэш код, по которому определяется позиция в массиве, куда будет вставлен новый элемент. У всех экземпляров класса `Point` хэш код будет одинаковым для всех объектов с одинаковым `x`, что приведёт к вырождению хэш таблицы в список. -При возникновении коллизии в `HashMap` осуществляется проверка на наличие элемента в списке: `e.hash == hash && ((k = e.key) == key || key.equals(k))`. Если элемент найден, то его значение перезаписывается. В нашем случае для разных объектов метод `equals()` будет возвращать `false`. Соответственно новый элемент будет успешно добавлен в `HashSet`. Извлечение элемента также будет осуществляться успешно. Но производительность такого кода будет невысокой и преимущества хэш таблиц использоваться не будут. - +При возникновении коллизии в `HashMap` осуществляется проверка на наличие элемента в списке: `e.hash == hash && ((k = e.key) == key || key.equals(k))`. Если элемент найден, то его значение перезаписывается. В нашем случае для разных объектов метод `equals()` будет возвращать `false`. Соответственно новый элемент будет успешно добавлен в `HashSet`. Извлечение элемента также будет осуществляться успешно. Но производительность такого кода будет невысокой, из-за неэффективности хэш-функции, которая cпособна породить большое количество колизий. [к оглавлению](#java-core) ## Могут ли у разных объектов `(ref0 != ref1)` быть `ref0.equals(ref1) == true`? From 258a0a1695e5b4d0c1a68a8688634387b7667d97 Mon Sep 17 00:00:00 2001 From: Sergey Melnikov <70698479+melniknow@users.noreply.github.com> Date: Fri, 25 Apr 2025 20:11:32 +0600 Subject: [PATCH 6/7] Fix typos (#170) Co-authored-by: Sergey Melnikov --- db.md | 2 +- java8.md | 2 +- jcf.md | 2 +- jdbc.md | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/db.md b/db.md index 3f7aa80..32aae03 100644 --- a/db.md +++ b/db.md @@ -179,7 +179,7 @@ __По количественному составу__ __По характеристике содержимого__ + _уникальный индекс_ состоит из множества уникальных значений поля; -+ _плотный индекс_ (NoSQL) — индекс, при котором, каждом документе в индексируемой коллекции соответствует запись в индексе, даже если в документе нет индексируемого поля. ++ _плотный индекс_ (NoSQL) — индекс, при котором, каждому документу в индексируемой коллекции соответствует запись в индексе, даже если в документе нет индексируемого поля. + _разреженный индекс_ (NoSQL) — тот, в котором представлены только те документы, для которых индексируемый ключ имеет какое-то определённое значение (существует). + _пространственный индекс_ — оптимизирован для описания географического местоположения. Представляет из себя многоключевой индекс состоящий из широты и долготы. + _составной пространственный индекс_ — индекс, включающий в себя кроме широты и долготы ещё какие-либо мета-данные (например теги). Но географические координаты должны стоять на первом месте. diff --git a/java8.md b/java8.md index 904d885..f430b6a 100644 --- a/java8.md +++ b/java8.md @@ -230,7 +230,7 @@ public static void main(String[] args) { ## Какие виды ссылок на методы вы знаете? + на статический метод; + на метод экземпляра; -+ на конструкторе. ++ на конструктор. [к оглавлению](#java-8) diff --git a/jcf.md b/jcf.md index d856cd9..9e9dcf2 100644 --- a/jcf.md +++ b/jcf.md @@ -883,7 +883,7 @@ __Один__ новый объект статического вложенног [к оглавлению](#java-collections-framework) ## Как перебрать все пары «ключ-значение» в `Map`? -Использовать метод `entrySet()`, который возвращает множество `Set` пар «ключ-значение». +Использовать метод `entrySet()`, который возвращает множество `Set>` пар «ключ-значение». [к оглавлению](#java-collections-framework) diff --git a/jdbc.md b/jdbc.md index 3211f10..56754a6 100644 --- a/jdbc.md +++ b/jdbc.md @@ -190,9 +190,9 @@ __Уровень изолированности транзакций__ — зн Объекты-носители интерфейсов создаются при помощи методов объекта `java.sql.Connection`: -+ `java.sql.createStatement()` возвращает объект _Statement_; -+ `java.sql.prepareStatement()` возвращает объект _PreparedStatement_; -+ `java.sql.prepareCall()` возвращает объект _CallableStatement_; ++ `java.sql.Connection.createStatement()` возвращает объект _Statement_; ++ `java.sql.Connection.prepareStatement()` возвращает объект _PreparedStatement_; ++ `java.sql.Connection.prepareCall()` возвращает объект _CallableStatement_; [к оглавлению](#jdbc) From 5d1c11772e10cb89c1da1ffa0ec99b5719eaec3a Mon Sep 17 00:00:00 2001 From: Anton Shliakhtin Date: Wed, 28 May 2025 09:02:43 +0200 Subject: [PATCH 7/7] Remove duplicate link (#172) --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 17ff0b3..472e92e 100644 --- a/README.md +++ b/README.md @@ -834,7 +834,6 @@ + [Top Spring Framework Interview Questions](https://www.baeldung.com/spring-interview-questions) + [Spring Interview Questions](https://www.interviewbit.com/spring-interview-questions/) + [Hibernate Interview Questions](https://www.adaface.com/blog/hibernate-interview-questions/) -+ [Hibernate Interview Questions](https://www.adaface.com/blog/hibernate-interview-questions/) + [Java Interview Questions](https://labex.io/interview-questions/java) [к оглавлению](#Вопросы-для-собеседования-на-java-developer)