Kafka java что это
Перейти к содержимому

Kafka java что это

  • автор:

Что такое Apache Kafka: как устроен и работает брокер сообщений

Apache Kafka — распределенный брокер сообщений, работающий в стриминговом режиме. В статье мы расскажем про его устройство и преимущества, а также о том, где применяют это ПО.

Изображение записи

Apache Kafka — распределенный брокер сообщений, работающий в стриминговом режиме. В статье мы расскажем про его устройство и преимущества, а также о том, где применяют «Кафку».

Что такое брокер сообщений

Главная задача брокера — обеспечение связи и обмена информацией между приложениями или отдельными модулями в режиме реального времени.

Брокер — система, преобразующая сообщение от источника данных (продюсера) в сообщение принимающей стороны (консьюмера). Брокер выступает проводником и состоит из серверов, объединенных в кластеры.

Apache Kafka — диспетчер сообщений, разработанный LinkedIn. В 2011 году был опубликован программный код. В 2012 году Kafka попал в инкубатор Apache, дальнейшая разработка ведется в рамках Apache Software Foundation. Открытое программное обеспечение с разрешительной лицензией написано на Java и Scala.

Изначально «Кафку» создавали как систему, оптимизированную под запись, и создатель Джей Крепс выбрал такое название в честь одного из своих любимых писателей.

Шаги передачи данных

Чтобы понять, как функционирует распределенная система Apache Kafka, необходимо проследить путь данных.

Событие или сообщение — данные, которые поступают из одного сервиса, хранятся на узлах Kafka и читаются другими сервисами. Сообщение состоит из:

  • Key — опциональный ключ, нужен для распределения сообщений по кластеру.
  • Value — массив байт, бизнес-данные.
  • Timestamp — текущее системное время, устанавливается отправителем или кластером во время обработки.
  • Headers — пользовательские атрибуты key-value, которые прикрепляют к сообщению.

Продюсер — поставщик данных, который генерирует сообщения — например, служебные события, логи, метрики, события мониторинга.

Консьюмер — потребитель данных, который читает и использует события, пример — сервис сбора статистики.

Взаимодействие продюсера и консьюмера сообщений

Какие сложности решает распределенная система

Сообщения могут быть однотипными или разнородными, поскольку разным потребителям нужны разные данные. Один тип событий может быть нужен всем консьюмерам, а другие — только одному.

Без брокера продюсеры должны знать получателя и резервного консьюмера, если основной недоступен. К тому же, поставщикам данных придется самостоятельно регистрировать новых консьюмеров. С помощью брокера продюсеры просто отправляют информацию в единый узел.

Managed service для Apache Kafka

Сообщения хранятся на узлах-брокерах. Kafka — масштабируемый кластер со множеством взаимозаменяемых серверов, в которые добавляются новые брокеры, распределяющие задачи между собой.

ZooKeeper — инструмент-координатор, действует как общая служба конфигурации в системе. Работает как база для хранения метаданных о состоянии узлов кластера и расположении сообщений. ZooKeeper обеспечивает гибкую и надежную синхронизацию в распределенной системе, позволяя нескольким клиентам выполнять одновременно чтение и запись.

Kafka Controller — среди брокеров Zookeeper выбирает одного, который будет обеспечивать консистентность данных.

Topic — принцип деления потока данных, базовая и основная сущность Apache Kafka. В топик складывается стрим данных, единая очередь из входящих сообщений.

Partition — для ускорения чтения и записи топики делятся на партиции. Происходит параллелизация данных. Это конфигурируемый параметр, сообщения могут отправлять несколько продюсеров и принимать несколько консьюмеров.

Managed service для Apache Kafka

Упорядочение событий происходит на уровне партиций. Принимающая сторона потребляет данные в порядке расположения в партиции. Пример: все события одного пользователя сервисы принимают упорядоченно, обработка сохраняет последовательность пути пользователя. Выстраивается конвейер данных, алгоритмы машинного обучения могут извлекать из сырой информации необходимую для бизнеса информацию.

Преимущества Apache Kafka

Брокер распределяет информацию в широковещательном режиме. Применяющийся в Apache Kafka подход нужен для масштабирования и репликации данных.

Горизонтальное масштабирование

Множество объединенных серверов гарантируют высокую доступность данных — выход из строя одного из узлов не нарушает целостность. Кластер состоит из обычных машин, а не суперкомпьютеров, их можно менять и дополнять. Система автоматически перебалансируется.

Чтобы события не потерялись, существуют механизмы репликации. Данные записываются на несколько машин, если что-то случается с сервером, он переключается на резервный. Кластер в режиме реального времени определяет, где находятся данные, и продолжает их использовать.

Офсеты

Если консьюмер падает в процессе получения данных, то, когда он запустится вновь и ему нужно будет вернутся к чтению этого сообщения, он воспользуется офсетом и продолжит с нужного места.

Взаимодействие через API

Брокеры решают проблему интеграции разных технических стеков и протоколов. Интеграция происходит просто: продюсерам и консьюмерам необходимо знать только API брокера. Они не контактируют между собой, с помощью чего достигается высокая интегрируемость с другими системами.

Принцип first in — first out

Принцип FIFO действует на консьюмеров. Чтение происходит в том же порядке, в котором пришла информация.

Где применяется Apache Kafka

Отказоустойчивая система используется в бизнесе, где необходимо собирать, хранить и обрабатывать большие неструктурированные данные. Примеры — платформы, где требуется интеграция данных из большого количества источников, сервисы стриминговой аналитики, mission-critical applications.

Big Data

Первоначально LinkedIn разработали «Кафку» для своих целей: обмена данными между службами, репликации баз данных, потоковой передачи информации о деятельности и операционных показателях приложений.

Для IBM Apache Kafka работает как средство обмена сообщениями между микросервисами. В аналитических системах американской корпорации Apache Kafka обрабатывает потоковые и событийные данные.

Uber, Twitter, Netflix и AirBnb с помощью хорошо развитых пайплайнов обработки данных передают миллиарды сообщений в день. «Кафка» решает проблемы перемещения Big data из одного источника в другой.

Издание The New York Times использует Apache Kafka для хранения и распространения опубликованного контента среди различных приложений и систем, которые делают его доступным для читателей в режиме реального времени.

Internet of Things

IoT-платформы используют архитектуру с большим количеством конечных устройств: контроллеров, датчиков, сенсоров и smart-гаджетов. ПО интернета вещей с помощью алгоритмов ML составляет графики профилактического ремонта оборудования, анализируя данные, поступающие с устройств.

ML-системы работают с онлайн-потоками, когда приборы, приложения и пользователи постоянно посылают данные, а сервисы обрабатывают их в реальном времени. Apache Kafka выступает центральным звеном в этом процессе.

Отрасли

Kafka используют организации практически в любой отрасли: разработка ПО, финансовые услуги, здравоохранение, государственное управление, транспорт, телеком, геймдев.

Сегодня Kafka пользуются тысячи компаний, более 60% входят в список Fortune 100. На официальном сайте представлен полный список корпораций и учреждений, которые используют брокера Apache.

Конкуренты

Чаще всего Kafka сравнивают с RabbitMQ. Обе системы — брокеры сообщений. Главное отличие в модели доставки: Kafka добавляет сообщение в журнал, и консьюмер сам забирает информацию из топика; брокер RabbitMQ самостоятельно отправляет сообщения получателям — помещает событие в очередь и отслеживает его статус.

«Кролик» удаляет событие после доставки, «Кафка» хранит до запланированной очистки журнала. Таким образом, брокер Apache используется как источник истории изменений.

Разработчики RabbitMQ создали системы управления потоком сообщений: мониторинг получения, маршрутизация и шаблоны доставки. Подобное гибкое управление подойдет для высокоскоростного обмена сообщениями между несколькими сервисами. Минус такого подхода в снижении производительности при высокой нагрузке.

Главный вывод — для сбора и агрегации событий из большого количества источников, логов и метрик больше подойдет Apache Kafka.

Заключение

Благодаря высокой пропускной способности и согласованности данных Apache Kafka обрабатывает огромные массивы данных в реальном времени. Системы горизонтального масштабирования и офсеты гарантируют надежность. Kafka — удачное решение для проекта с очень большими нагрузками на обработку данных. Установить это ПО можно на серверы Ubuntu, Windows, CentOS и других популярных операционных систем.

Kafka Apache

Kafka Apache — распределенная система обмена сообщениями между серверными приложениями в режиме реального времени. Благодаря высокой пропускной способности, масштабируемости и надежности применяется в компаниях, работающих с большими объемами данных. Написана на языках Java и Scala.

Освойте профессию
«Fullstack-разработчик на Python»

Kafka разработана компанией LinkedIn. В 2011 году разработчик опубликовал исходный код системы. С тех пор платформа развивается и поддерживается как открытый проект в рамках фонда Apache Software Foundation. Apache Kafka используют многие крупные компании, такие как LinkedIn, Microsoft, The New York Times, Netflix и другие.

Профессия / 12 месяцев
Fullstack-разработчик на Python

Создавайте веб-проекты самостоятельно

dffsdd (2)

Применение Kafka Apache

Kafka Apache — эффективный инструмент для организации работы серверных проектов любого уровня. Благодаря гибкости, масштабируемости и отказоустойчивости используется в различных направлениях IT-индустрии, от сервисов потоковых видео до аналитики Big Data.

  • Для связи микросервисов. Kafka — связующее звено между отдельными функциональными модулями большой системы. Например, с ее помощью можно подписать микросервис на другие компоненты для регулярного получения обновлений.
  • Потоковая передача данных. Высокая пропускная способность системы позволяет поддерживать непрерывные потоки информации. За счет грамотной маршрутизации «Кафка» не только надежно передает данные, но и позволяет производить с ними различные операции.
  • Ведение журнала событий. Kafka сохраняет данные в строго организованную структуру, в которой всегда можно отследить, когда произошло то или иное событие. Информация хранится в течение заданного промежутка времени, что можно использовать для разгрузки базы данных или медленно работающих систем логирования.

Читайте также Как выбрать IT-специальность в новых реалиях?

Как устроена и работает Kafka Apache

Кратко архитектуру системы сообщений можно охарактеризовать следующим образом:

  • распределенность — отдельные узлы системы располагаются на нескольких аппаратных платформах (кластерах). Это обеспечивает ей высокую отказоустойчивость;
  • масштабируемость — систему можно наращивать за счет простого добавления новых узлов (брокеров сообщений).

В архитектуре Kafka Apache ключевыми являются концепции:

  • продюсер (producer) — приложение или процесс, генерирующий и посылающий данные (публикующий сообщение);
  • потребитель (consumer) приложение или процесс, который принимает сгенерированное продюсером сообщение;
  • сообщение — пакет данных, необходимый для совершения какой-либо операции (например, авторизации, оформления покупки или подписки);
  • брокер — узел (диспетчер) передачи сообщения от процесса-продюсера приложению-потребителю;
  • топик (тема) виртуальное хранилище сообщений (журнал записей) одинакового или похожего содержания, из которого приложение-потребитель извлекает необходимую ему информацию.

В упрощенном виде работа Kafka Apache выглядит следующим образом:

схема работы Kafka Apache

  • Приложение-продюсер создает сообщение и отправляет его на узел Kafka.
  • Брокер сохраняет сообщение в топике, на который подписаны приложения-потребители.
  • Потребитель при необходимости делает запрос в топик и получает из него нужные данные.

Сообщения хранятся в Kafka в виде журнала коммитов — записей, размещенных в строгой последовательности. Их можно только добавлять. Удалять или корректировать нельзя. Сообщения хранятся в той последовательности, в которой поступили, их считывание ведется слева направо, а отслеживание — по изменению порядкового номера. Брокеры Kafka не обрабатывают записи — только помещают их в тему на кластере. Хранение может длиться в течение определенного периода или до достижения заданного порога.

Станьте Fullstack-разработчик на Python и найдите стабильную работу
на удаленке

Если тема слишком разрастается, для упрощения и ускорения процесса она разделяется на секции. Каждая секция содержит сообщения, сгруппированные по объединяющему признаку. Например, массив пользовательских запросов можно сгруппировать по первой букве имени пользователей. Так приложению-потребителю не придется просматривать весь топик — только нужную тему, что ускоряет процесс обмена сообщениями.

организация данных в Kafka, схема

Преимущества Kafka

Отказоустойчивость

Kafka — распределенная система обмена сообщениями, узлы которой содержатся на нескольких кластерах. Принимая сообщение от продюсера, она реплицирует (копирует) его, а копии сохраняет на разных узлах. При этом один из брокеров назначается ведомым в секции, через него потребители будут обращаться к записям. Другие брокеры остаются ведомыми, их главная задача — обеспечить сохранность сообщения (его копий) даже при выходе одного или нескольких узлов из строя. Распределенный характер и механизм репликации записей обеспечивают системе высокую устойчивость. Надежность повышает интеграция с Apache ZooKeeper, которая обеспечивает координацию компонентов друг с другом.

Масштабируемость

Apache Kafka поддерживает «горячее» расширение, то есть ее можно увеличивать с помощью простого добавления новых машин в кластеры, не отключая всю систему. Так исключаются простои, связанные с переоборудованием серверных мощностей. Принцип удобнее горизонтального масштабирования, при котором на одну серверную машину «навешиваются» дополнительные ресурсы: жесткие диски, CPU, RAM и т.д. При необходимости систему можно легко сократить, исключив лишние машины из кластера.

Производительность

В Kafka процессы генерирования/отправки и считывания сообщений организованы независимо друг от друга. Тысячи приложений, процессов могут одновременно и параллельно играть роль генераторов и потребителей сообщений. В сочетании с распределенным характером и масштабируемостью это позволяет применять «Кафка» как в небольших, так и в масштабных проектах с большими объемами данных.

Открытый исходный код

Kafka распространяется по свободной лицензии фонда Apache Software Foundation. Благодаря этому Kafka Apache имеет ряд преимуществ:

  • большой объем подробной справочной информации от официальных разработчиков, а также различных мануалов, лайфхаков, инструкций и обзоров от большого числа энтузиастов-любителей и профессионалов;
  • большое количество дополнительных программных пакетов, патчей от сторонних разработчиков, расширяющих и улучшающих базовый функционал системы;
  • возможность самостоятельно адаптировать систему под специфику проекта за счет гибкости настроек.

Безопасность

В Kafka есть инструменты, обеспечивающие безопасную работу и достоверность данных. Например, настроив уровень изоляции для транзакций, можно исключить чтение незавершенных или отмененных сообщений. Кроме того, благодаря сохранению данных в топиках пользователь может в любой момент отследить изменения в системе. А принцип последовательной записи позволяет быстро находить нужные сообщения.

Долговечность

Данные в Kafka сохраняются в долговременные виртуальные хранилища в течение заданного периода времени (дней, недель, месяцев). За счет распределенного хранения информации она не потеряется при сбое одного или нескольких узлов, и потребитель сможет в любой момент обратиться к нужному сообщению в топике, отследив его смещение.

Интегрируемость

Благодаря собственному протоколу на базе TCP «Кафка Апач» взаимодействует с другими протоколами передачи данных (REST, HTTP, XMPP, STOMP, AMQP, MQTT). Встроенный фреймворк Kafka Connect позволяет Kafka подключаться к базам данных, файловым и облачным хранилищам.

Единственный заметный недостаток системы — ориентированность на обработку больших объемов данных. Из-за этого функционал маршрутизации потоков ограничен по сравнению с другими аналогичными платформами. По мере развития Kafka это различие становится менее заметным, а сама система — более гибкой и универсальной.

Fullstack-разработчик на Python

Fullstack-разработчики могут в одиночку сделать IT-проект от архитектуры до интерфейса. Их навыки востребованы у работодателей, особенно в стартапах. Научитесь программировать на Python и JavaScript и создавайте сервисы с нуля.

Apache Kafka для чайников

Данная статья будет полезной тем, кто только начал знакомиться с микросервисной архитектурой и с сервисом Apache Kafka. Материал не претендует на подробный туториал, но поможет быстро начать работу с данной технологией. Я расскажу о том, как установить и настроить Kafka на Windows 10. Также мы создадим проект, используя Intellij IDEA и Spring Boot.

Зачем?

Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться. С Kafka всё обстоит точно также. Опишем ситуацию, в которой данная технология будет полезной. Если у вас монолитная архитектура приложения, то разумеется, никакая Kafka вам не нужна. Всё меняется с переходом на микросервисы. По сути, каждый микросервис – это отдельная программа, выполняющая ту или иную функцию, и которая может быть запущена независимо от других микросервисов. Микросервисы можно сравнить с сотрудниками в офисе, которые сидят за отдельными столами и независимо от коллег решают свою задачу. Работа такого распределённого коллектива немыслима без централизованной координации. Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы между собой. Именно эту проблему и призвана решить Apache Kafka для микросервисов.

Apache Kafka является брокером сообщений. С его помощью микросервисы могут взаимодействовать друг с другом, посылая и получая важную информацию. Возникает вопрос, почему не использовать для этих целей обычный POST – reqest, в теле которого можно передать нужные данные и таким же образом получить ответ? У такого подхода есть ряд очевидных минусов. Например, продюсер (сервис, отправляющий сообщение) может отправить данные только в виде response’а в ответ на запрос консьюмера (сервиса, получающего данные). Допустим, консьюмер отправляет POST – запрос, и продюсер отвечает на него. В это время консьюмер по каким-то причинам не может принять полученный ответ. Что будет с данными? Они будут потеряны. Консьюмеру снова придётся отправлять запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, и продюсер всё ещё готов принять request.

Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами. Не лишним будет напомнить, что бесперебойный и удобный обмен данными – одна из ключевых проблем, которую необходимо решить для обеспечения устойчивой работы микросервисной архитектуры.

Установка и настройка ZooKeeper и Apache Kafka на Windows 10

Первое, что надо знать для начала работы — это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это распределенный сервис конфигурирования и синхронизации, и это всё, что нам нужно знать о нём в данном контексте. Мы должны скачать, настроить и запустить его перед тем, как начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлен и настроен JRE.

Извлекаем из скаченного архива ZooKeeper`а файлы в какую-нибудь папку на диске.
В папке zookeeper с номером версии, находим папку conf и в ней файл “zoo_sample.cfg”.

Копируем его и меняем название копии на “zoo.cfg”. Открываем файл-копию и находим в нём строчку dataDir=/tmp/zookeeper. Прописываем в данной строчке полный путь к нашей папке zookeeper-х.х.х. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0

Теперь добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;

Запускаем командную строку и пишем команду:

zkserver

Если всё сделано правильно, вы увидите примерно следующее.

Это означает, что ZooKeeper стартанул нормально. Переходим непосредственно к установке и настройке сервера Apache Kafka. Скачиваем свежую версию с официального сайта и извлекаем содержимое архива: kafka.apache.org/downloads

В папке с Kafka находим папку config, в ней находим файл server.properties и открываем его.

Находим строку log.dirs= /tmp/kafka-logs и указываем в ней путь, куда Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.

В этой же папке редактируем файл zookeeper.properties. Строчку dataDir=/tmp/zookeeper меняем на dataDir=c:/kafka/zookeeper-data, не забывая при этом, после имени диска указывать путь к своей папке с Kafka. Если вы всё сделали правильно, можно запускать ZooKeeper и Kafka.

Для кого-то может оказаться неприятной неожиданностью, что никакого GUI для управления Kafka нет. Возможно, это потому, что сервис рассчитан на суровых нёрдов, работающих исключительно с консолью. Так или иначе, для запуска кафки нам потребуется командная строка.

Сначала надо запустить ZooKeeper. В папке с кафкой находим папку bin/windows, в ней находим файл для запуска сервиса zookeeper-server-start.bat, кликаем по нему. Ничего не происходит? Так и должно быть. Открываем в этой папке консоль и пишем:

 start zookeeper-server-start.bat

Опять не работает? Это норма. Всё потому что zookeeper-server-start.bat для своей работы требует параметры, прописанные в файле zookeeper.properties, который, как мы помним, лежит в папке config. Пишем в консоль:

start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties 

Теперь всё должно стартануть нормально.

Ещё раз открываем консоль в этой папке (ZooKeeper не закрывать!) и запускаем kafka:

start kafka-server-start.bat c:\kafka\config\server.properties

Для того, чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник со следующим содержимым:

start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties timeout 10 start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties

Строка timeout 10 нужна для того, чтобы задать паузу между запуском zookeeper и kafka. Если вы всё сделали правильно, при клике на батник должны открыться две консоли с запущенным zookeeper и kafka.Теперь мы можем прямо из командной строки создать продюсера сообщений и консьюмера с нужными параметрами. Но, на практике это может понадобиться разве что для тестирования сервиса. Гораздо больше нас будет интересовать, как работать с kafka из IDEA.

Работа с kafka из IDEA

Мы напишем максимально простое приложение, которое одновременно будет и продюсером и консьюмером сообщения, а затем добавим в него полезные фичи. Создадим новый спринг-проект. Удобнее всего делать это с помощью спринг-инициалайзера. Добавляем зависимости org.springframework.kafka и spring-boot-starter-web

В итоге файл pom.xml должен выглядеть так:

Для того, чтобы отправлять сообщения, нам потребуется объект KafkaTemplate. Как мы видим объект является типизированным. Первый параметр – это тип ключа, второй – самого сообщения. Пока оба параметра мы укажем как String. Объект будем создавать в классе-рестконтроллере. Объявим KafkaTemplate и попросим Spring инициализировать его, поставив аннотацию Autowired.

@Autowired private KafkaTemplate kafkaTemplate;

В принципе, наш продюсер готов. Всё что осталось сделать – это вызвать у него метод send(). Имеется несколько перегруженных вариантов данного метода. Мы используем в нашем проекте вариант с 3 параметрами — send(String topic, K key, V data). Так как KafkaTemplate типизирован String-ом, то ключ и данные в методе send будут являться строкой. Первым параметром указывается топик, то есть тема, в которую будут отправляться сообщения, и на которую могут подписываться консьюмеры, чтобы их получать. Если топик, указанный в методе send не существует, он будет создан автоматически. Полный текст класса выглядит так.

@RestController @RequestMapping("msg") public class MsgController < @Autowired private KafkaTemplatekafkaTemplate; @PostMapping public void sendOrder(String msgId, String msg) < kafkaTemplate.send("msg", msgId, msg); >> 

Контроллер мапится на localhost:8080/msg, в теле запроса передаётся ключ и само сообщений.

Отправитель сообщений готов, теперь создадим слушателя. Spring так же позволяет cделать это без особых усилий. Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которой можно указать только топик, который будет слушаться. В нашем случае это выглядит так.

@KafkaListener(topics="msg")

У самого метода, помеченного аннотацией, можно указать один принимаемый параметр, имеющий тип сообщения, передаваемого продюсером.

Класс, в котором будет создаваться консьюмер необходимо пометить аннотацией @EnableKafka.

@EnableKafka @SpringBootApplication public class SimpleKafkaExampleApplication < @KafkaListener(topics="msg") public void msgListener(String msg)< System.out.println(msg); >public static void main(String[] args) < SpringApplication.run(SimpleKafkaExampleApplication.class, args); >>

Так же в файле настроек application.property необходимо указать параметр консьюмера groupe-id. Если этого не сделать, приложение не запустится. Параметр имеет тип String и может быть любым.

spring.kafka.consumer.group-id=app.1

Наш простейший кафка-проект готов. У нас есть отправитель и получатель сообщений. Осталось только запустить. Для начала запускаем ZooKeeper и Kafka с помощью батника, который мы написали ранее, затем запускаем наше приложение. Отправлять запрос удобнее всего с помощью Postman. В теле запроса не забываем указывать параметры msgId и msg.

Если мы видим в IDEA такую картину, значит всё работает: продюсер отправил сообщение, консьюмер получил его и вывел в консоль.

Усложняем проект

Реальные проекты с использованием Kafka конечно же сложнее, чем тот, который мы создали. Теперь, когда мы разобрались с базовыми функциями сервиса, рассмотрим, какие дополнительные возможности он предоставляет. Для начала усовершенствуем продюсера.

Если вы открывали метод send(), то могли заметить, что у всех его вариантов есть возвращаемое значение ListenableFuture>. Сейчас мы не будем подробно рассматривать возможности данного интерфейса. Здесь будет достаточно сказать, что он нужен для просмотра результата отправки сообщения.

@PostMapping public void sendMsg(String msgId, String msg)< ListenableFuture> future = kafkaTemplate.send("msg", msgId, msg); future.addCallback(System.out::println, System.err::println); kafkaTemplate.flush(); >

Метод addCallback() принимает два параметра – SuccessCallback и FailureCallback. Оба они являются функциональными интерфейсами. Из названия можно понять, что метод первого будет вызван в результате успешной отправки сообщения, второго – в результате ошибки.Теперь, если мы запустим проект, то увидим на консоли примерно следующее:

SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]

Посмотрим ещё раз внимательно на нашего продюсера. Интересно, что будет если в качестве ключа будет не String, а, допустим, Long, а в качестве передаваемого сообщения и того хуже – какая-нибудь сложная DTO? Попробуем для начала изменить ключ на числовое значение…

Если мы укажем в продюсере в качестве ключа Long, то приложение нормально запуститься, но при попытке отправить сообщение будет выброшен ClassCastException и будет сообщено, что класс Long не может быть приведён к классу String.

Если мы попробуем вручную создать объект KafkaTemplate, то увидим, что в конструктор в качестве параметра передаётся объект интерфейса ProducerFactory, например DefaultKafkaProducerFactory<>. Для того, чтобы создать DefaultKafkaProducerFactory, нам нужно в его конструктор передать Map, содержащий настройки продюсера. Весь код по конфигурации и созданию продюсера вынесем в отдельный класс. Для этого создадим пакет config и в нём класс KafkaProducerConfig.

@Configuration public class KafkaProducerConfig < private String kafkaServer="localhost:9092"; @Bean public MapproducerConfigs() < Mapprops = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; > @Bean public ProducerFactory producerFactory() < return new DefaultKafkaProducerFactory<>(producerConfigs()); > @Bean public KafkaTemplate kafkaTemplate() < return new KafkaTemplate<>(producerFactory()); > > 

В методе producerConfigs() создаём мапу с конфигурациями и в качестве сериализатора для ключа указываем LongSerializer.class. Запускаем, отправляем запрос из Postman и видим, что теперь всё работает, как надо: продюсер отправляет сообщение, а консьюмер принимает его.

Теперь изменим тип передаваемого значения. Что если у нас не стандартный класс из библиотеки Java, а какой-нибудь кастомный DTO. Допустим такой.

@Data public class UserDto < private Long age; private String name; private Address address; >@Data @AllArgsConstructor public class Address

Для отправки DTO в качестве сообщения, нужно внести некоторые изменения в конфигурацию продюсера. В качестве сериализатора значения сообщения укажем JsonSerializer.class и не забудем везде изменить тип String на UserDto.

@Configuration public class KafkaProducerConfig < private String kafkaServer="localhost:9092"; @Bean public MapproducerConfigs() < Mapprops = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; > @Bean public ProducerFactory producerFactory() < return new DefaultKafkaProducerFactory<>(producerConfigs()); > @Bean public KafkaTemplate kafkaTemplate() < return new KafkaTemplate<>(producerFactory()); > >

Отправим сообщение. В консоль будет выведена следующая строка:

Теперь займёмся усложнением консьюмера. До этого наш метод public void msgListener(String msg), помеченный аннотацией @KafkaListener(topics=«msg») в качестве параметра принимал String и выводил его на консоль. Как быть, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или партицию? В этом случае тип передаваемого значения необходимо изменить.

@KafkaListener(topics="msg") public void orderListener(ConsumerRecord record)

Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.

Мы видим, что вместо ключа на консоль выводятся какие-то кракозябры. Это потому, что для десериализации ключа по умолчанию используется StringDeserializer, и если мы хотим, чтобы ключ в целочисленном формате корректно отображался, мы должны изменить его на LongDeserializer. Для настройки консьюмера в пакете config создадим класс KafkaConsumerConfig.

@Configuration public class KafkaConsumerConfig < @Value("$") private String kafkaServer; @Value("$") private String kafkaGroupId; @Bean public Map consumerConfigs() < Mapprops = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); return props; > @Bean public KafkaListenerContainerFactory kafkaListenerContainerFactory() < ConcurrentKafkaListenerContainerFactoryfactory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; > @Bean public ConsumerFactory consumerFactory() < return new DefaultKafkaConsumerFactory<>(consumerConfigs()); > >

Класс KafkaConsumerConfig очень похож на KafkaProducerConfig, который мы создавали ранее. Здесь так же присутствует Map, содержащий необходимые конфигурации, например, такие как десериализатор для ключа и значения. Созданная мапа используется при создании ConsumerFactory<>, которая в свою очередь, нужна для создания KafkaListenerContainerFactory. Важная деталь: метод возвращающий KafkaListenerContainerFactory должен называться kafkaListenerContainerFactory(), иначе Spring не сможет найти нужного бина и проект не скомпилируется. Запускаем.

Видим, что теперь ключ отображается как надо, а это значит, что всё работает. Конечно, возможности Apache Kafka далеко выходят за пределы тех, что описаны в данной статье, однако, надеюсь, прочитав её, вы составите представление о данном сервисе и, самое главное, сможете начать работу с ним.

Мойте руки чаще, носите маски, не выходите без необходимости на улицу, и будьте здоровы.

Apache Kafka. Пишем простой producer и consumer и тестируем их

В данной статье будет описано, как создать простой kafka producer и kafka consumer, а затем протестировать их.

Данная статья будет полезна начинающим разработчикам, которые еще не работали с технологией Apache Kafka.

Вначале надо разобраться, что такое Apache Kafka и для чего она используется. И тут сразу могут возникнуть первые вопросы, так как первое, что приходит в голову, если идет речь о kafka, то это — распределенная система обмена сообщениями между серверными приложениями в режиме реального времени. Но если «копнуть глубже» и посмотреть на определение kafka на официальном сайте https://kafka.apache.org/ мы увидим.

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Исходя из этого определения Apache Kafka — это больше, чем просто система обмена сообщениями, это распределенная платформа потоковой передачи событий, а также потоковой аналитики и интеграции данных.

То есть kafka может использоваться и как база данных, и как распределенное хранилище логов, и как очередь, и как платформа для потоковой обработки данных и т.д.

В данной статье будет рассмотрен пример, как с помощью kafka организовать обмен сообщениями между двумя микросервисами.

Kafka, как и почти все сервисы обработки очередей, условно состоит из трех основных частей:

1) сервер или еще его называют брокер;

2) producer — отправляют сообщения брокеру;

3) consumer — считывают сообщения с брокера, использую модель pull, то есть консьюмеры сами отправляют запросы к брокеру для получения новых сообщений.

Главной отличительной чертой kafka от других систем обработки очередей (например RabbitMQ), является то, что сообщения в kafka могут храниться на брокере днями, неделями или даже годами. Благодаря этому одно и тоже сообщение может быть обработано разными консьюмерами по-разному.

Рассмотрим какая структура сообщения в kafka. Оно состоит из ключа (key), значения (value), таймстампа (timestamp) и набора метаданных (headers).

Сообщения хранятся в топиках (topics). Топики состоят из партиций (partitions). Партиции или их еще называют разделы — это копии очередей наших сообщений. Чтобы повысить надежность и доступность данных в кластере-Kafka, разделы могут иметь копии, число которых задается коэффициентом репликации (replication factor), который показывает, на сколько брокеров-последователей (follower) будут скопированы данные с ведущего-лидера (leader). Таким образом, гарантируется наличие нескольких копий сообщения на разных брокерах. Партиции, в свою очередь, распределены между брокерами внутри одного кластера. Такая сложная, на первый взгляд, система хранения сообщений необходима для отказоустойчивости, масштабирования и повышения производительности работы, так как она позволяет продюсерам писать в несколько брокеров одновременно, а консьюмерам — читать, также из нескольких брокеров.

У каждой партиции есть свой «лидер» (leader) — это тот брокер, который работает с продюсером и на него приходит сообщение, а также у каждой партиции имеются несколько «фолловеров» (followers) — это брокеры, которые хранят копии партиций. Перед отправкой сообщения консьюмер обращается к брокеру и запрашивает данные, кто является лидером партиции.

Таким образом, общая схема сохранения сообщения в kafka выглядит следующим образом. Имеется какой-то топик, в который записываются сообщения, и есть несколько партиций (копий очередей наших сообщений), распределенных по брокерам в кластере. Продюсер вначале обращается к брокеру с вопросом, кто является лидером партиции в данном брокере, и после получения данной информации отправляет туда свое сообщение, на втором этапе, фолловеры данной партиции копируют себе отправленное сообщение на свой брокер. Так происходит с каждой партицией.

Время хранения сообщения в kafka регулируется с помощью специальных настроек.

Рассмотрим сейчас как выглядит работа консьюмера в kafka.

Каждый консьюмер должен быть частью какой-нибудь консьюмер группы. Данная группа должна иметь уникальное название и должна быть зарегистрирована в кластере. Как правило, если у нас есть несколько консьюмеров, в одной группе, то они получают сообщения из разных партиций. Желательно, чтобы количество консьюмеров было равно количеству партиций, и каждый консьюмер читал сообщения из своей партиции, таким образом, распределяется нагрузка и повышается производительность работы.

Есть еще один важный вопрос. Если мы захотим добавить консьюмера к топику не сразу, а позже или, например, произойдет сбой консьюмера, а позже он восстановится и вопрос, откуда он будет знать с какого сообщения продолжить работу? Для этого имеется специальный механизм консьюмер-офсетов (offset). Перед началом работы консьюмер делает специальный запрос к брокеру с указанием группы, топика, партиции и офсета, который должен быть помечен как обработанный. Брокер сохраняет эту информацию у себя. При сбое в работе, консьюмер запрашивает у брокера последний закомиченный офсет и продолжает читать с данной позиции сообщения.

Это упрощенное описание работы kafka-продюсера и kafka-консьюмера.

Также при описании kafka нельзя не вспомнить про один важный компонент — zookeeper.

ZooKeeper — это хранилище метаданных kafka, именно он знает в каком состоянии находятся брокеры, какая партиция играет роль лидера, сколько партиций и где они находятся, сколько у каждой партиции реплик и так далее.

Разобравшись немного с теорией приступим к нашему примеру.

Весь код примера будет доступен по ссылке.

Пример будет очень простой. Допустим у нас будет три микросервиса. Один — это продюсер — он будет производить и отправлять сообщения в kafka, в нашем случае это будет Заказ.

@Data @AllArgsConstructor @NoArgsConstructor public class Order

Второй микросервис — консьюмер, который будет читать наше сообщение и записывать его в базу данных.

И третий микросервис — также будет читать наше сообщение и просто выводить его в консоль.

Таким образом, я хочу показать, что можно настроить несколько консьюмеров, которые будут подписаны на один топик и будут получать из него сообщения, но поступать с ними по-разному.

Весь код приводить не буду, буду останавливаться только на главных моментах.

Kafka, zookeeper, kafka-ui (для просмотра сообщений в kafka), database (postgres) и pgadmin (для просмотра данных в базе) поднимем с помощью docker.

Для этого напишем следующий docker-compose.yml файл.

services: zookeeper: image: confluentinc/cp-zookeeper:6.2.4 healthcheck: test: [ "CMD", "nc", "-vz", "localhost", "2181" ] interval: 10s timeout: 3s retries: 3 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 22181:2181 kafka: image: confluentinc/cp-kafka:6.2.4 depends_on: zookeeper: condition: service_healthy ports: - 29092:29092 healthcheck: test: [ "CMD", "nc", "-vz", "localhost", "9092" ] interval: 10s timeout: 3s retries: 3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: OUTSIDE://:29092,INTERNAL://:9092 KAFKA_ADVERTISED_LISTENERS: OUTSIDE://localhost:29092,INTERNAL://kafka:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "8080:8080" restart: always depends_on: kafka: condition: service_healthy environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 service-db: image: postgres:14.7-alpine environment: POSTGRES_USER: username POSTGRES_PASSWORD: password healthcheck: test: ["CMD-SHELL", "pg_isready", "-d", "clients_database"] interval: 10s timeout: 3s retries: 3 ports: - "15432:5432" volumes: - ./infrastructure/db/create_db.sql:/docker-entrypoint-initdb.d/create_db.sql restart: unless-stopped pgadmin: container_name: pgadmin4_container image: dpage/pgadmin4:7 restart: always environment: PGADMIN_DEFAULT_EMAIL: admin@admin.com PGADMIN_DEFAULT_PASSWORD: root ports: - "5050:80" kafka-topics-generator: image: confluentinc/cp-kafka:6.2.4 depends_on: kafka: condition: service_healthy entrypoint: [ '/bin/sh', '-c' ] command: | " # blocks until kafka is reachable kafka-topics --bootstrap-server kafka:9092 --list echo -e 'Creating kafka topics' kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic send-order-event --replication-factor 1 --partitions 2 echo -e 'Successfully created the following topics:' kafka-topics --bootstrap-server kafka:9092 --list "

Базу данных orders_database, создадим на этапе поднятия контейнера с postgres.

Топик (send-order-event) создадим с помощью команды в отдельном контейнере, здесь же создадим две партиции, так как у нас будет два консьюмера и желательно, чтобы каждый консьюмер читал из своей патриции.

Топики можно также создавать и с помощью кода.

Пройдемся по этапам создания продюсера.

Вначале необходимо сделать некоторые настройки продюсера. Это можно делать с помощью кода или прописывать в application файле. Мы это сделаем с помощью application.yml файла.

server: port: 8081 spring: kafka: bootstrap-servers: localhost:29092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: acks: 1 spring: json: add: type: headers: false topic: send-order: send-order-event

Здесь указываем порт, на котором будет работать kafka (должен совпадать с внешним портом, который мы открыли в docker для kafka), также необходимо указать как мы будем сериализовать ключ и значение (значение — это и будет наш заказ, поэтому здесь надо указать JsonSerializer). Также прописываем название нашего топика send-order-event, название должно совпадать с тем, что мы указали при создании топика в docker. Данное название мы потом с помощью аннотации @Value будем сетать в переменную.

Далее создадим сам сервис по отправке сообщений.

@Service @RequiredArgsConstructor public class KafkaMessagingService < @Value("$") private String sendClientTopic; private final KafkaTemplate kafkaTemplate; public void sendOrder(OrderSendEvent orderSendEvent) < kafkaTemplate.send(sendClientTopic, orderSendEvent.getBarCode(), orderSendEvent); >>

Внедряем бин private final KafkaTemplate kafkaTemplate в данный класс с помощью аннотации @RequiredArgsConstructor. Также как было сказано раньше сетаем в переменную sendClientTopic название нашего топика с application.yml файла. Далее пишем сам метод по отправке сообщения, который на вход будет принимать OrderSendEvent — то есть наш заказ. Вызываем у kafkaTemplate метод send куда передаем название топика, ключ (в качестве ключа будет выступать код продукта). Ключ нужен для того чтобы сообщения с одинаковыми ключами всегда записываются в одну и ту же партицию. Последним передаем сам заказ.

@Data @AllArgsConstructor @NoArgsConstructor public class OrderSendEvent

Создадим еще класс Producer.

@Slf4j @Component @RequiredArgsConstructor public class Producer < private final KafkaMessagingService kafkaMessagingService; private final ModelMapper modelMapper; public Order sendOrderEvent(Order order) < kafkaMessagingService.sendOrder(modelMapper.map(order, OrderSendEvent.class)); log.info("Send order from producer <>", order); return order; > >

Он нужен просто для того чтобы отделить логику отправки от маппинга сущностей.

Отправку сообщения будем производить с помощью postman, поэтому создадим еще контроллер OrderController.

@Slf4j @Validated @RestController @RequiredArgsConstructor @RequestMapping("/api/v1/orders") public class OrderController < private final Producer producer; @PostMapping @ResponseStatus(HttpStatus.OK) public Order sendOrder(@RequestBody Order order) < log.info("Send order to kafka"); producer.sendOrderEvent(order); return order; >>

Рассмотрим теперь первый консьюмер.

Вначале также создадим application.yml файл, в котором настроим наш консьюмер.

server: port: 8082 spring: kafka: bootstrap-servers: localhost:29092 consumer: group-id: "order-1" auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: '*' datasource: url: jdbc:postgresql://$:$/orders_database username: username password: password liquibase: enabled: true drop-first: false change-log: classpath:db/changelog/db.changelog-master.xml default-schema: public jpa: show-sql: false open-in-view: false hibernate: ddl-auto: none properties: hibernate: dialect: org.hibernate.dialect.PostgreSQLDialect topic: send-order: send-order-event

Здесь как и в продюсере указываем порт, на котором работает kafka.

Прописываем group-id: «order-1» — так как консьюмеры должны быть объединены в группы.

Указываем настройку auto-offset-reset: earliest — она нужна для того, если мы добавим новую партицию, когда в топик пишут сообщения продюсеры, без данной настройки, мы можем потерять или не обработать кусок данных, записавшихся в новую партицию до того, как консьюмеры обновили метаданные по топику и начали читать данные из этой партиции.

Как и в продюсере указываем как мы будем уже только десериализовать наши ключ и значение. Также прописываем настройку для того чтобы JsonDeserializer доверял десериализовать только классы в доверенном пакете. То есть тут можно указать конкретный пакет или с помощью «*» — указать, что нужно доверять всем классам во всех пакетах.

Также прописываем название нашего топика send-order-event.

В данном файле также прописываем настройки по подключению к базе данных, накатыванию таблиц с помощью liquibase и чтобы выводились sql запросы к базе данных.

Далее создадим класс OrderEvent. По структуре он должен совпадать с тем классом (OrderSendEvent), который мы отправляем через продюсер.

@Data @AllArgsConstructor @NoArgsConstructor public class OrderEvent

И сам сервис по приемке сообщения.

@Slf4j @Service @AllArgsConstructor public class KafkaMessagingService < private static final String topicCreateOrder = "$"; private static final String kafkaConsumerGroupId = "$"; private final OrderService orderService; private final ModelMapper modelMapper; @Transactional @KafkaListener(topics = topicCreateOrder, groupId = kafkaConsumerGroupId, properties = ) public OrderEvent createOrder(OrderEvent orderEvent) < log.info("Message consumed <>", orderEvent); orderService.save(modelMapper.map(orderEvent, OrderDto.class)); return orderEvent; > >

Здесь сетаем переменным topicCreateOrder и kafkaConsumerGroupId с application.yml файла значения названия топика и группы.

Создаем сам метод по обработке сообщений. Вешаем на него аннотацию @KafkaListener куда передаем название топика, который надо слушать, название группы, а также передаем еще настройку по дефолтному типу данных, который мы принимаем. Данную настройку, можно прописать и в application.yml файле, но я хотел показать как можно передавать настройки каждому слушателю, или, например, у вас в группе есть слушатель, который принимает другую сущность.

Далее с полученным сообщением, то есть OrderEvent, можно выполнять различную логику, зависящую от бизнес-требований. В нашем случае мы будем сохранять наш заказ в базу данных.

Рассмотрим еще один консьюмер, он создан в другом микросервисе и его настройки идентичны первому, поэтому только покажу сам метод по приемке сообщений — он будет выводить наш заказ в консоль. Здесь я хочу показать, что на один топик могут быть подписаны несколько консьюмеров и по разному трактовать, что делать с тем сообщением, которое будет появляться в топике.

@Slf4j @Service @AllArgsConstructor public class KafkaMessagingService < private static final String topicCreateOrder = "$"; private static final String kafkaConsumerGroupId = "$"; @Transactional @KafkaListener(topics = topicCreateOrder, groupId = kafkaConsumerGroupId, properties = ) public OrderEvent printOrder(OrderEvent orderEvent) < log.info("The product: <>was ordered in quantity: <> and at a price: <>", orderEvent.getProductName(), orderEvent.getQuantity(), orderEvent.getPrice()); log.info("To pay: <>", new BigDecimal(orderEvent.getQuantity()).multiply(orderEvent.getPrice())); return orderEvent; > >

Давайте сейчас посмотрим как все это работает.

Вначале запустим наш docker-compose.yml командой docker-compose up -d в консоли.

Далее необходимо подождать, пока docker стянет необходимые образы с docker hub и на их основе запустит контейнеры.

Идем в docker desktop и мы должны увидеть следующее.

Kafka, zookeeper, kafka-ui, postgres и pgadmin должны быть запущены и работать. Зайдем в kafka-topics-generator и убедимся, что топик создался.

Далее запускаем все наши три микросервиса.

Идем в postman и отправляем json с заказом на адрес http://localhost:8081/api/v1/orders, так как мы запустили наш продюсер на порту 8081.

В логах продюсера мы должны увидеть, что сообщение отправилось.

Теперь зайдем на http://localhost:8080/ здесь мы должны увидеть в Topics наш топик.

Также в Messages мы должны увидеть наше отправленное сообщение.

И в Consumers мы можем увидеть, что у нас есть два консьюмера.

Также проверим сохранился ли наш заказ, это должен был сделать наш первый консьюмер.

В логах мы видим, что сообщение обработано.

Идем на http://localhost:5050 заходим используя креды указанные в docker-compose.yml.

Далее настраиваем подключение.

Делаем select * from orders и должны увидеть сохраненный заказ.

Теперь еще проверим как сработал наш второй консьюмер. Смотрим логи и видим, что наш второй консьюмер также отработал и вывел в консоль наш заказ.

Еще посмотрим как можно протестировать продюсер и консьюмер.

Вначале обратимся к продюсеру. Его мы протестируем с помощью EmbeddedKafka, он будет работать быстрее, чем использовать KafkaContainer, но для тестов консьюмера мы попробуем использовать KafkaContainer.

@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = < "listeners=PLAINTEXT://localhost:9092", "port=9092" >) public class KafkaMessageProducerServiceIT < public static final String TOPIC_NAME_SEND_CLIENT = "send-order-event"; @Autowired private KafkaMessagingService kafkaMessagingService; @Test public void it_should_send_order_event() < OrderSendEvent order = FakeOrder.getOrderSendEvent(); kafkaMessagingService.sendOrder(order); Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-java-test"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderSendEvent.class); KafkaConsumerconsumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList(TOPIC_NAME_SEND_CLIENT)); ConsumerRecords records = consumer.poll(Duration.ofMillis(10000L)); consumer.close(); //then assertEquals(1, records.count()); assertEquals(order.getProductName(), records.iterator().next().value().getProductName()); assertEquals(order.getBarCode(), records.iterator().next().value().getBarCode()); assertEquals(order.getQuantity(), records.iterator().next().value().getQuantity()); assertEquals(order.getPrice(), records.iterator().next().value().getPrice()); > >

Суть данного теста проста, мы внедряем наш реальный сервис по отправке сообщений KafkaMessagingService и вызываем метод sendOrder(), куда передаем тестовое сообщение. После создаем консьюмера, подключаемся к нашему топику, читаем оттуда сообщение и проверяем совпадает ли оно с отправленным.

Как видим тест прошел успешно.

Протестируем наш консьюмер, который сохраняет заказ в базу данных.

@Testcontainers @SpringBootTest class KafkaMessagingServiceIT < public static final Long ORDER_ID = 1L; public static final String TOPIC_NAME_SEND_ORDER= "send-order-event"; @Container static PostgreSQLContainerpostgreSQLContainer = new PostgreSQLContainer<>("postgres:12") .withUsername("username") .withPassword("password") .withExposedPorts(5432) .withReuse(true); @Container static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.4")) .withEmbeddedZookeeper() .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093 ,BROKER://0.0.0.0:9092") .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT") .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") .withEnv("KAFKA_BROKER_ID", "1") .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "") .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); static < Startables.deepStart(Stream.of(postgreSQLContainer, kafkaContainer)).join(); >@DynamicPropertySource static void overrideProperties(DynamicPropertyRegistry registry) < registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); registry.add("spring.datasource.username", postgreSQLContainer::getUsername); registry.add("spring.datasource.password", postgreSQLContainer::getPassword); registry.add("spring.datasource.driver-class-name", postgreSQLContainer::getDriverClassName); >@Autowired private OrdersRepository ordersRepository; @Test void save_order() throws InterruptedException < //given String bootstrapServers = kafkaContainer.getBootstrapServers(); OrderEvent orderEvent = FakeOrder.getOrderEvent(); Order order = FakeOrder.getOrder(); MapconfigProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(configProps); KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory); //when SECONDS.sleep(5); kafkaTemplate.send(TOPIC_NAME_SEND_ORDER, orderEvent.getBarCode(), orderEvent); SECONDS.sleep(5); //then Order orderFromDB = ordersRepository.findById(ORDER_ID).get(); assertEquals(orderFromDB.getId(), ORDER_ID); assertEquals(orderFromDB.getProductName(), order.getProductName()); assertEquals(orderFromDB.getBarCode(), order.getBarCode()); assertEquals(orderFromDB.getQuantity(), order.getQuantity()); assertEquals(orderFromDB.getPrice(), order.getPrice().setScale(2, RoundingMode.HALF_DOWN)); assertEquals(orderFromDB.getAmount(), order.getAmount().setScale(2)); assertEquals(orderFromDB.getOrderDate().getYear(), order.getOrderDate().getYear()); assertEquals(orderFromDB.getStatus(), order.getStatus()); > >

Так как это интеграционный тест, то мы будем использовать KafkaContainer и PostgreSQLContainer, и проверим, что наше сообщение прочиталось и сохранилось в базу данных.

То есть вначале настраиваем контейнеры с kafka и postgreSQL.

Далее внедряем OrdersRepository, чтобы потом получить оттуда данные.

И сам тест тоже довольно прост. Вначале мы создаем продюсера и отправляем в наш топик сообщение с заказом. Далее с помощью ordersRepository обращаемся к базе данных, оттуда получаем наш сохраненный заказ, который должен был сам сохраниться и проверяем правильный ли он.

Данный тест будет выполняться довольно долго, так как надо еще поднять контейнеры с kafka и postgreSQL.

Как видим наш тест прошел успешно.

Спасибо. Всем кто дочитал до конца.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *