Где apache kafka хранит данные
Перейти к содержимому

Где apache kafka хранит данные

  • автор:

Что такое Apache Kafka: как устроен и работает брокер сообщений

Apache Kafka — распределенный брокер сообщений, работающий в стриминговом режиме. В статье мы расскажем про его устройство и преимущества, а также о том, где применяют это ПО.

Изображение записи

Apache Kafka — распределенный брокер сообщений, работающий в стриминговом режиме. В статье мы расскажем про его устройство и преимущества, а также о том, где применяют «Кафку».

Что такое брокер сообщений

Главная задача брокера — обеспечение связи и обмена информацией между приложениями или отдельными модулями в режиме реального времени.

Брокер — система, преобразующая сообщение от источника данных (продюсера) в сообщение принимающей стороны (консьюмера). Брокер выступает проводником и состоит из серверов, объединенных в кластеры.

Apache Kafka — диспетчер сообщений, разработанный LinkedIn. В 2011 году был опубликован программный код. В 2012 году Kafka попал в инкубатор Apache, дальнейшая разработка ведется в рамках Apache Software Foundation. Открытое программное обеспечение с разрешительной лицензией написано на Java и Scala.

Изначально «Кафку» создавали как систему, оптимизированную под запись, и создатель Джей Крепс выбрал такое название в честь одного из своих любимых писателей.

Шаги передачи данных

Чтобы понять, как функционирует распределенная система Apache Kafka, необходимо проследить путь данных.

Событие или сообщение — данные, которые поступают из одного сервиса, хранятся на узлах Kafka и читаются другими сервисами. Сообщение состоит из:

  • Key — опциональный ключ, нужен для распределения сообщений по кластеру.
  • Value — массив байт, бизнес-данные.
  • Timestamp — текущее системное время, устанавливается отправителем или кластером во время обработки.
  • Headers — пользовательские атрибуты key-value, которые прикрепляют к сообщению.

Продюсер — поставщик данных, который генерирует сообщения — например, служебные события, логи, метрики, события мониторинга.

Консьюмер — потребитель данных, который читает и использует события, пример — сервис сбора статистики.

Взаимодействие продюсера и консьюмера сообщений

Какие сложности решает распределенная система

Сообщения могут быть однотипными или разнородными, поскольку разным потребителям нужны разные данные. Один тип событий может быть нужен всем консьюмерам, а другие — только одному.

Без брокера продюсеры должны знать получателя и резервного консьюмера, если основной недоступен. К тому же, поставщикам данных придется самостоятельно регистрировать новых консьюмеров. С помощью брокера продюсеры просто отправляют информацию в единый узел.

Managed service для Apache Kafka

Сообщения хранятся на узлах-брокерах. Kafka — масштабируемый кластер со множеством взаимозаменяемых серверов, в которые добавляются новые брокеры, распределяющие задачи между собой.

ZooKeeper — инструмент-координатор, действует как общая служба конфигурации в системе. Работает как база для хранения метаданных о состоянии узлов кластера и расположении сообщений. ZooKeeper обеспечивает гибкую и надежную синхронизацию в распределенной системе, позволяя нескольким клиентам выполнять одновременно чтение и запись.

Kafka Controller — среди брокеров Zookeeper выбирает одного, который будет обеспечивать консистентность данных.

Topic — принцип деления потока данных, базовая и основная сущность Apache Kafka. В топик складывается стрим данных, единая очередь из входящих сообщений.

Partition — для ускорения чтения и записи топики делятся на партиции. Происходит параллелизация данных. Это конфигурируемый параметр, сообщения могут отправлять несколько продюсеров и принимать несколько консьюмеров.

Managed service для Apache Kafka

Упорядочение событий происходит на уровне партиций. Принимающая сторона потребляет данные в порядке расположения в партиции. Пример: все события одного пользователя сервисы принимают упорядоченно, обработка сохраняет последовательность пути пользователя. Выстраивается конвейер данных, алгоритмы машинного обучения могут извлекать из сырой информации необходимую для бизнеса информацию.

Преимущества Apache Kafka

Брокер распределяет информацию в широковещательном режиме. Применяющийся в Apache Kafka подход нужен для масштабирования и репликации данных.

Горизонтальное масштабирование

Множество объединенных серверов гарантируют высокую доступность данных — выход из строя одного из узлов не нарушает целостность. Кластер состоит из обычных машин, а не суперкомпьютеров, их можно менять и дополнять. Система автоматически перебалансируется.

Чтобы события не потерялись, существуют механизмы репликации. Данные записываются на несколько машин, если что-то случается с сервером, он переключается на резервный. Кластер в режиме реального времени определяет, где находятся данные, и продолжает их использовать.

Офсеты

Если консьюмер падает в процессе получения данных, то, когда он запустится вновь и ему нужно будет вернутся к чтению этого сообщения, он воспользуется офсетом и продолжит с нужного места.

Взаимодействие через API

Брокеры решают проблему интеграции разных технических стеков и протоколов. Интеграция происходит просто: продюсерам и консьюмерам необходимо знать только API брокера. Они не контактируют между собой, с помощью чего достигается высокая интегрируемость с другими системами.

Принцип first in — first out

Принцип FIFO действует на консьюмеров. Чтение происходит в том же порядке, в котором пришла информация.

Где применяется Apache Kafka

Отказоустойчивая система используется в бизнесе, где необходимо собирать, хранить и обрабатывать большие неструктурированные данные. Примеры — платформы, где требуется интеграция данных из большого количества источников, сервисы стриминговой аналитики, mission-critical applications.

Big Data

Первоначально LinkedIn разработали «Кафку» для своих целей: обмена данными между службами, репликации баз данных, потоковой передачи информации о деятельности и операционных показателях приложений.

Для IBM Apache Kafka работает как средство обмена сообщениями между микросервисами. В аналитических системах американской корпорации Apache Kafka обрабатывает потоковые и событийные данные.

Uber, Twitter, Netflix и AirBnb с помощью хорошо развитых пайплайнов обработки данных передают миллиарды сообщений в день. «Кафка» решает проблемы перемещения Big data из одного источника в другой.

Издание The New York Times использует Apache Kafka для хранения и распространения опубликованного контента среди различных приложений и систем, которые делают его доступным для читателей в режиме реального времени.

Internet of Things

IoT-платформы используют архитектуру с большим количеством конечных устройств: контроллеров, датчиков, сенсоров и smart-гаджетов. ПО интернета вещей с помощью алгоритмов ML составляет графики профилактического ремонта оборудования, анализируя данные, поступающие с устройств.

ML-системы работают с онлайн-потоками, когда приборы, приложения и пользователи постоянно посылают данные, а сервисы обрабатывают их в реальном времени. Apache Kafka выступает центральным звеном в этом процессе.

Отрасли

Kafka используют организации практически в любой отрасли: разработка ПО, финансовые услуги, здравоохранение, государственное управление, транспорт, телеком, геймдев.

Сегодня Kafka пользуются тысячи компаний, более 60% входят в список Fortune 100. На официальном сайте представлен полный список корпораций и учреждений, которые используют брокера Apache.

Конкуренты

Чаще всего Kafka сравнивают с RabbitMQ. Обе системы — брокеры сообщений. Главное отличие в модели доставки: Kafka добавляет сообщение в журнал, и консьюмер сам забирает информацию из топика; брокер RabbitMQ самостоятельно отправляет сообщения получателям — помещает событие в очередь и отслеживает его статус.

«Кролик» удаляет событие после доставки, «Кафка» хранит до запланированной очистки журнала. Таким образом, брокер Apache используется как источник истории изменений.

Разработчики RabbitMQ создали системы управления потоком сообщений: мониторинг получения, маршрутизация и шаблоны доставки. Подобное гибкое управление подойдет для высокоскоростного обмена сообщениями между несколькими сервисами. Минус такого подхода в снижении производительности при высокой нагрузке.

Главный вывод — для сбора и агрегации событий из большого количества источников, логов и метрик больше подойдет Apache Kafka.

Заключение

Благодаря высокой пропускной способности и согласованности данных Apache Kafka обрабатывает огромные массивы данных в реальном времени. Системы горизонтального масштабирования и офсеты гарантируют надежность. Kafka — удачное решение для проекта с очень большими нагрузками на обработку данных. Установить это ПО можно на серверы Ubuntu, Windows, CentOS и других популярных операционных систем.

Apache Kafka: основы технологии

У Kafka есть множество способов применения, и у каждого способа есть свои особенности. В этой статье разберём, чем Kafka отличается от популярных систем обмена сообщениями; рассмотрим, как Kafka хранит данные и обеспечивает гарантию сохранности; поймём, как записываются и читаются данные.

Статья подготовлена на основе открытого занятия из видеокурса по Apache Kafka. Авторы — Анатолий Солдатов, Lead Engineer в Авито, и Александр Миронов, Infrastructure Engineer в Stripe. Базовые темы курса доступны на Youtube.

Kafka и классические сервисы очередей

Для первого погружения в технологию сравним Kafka и классические сервисы очередей, такие как RabbitMQ и Amazon SQS.

Системы очередей обычно состоят из трёх базовых компонентов:

1) сервер,
2) продюсеры, которые отправляют сообщения в некую именованную очередь, заранее сконфигурированную администратором на сервере,
3) консьюмеры, которые считывают те же самые сообщения по мере их появления.

Базовые компоненты классической системы очередей

Базовые компоненты классической системы очередей

В веб-приложениях очереди часто используются для отложенной обработки событий или в качестве временного буфера между другими сервисами, тем самым защищая их от всплесков нагрузки.

Консьюмеры получают данные с сервера, используя две разные модели запросов: pull или push.

image

pull-модель — консьюмеры сами отправляют запрос раз в n секунд на сервер для получения новой порции сообщений. При таком подходе клиенты могут эффективно контролировать собственную нагрузку. Кроме того, pull-модель позволяет группировать сообщения в батчи, таким образом достигая лучшей пропускной способности. К минусам модели можно отнести потенциальную разбалансированность нагрузки между разными консьюмерами, а также более высокую задержку обработки данных.

push-модель — сервер делает запрос к клиенту, посылая ему новую порцию данных. По такой модели, например, работает RabbitMQ. Она снижает задержку обработки сообщений и позволяет эффективно балансировать распределение сообщений по консьюмерам. Но для предотвращения перегрузки консьюмеров в случае с RabbitMQ клиентам приходится использовать функционал QS, выставляя лимиты.

Как правило, приложение пишет и читает из очереди с помощью нескольких инстансов продюсеров и консьюмеров. Это позволяет эффективно распределить нагрузку.

image

Типичный жизненный цикл сообщений в системах очередей:

  1. Продюсер отправляет сообщение на сервер.
  2. Консьюмер фетчит (от англ. fetch — принести) сообщение и его уникальный идентификатор сервера.
  3. Сервер помечает сообщение как in-flight. Сообщения в таком состоянии всё ещё хранятся на сервере, но временно не доставляются другим консьюмерам. Таймаут этого состояния контролируется специальной настройкой.
  4. Консьюмер обрабатывает сообщение, следуя бизнес-логике. Затем отправляет ack или nack-запрос обратно на сервер, используя уникальный идентификатор, полученный ранее — тем самым либо подтверждая успешную обработку сообщения, либо сигнализируя об ошибке.
  5. В случае успеха сообщение удаляется с сервера навсегда. В случае ошибки или таймаута состояния in-flight сообщение доставляется консьюмеру для повторной обработки.

Типичный жизненный цикл сообщений в системах очередей

Типичный жизненный цикл сообщений в системах очередей

С базовыми принципами работы очередей разобрались, теперь перейдём к Kafka. Рассмотрим её фундаментальные отличия.

Как и сервисы обработки очередей, Kafka условно состоит из трёх компонентов:

1) сервер (по-другому ещё называется брокер),
2) продюсеры — они отправляют сообщения брокеру,
3) консьюмеры — считывают эти сообщения, используя модель pull.

Базовые компоненты Kafka

Базовые компоненты Kafka

Пожалуй, фундаментальное отличие Kafka от очередей состоит в том, как сообщения хранятся на брокере и как потребляются консьюмерами.

  • Сообщения в Kafka не удаляются брокерами по мере их обработки консьюмерами — данные в Kafka могут храниться днями, неделями, годами.
  • Благодаря этому одно и то же сообщение может быть обработано сколько угодно раз разными консьюмерами и в разных контекстах.

В этом кроется главная мощь и главное отличие Kafka от традиционных систем обмена сообщениями.

Теперь давайте посмотрим, как Kafka и системы очередей решают одну и ту же задачу. Начнём с системы очередей.

Представим, что есть некий сайт, на котором происходит регистрация пользователя. Для каждой регистрации мы должны:

1) отправить письмо пользователю,
2) пересчитать дневную статистику регистраций.

В случае с RabbitMQ или Amazon SQS функционал может помочь нам доставить сообщения всем сервисам одновременно. Но при необходимости подключения нового сервиса придётся конфигурировать новую очередь.

image

Kafka упрощает задачу. Достаточно послать сообщения всего один раз, а консьюмеры сервиса отправки сообщений и консьюмеры статистики сами считают его по мере необходимости.

image

Kafka также позволяет тривиально подключать новые сервисы к стриму регистрации. Например, сервис архивирования всех регистраций в S3 для последующей обработки с помощью Spark или Redshift можно добавить без дополнительного конфигурирования сервера или создания дополнительных очередей.

Кроме того, раз Kafka не удаляет данные после обработки консьюмерами, эти данные могут обрабатываться заново, как бы отматывая время назад сколько угодно раз. Это оказывается невероятно полезно для восстановления после сбоев и, например, верификации кода новых консьюмеров. В случае с RabbitMQ пришлось бы записывать все данные заново, при этом, скорее всего, в отдельную очередь, чтобы не сломать уже имеющихся клиентов.

Структура данных

Наверняка возникает вопрос: «Раз сообщения не удаляются, то как тогда гарантировать, что консьюмер не будет читать одни и те же сообщения (например, при перезапуске)?».

Для ответа на этот вопрос разберёмся, какова внутренняя структура Kafka и как в ней хранятся сообщения.

Каждое сообщение (event или message) в Kafka состоит из ключа, значения, таймстампа и опционального набора метаданных (так называемых хедеров).

image

Сообщения в Kafka организованы и хранятся в именованных топиках (Topics), каждый топик состоит из одной и более партиций (Partition), распределённых между брокерами внутри одного кластера. Подобная распределённость важна для горизонтального масштабирования кластера, так как она позволяет клиентам писать и читать сообщения с нескольких брокеров одновременно.

Когда новое сообщение добавляется в топик, на самом деле оно записывается в одну из партиций этого топика. Сообщения с одинаковыми ключами всегда записываются в одну и ту же партицию, тем самым гарантируя очередность или порядок записи и чтения.

Для гарантии сохранности данных каждая партиция в Kafka может быть реплицирована n раз, где n — replication factor. Таким образом гарантируется наличие нескольких копий сообщения, хранящихся на разных брокерах.

image

У каждой партиции есть «лидер» (Leader) — брокер, который работает с клиентами. Именно лидер работает с продюсерами и в общем случае отдаёт сообщения консьюмерам. К лидеру осуществляют запросы фолловеры (Follower) — брокеры, которые хранят реплику всех данных партиций. Сообщения всегда отправляются лидеру и, в общем случае, читаются с лидера.

Чтобы понять, кто является лидером партиции, перед записью и чтением клиенты делают запрос метаданных от брокера. Причём они могут подключаться к любому брокеру в кластере.

image

Основная структура данных в Kafka — это распределённый, реплицируемый лог. Каждая партиция — это и есть тот самый реплицируемый лог, который хранится на диске. Каждое новое сообщение, отправленное продюсером в партицию, сохраняется в «голову» этого лога и получает свой уникальный, монотонно возрастающий offset (64-битное число, которое назначается самим брокером).

Как мы уже выяснили, сообщения не удаляются из лога после передачи консьюмерам и могут быть вычитаны сколько угодно раз.

Время гарантированного хранения данных на брокере можно контролировать с помощью специальных настроек. Длительность хранения сообщений при этом не влияет на общую производительность системы. Поэтому совершенно нормально хранить сообщения в Kafka днями, неделями, месяцами или даже годами.

image

Consumer Groups

Теперь давайте перейдём к консьюмерам и рассмотрим их принципы работы в Kafka. Каждый консьюмер Kafka обычно является частью какой-нибудь консьюмер-группы.

Каждая группа имеет уникальное название и регистрируется брокерами в кластере Kafka. Данные из одного и того же топика могут считываться множеством консьюмер-групп одновременно. Когда несколько консьюмеров читают данные из Kafka и являются членами одной и той же группы, то каждый из них получает сообщения из разных партиций топика, таким образом распределяя нагрузку.

Вернёмся к нашему примеру с топиком сервиса регистрации и представим, что у сервиса отправки писем есть своя собственная консьюмер-группа с одним консьюмером c1 внутри. Значит, этот консьюмер будет получать сообщения из всех партиций топика.

image

Если мы добавим ещё одного консьюмера в группу, то партиции автоматически распределятся между ними, и c1 теперь будет читать сообщения из первой и второй партиции, а c2 — из третьей. Добавив ещё одного консьюмера (c3), мы добьёмся идеального распределения нагрузки, и каждый из консьюмеров в этой группе будет читать данные из одной партиции.

image

А вот если мы добавим в группу ещё одного консьюмера (c4), то он не будет задействован в обработке сообщений вообще.

Важно понять: внутри одной консьюмер-группы партиции назначаются консьюмерам уникально, чтобы избежать повторной обработки.

Если консьюмеры не справляются с текущим объёмом данных, то следует добавить новую партицию в топик. Только после этого консьюмер c4 начнёт свою работу.

Механизм партиционирования является нашим основным инструментом масштабирования Kafka. Группы являются инструментом отказоустойчивости.

Кстати, как вы думаете, что будет, если один из консьюмеров в группе упадёт? Совершенно верно: партиции автоматически распределятся между оставшимися консьюмерами в этой группе.

Добавлять партиции в Kafka можно на лету, без перезапуска клиентов или брокеров. Клиенты автоматически обнаружат новую партицию благодаря встроенному механизму обновления метаданных. Однако, нужно помнить две важные вещи:

  1. Гарантия очерёдности данных — если вы пишете сообщения с ключами и хешируете номер партиции для сообщений, исходя из общего числа, то при добавлении новой партиции вы можете просто сломать порядок этой записи.
  2. Партиции невозможно удалить после их создания, можно удалить только весь топик целиком.

И ещё неочевидный момент: если вы добавляете новую партицию на проде, то есть в тот момент, когда в топик пишут сообщения продюсеры, то важно помнить про настройку auto.offset.reset=earliest в консьюмере, иначе у вас есть шанс потерять или просто не обработать кусок данных, записавшихся в новую партицию до того, как консьюмеры обновили метаданные по топику и начали читать данные из этой партиции.

Помимо этого, механизм групп позволяет иметь несколько несвязанных между собой приложений, обрабатывающих сообщения.

image

Как мы обсуждали ранее, можно добавить новую группу консьюмеров к тому же самому топику, например, для обработки и статистики регистраций. Эти две группы будут читать одни и те же сообщения из топика тех самых ивентов регистраций — в своём темпе, со своей внутренней логикой.

А теперь, зная внутреннее устройство консьюмеров в Kafka, давайте вернёмся к изначальному вопросу: «Каким образом мы можем обозначить сообщения в партиции, как обработанные?».

Для этого Kafka предоставляет механизм консьюмер-офсетов. Как мы помним, каждое сообщение партиции имеет свой собственный, уникальный, монотонно возрастающий офсет. Именно этот офсет и используется консьюмерами для сохранения партиций.

Консьюмер делает специальный запрос к брокеру, так называемый offset-commit с указанием своей группы, идентификатора топик-партиции и, собственно, офсета, который должен быть отмечен как обработанный. Брокер сохраняет эту информацию в своём собственном специальном топике. При рестарте консьюмер запрашивает у сервера последний закоммиченный офсет для нужной топик-партиции, и просто продолжает чтение сообщений с этой позиции.

В примере консьюмер в группе email-service-group, читающий партицию p1 в топике registrations, успешно обработал три сообщения с офсетами 0, 1 и 2. Для сохранения позиций консьюмер делает запрос к брокеру, коммитя офсет 3. В случае рестарта консьюмер запросит свою последнюю закоммиченную позицию у брокера и получит в ответе 3. После чего начнёт читать данные с этого офсета.

image

Консьюмеры вольны коммитить совершенно любой офсет (валидный, который действительно существует в этой топик-партиции) и могут начинать читать данные с любого офсета, двигаясь вперёд и назад во времени, пропуская участки лога или обрабатывая их заново.

Ключевой для понимания факт: в момент времени может быть только один закоммиченный офсет для топик-партиции в консьюмер-группе. Иными словами, мы не можем закоммитить несколько офсетов для одной и той же топик-партиции, эмулируя каким-то образом выборочный acknowledgment (как это делалось в системах очередей).

Представим, что обработка сообщения с офсетом 1 завершилась с ошибкой. Однако мы продолжили выполнение нашей программы в консьюмере и запроцессили сообщение с офсетом 2 успешно. В таком случае перед нами будет стоять выбор: какой офсет закоммитить — 1 или 3. В настоящей системе мы бы рекомендовали закоммитить офсет 3, добавив при этом функционал, отправляющий ошибочное сообщение в отдельный топик для повторной обработки (ручной или автоматической). Подобные алгоритмы называются Dead letter queue.

Разумеется, консьюмеры, находящиеся в разных группах, могут иметь совершенно разные закоммиченные офсеты для одной и той же топик-партиции.

Apache ZooKeeper

В заключение нужно упомянуть об ещё одном важном компоненте кластера Kafka — Apache ZooKeeper.

ZooKeeper выполняет роль консистентного хранилища метаданных и распределённого сервиса логов. Именно он способен сказать, живы ли ваши брокеры, какой из брокеров является контроллером (то есть брокером, отвечающим за выбор лидеров партиций), и в каком состоянии находятся лидеры партиций и их реплики.

В случае падения брокера именно в ZooKeeper контроллером будет записана информация о новых лидерах партиций. Причём с версии 1.1.0 это будет сделано асинхронно, и это важно с точки зрения скорости восстановления кластера. Самый простой способ превратить данные в тыкву — потеря информации в ZooKeeper. Тогда понять, что и откуда нужно читать, будет очень сложно.

В настоящее время ведутся активные работы по избавлению Kafka от зависимости в виде ZooKeeper, но пока он всё ещё с нами (если интересно, посмотрите на Kafka improvement proposal 500, там подробно расписан план избавления от ZooKeeper).

Важно помнить, что ZooKeeper по факту является ещё одной распределённой системой хранения данных, за которой необходимо следить, поддерживать и обновлять по мере необходимости.

Традиционно ZooKeeper раскатывается отдельно от брокеров Kafka, чтобы разделить границы возможных отказов. Помните, что падение ZooKeeper — это практически падение всего кластера Kafka. К счастью, нагрузка на ZooKeeper при нормальной работе кластера минимальна. Клиенты Kafka никогда не коннектятся к ZooKeeper напрямую.

Практический взгляд на хранение в Kafka

Kafka повсюду. Где есть микросервисы и распределенные вычисления, а они сейчас популярны, там почти наверняка есть и Kafka. В статье я попытаюсь объяснить, как в Kafka работает механизм хранения.

Я, конечно, постарался не усложнять, но копать будем глубоко, поэтому какое-то базовое представление о Kafka не помешает. Иначе не все будет понятно. В общем, продолжайте читать на свой страх и риск.

Обычно считается, что Kafka — это распределенная и реплицированная очередь сообщений. С технической точки зрения все верно, но термин очередь сообщений не все понимают одинаково. Я предпочитаю другое определение: распределенный и реплицированный журнал коммитов. Эта формулировка кажется более точной, ведь мы все прекрасно знаем, как журналы записываются на диск. Просто в этом случае на диск попадают сообщения, отправленные в Kafka.

Применительно к хранению в Kafka используется два термина: партиции и топики. Партиции — это единицы хранения сообщений, а топики — что-то вроде контейнеров, в которых эти партиции находятся.

С основной теорией мы определились, давайте перейдем к практике.

Я создам в Kafka топик с тремя партициями. Если хотите повторять за мной, вот как выглядит команда для локальной настройки Kafka в Windows.

kafka-topics.bat --create --topic freblogg --partitions 3 --replication-factor 1 --zookeeper localhost:2181

В каталоге журналов Kafka создано три каталога:

$ tree freblogg* freblogg-0 |-- 00000000000000000000.index |-- 00000000000000000000.log |-- 00000000000000000000.timeindex `-- leader-epoch-checkpoint freblogg-1 |-- 00000000000000000000.index |-- 00000000000000000000.log |-- 00000000000000000000.timeindex `-- leader-epoch-checkpoint freblogg-2 |-- 00000000000000000000.index |-- 00000000000000000000.log |-- 00000000000000000000.timeindex `-- leader-epoch-checkpoint

Мы создали в топике три партиции, и у каждой — свой каталог в файловой системе. Еще тут есть несколько файлов (index, log и т д.), но о них чуть позже.

Обратите внимание, что в Kafka топик — это логическое объединение, а партиция — фактическая единица хранения. То, что физически хранится на диске. Как устроены партиции?

Партиции

В теории партиция — это неизменяемая коллекция (или последовательность) сообщений. Мы можем добавлять сообщения в партицию, но не можем удалять. И под «мы» я подразумеваю продюсеров в Kafka. Продюсер не может удалять сообщения из топика.

Сейчас мы отправим в топик пару сообщений, но сначала обратите внимание на размер файлов в папках партиций.

$ ls -lh freblogg-0 total 20M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 0 Aug 5 08:26 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 0 Aug 5 08:26 leader-epoch-checkpoint

Как видите, файлы index вместе весят 20 МБ, а файл log совершенно пустой. В папках freblogg-1 и freblogg-2 то же самое.
Давайте отправим сообщения через console producer и посмотрим, что будет:

kafka-console-producer.bat --topic freblogg --broker-list localhost:9092

Я отправил два сообщения — сначала ввел стандартное «Hello World», а потом нажал на Enter, и это второе сообщение. Еще раз посмотрим на размеры файлов:

$ ls -lh freblogg* freblogg-0: total 20M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 0 Aug 5 08:26 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 0 Aug 5 08:26 leader-epoch-checkpoint freblogg-1: total 21M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 68 Aug 5 10:15 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 11 Aug 5 10:15 leader-epoch-checkpoint freblogg-2: total 21M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 79 Aug 5 09:59 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 11 Aug 5 09:59 leader-epoch-checkpoint

Два сообщения заняли две партиции, и файлы log в них теперь имеют размер. Это потому, что сообщения в партиции хранятся в файле xxxx.log. Давайте заглянем в файл log и убедимся, что сообщение и правда там.

$ cat freblogg-2/*.log @^@^B°£æÃ^@^K^Xÿÿÿÿÿÿ^@^@^@^A"^@^@^A^VHello World^@

Файлы с форматом log не очень удобно читать, но мы все же видим в конце «Hello World», то есть файл обновился, когда мы отправили сообщение в топик. Второе сообщение мы отправили в другую партицию.

Обратите внимание, что первое сообщение попало в третью партицию (freblogg-2), а второе — во вторую (freblogg-1). Для первого сообщения Kafka выбирает партицию произвольно, а следующие просто распределяет по кругу (round-robin). Если мы отправим третье сообщение, Kafka запишет его во freblogg-0 и дальше будет придерживаться этого порядка. Мы можем и сами выбирать партицию, указав ключ. Kafka хранит все сообщения с одним ключом в одной и той же партиции.

Каждому новому сообщению в партиции присваивается Id на 1 больше предыдущего. Этот Id еще называют смещением (offset). У первого сообщения смещение 0, у второго — 1 и т. д., каждое следующее всегда на 1 больше предыдущего.

Давайте используем инструмент Kafka, чтобы понять, что это за странные символы в файле log. Нам они кажутся бессмысленными, но для Kafka это метаданные каждого сообщения в очереди. Выполним команду:

kafka-run-class.bat kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files logs\freblogg-2\00000000000000000000.log
umping logs\freblogg-2\00000000000000000000.log Starting offset: 0 offset: 0 position: 0 CreateTime: 1533443377944 isvalid: true keysize: -1 valuesize: 11 producerId: -1 headerKeys: [] payload: Hello World offset: 1 position: 79 CreateTime: 1533462689974 isvalid: true keysize: -1 valuesize: 6 producerId: -1 headerKeys: [] payload: amazon

(Я удалил из выходных данных кое-что лишнее.)

Здесь мы видим смещение, время создания, размер ключа и значения, а еще само сообщение (payload).

Надо понимать, что партиция привязана к брокеру. Если у нас, допустим, три брокера, а папка freblogg-0 существует в broker-1, в других брокерах ее не будет. У одного топика могут быть партиции в нескольких брокерах, но одна партиция всегда существует в одном брокере Kafka (если установлен коэффициент репликации по умолчанию 1, но об этом чуть позже).

Сегменты

Что это за файлы index и log в каталоге партиции? Партиция, может, и единица хранения в Kafka, но не минимальная. Каждая партиция разделена на сегменты, то есть коллекции сообщений. Kafka не хранит все сообщения партиции в одном файле (как в файле лога), а разделяет их на сегменты. Это дает несколько преимуществ. (Разделяй и властвуй, как говорится.)

Главное — это упрощает стирание данных. Я уже говорил, что сами мы не можем удалить сообщение из партиции, но Kafka может это сделать на основе политики хранения для топика. Удалить сегмент проще, чем часть файла, особенно когда продюсер отправляет в него данные.

$ ls -lh freblogg-0 total 20M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 0 Aug 5 08:26 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 0 Aug 5 08:26 leader-epoch-checkpoint

Нули (00000000000000000000) в файлах log и index в каждой папке партиции — это имя сегмента. У файла сегмента есть файлы segment.log , segment.index и segment.timeindex .

Kafka всегда записывает сообщения в файлы сегментов в рамках партиции, причем у нас всегда есть активный сегмент для записи. Когда Kafka достигает лимита по размеру сегмента, создается новый файл сегмента, который станет активным.

В имени каждого файла сегмента отражается смещение от первого сообщения. На картинке выше в сегменте 0 содержатся сообщения со смещением от 0 до 2, в сегменте 3 — от 3 до 5, и так далее. Последний сегмент, шестой, сейчас активен.

$ ls -lh freblogg* freblogg-0: total 20M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 0 Aug 5 08:26 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 0 Aug 5 08:26 leader-epoch-checkpoint freblogg-1: total 21M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 68 Aug 5 10:15 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 11 Aug 5 10:15 leader-epoch-checkpoint freblogg-2: total 21M - freblogg 197121 10M Aug 5 08:26 00000000000000000000.index - freblogg 197121 79 Aug 5 09:59 00000000000000000000.log - freblogg 197121 10M Aug 5 08:26 00000000000000000000.timeindex - freblogg 197121 11 Aug 5 09:59 leader-epoch-checkpoint

У нас всего по одному сегменту в каждой партиции, поэтому они называются 00000000000000000000. Раз других файлов сегментов нет, сегмент 00000000000000000000 и будет активным.

По умолчанию сегменту выдается целый гигабайт, но представим, что мы изменили параметры, и теперь в каждый сегмент помещается только три сообщения. Посмотрим, что получится.

Допустим, мы отправили в партицию freblogg-2 три сообщения, и она выглядит так:

Три сообщения — это наш лимит. На следующем сообщении Kafka автоматически закроет текущий сегмент, создаст новый, сделает его активным и сохранит новое сообщение в файле log этого сегмента. (Я не показываю предыдущие нули, чтобы было проще воспринять).

freblogg-2 |-- 00.index |-- 00.log |-- 00.timeindex |-- 03.index |-- 03.log |-- 03.timeindex `--

Удивительное дело, но новый сегмент называется не 01. Мы видим 03.index , 03.log . Почему так?

Kafka называет сегмент по имени минимального смещения в нем. Новое сообщение в партиции имеет смещение 3, поэтому Kafka так и называет новый сегмент. Раз у нас есть сегменты 00 и 03, мы можем быть уверены, что сообщения со смещениями 0, 1 и 2 и правда находятся в сегменте 00. Новые сообщения в партиции freblogg-2 со смещениями 3 ,4 и 5 будут храниться в сегменте 03.

В Kafka мы часто читаем сообщения по определенному смещению. Искать смещение в файле log затратно, особенно если файл разрастается до неприличных размеров (по умолчанию это 1 ГБ). Для этого нам и нужен файл .index . В файле index хранятся смещения и физическое расположение сообщения в файле log.

Файл index для файла log, который я приводил в кратком отступлении, будет выглядеть как-то так:

Если нужно прочитать сообщение со смещением 1, мы ищем его в файле index и видим, что его положение — 79. Переходим к положению 79 в файле log и читаем. Это довольно эффективный способ — мы быстро находим нужное смещение в уже отсортированном файле index с помощью бинарного поиска.

Параллелизм в партициях

Чтобы гарантировать порядок чтения сообщений из партиции, Kafka дает доступ к партиции только одному консюмеру (из группы консюмеров). Если партиция получает сообщения a, f и k, консюмер читает их в том же порядке: a, f и k. Это важно, ведь порядок потребления сообщений на уровне топика не гарантирован, если у вас несколько партиций.

Если консюмеров будет больше, параллелизм не увеличится. Нужно больше партиций. Чтобы два консюмера параллельно считывали данные из топика, нужно создать две партиции — по одной на каждого. Партиции в одном топике могут находиться в разных брокерах, поэтому два консюмера топика могут считывать данные из двух разных брокеров.

Топики

Наконец, переходим к топикам. Мы уже кое-что знаем о них. Главное, что нужно знать: топик — это просто логическое объединение нескольких партиций.

Топик может быть распределен по нескольким брокерам через партиции, но каждая партиция находится только в одном брокере. У каждого топика есть уникальное имя, от которого зависят имена партиций.

Репликация

Как работает репликация? Создавая топик в Kafka, мы указываем для него коэффициент репликации — replication-factor . Допустим, у нас два брокера и мы устанавливаем replication-factor 2 . Теперь Kafka попытается всегда создавать бэкап, или реплику, для каждой партиции в топике. Kafka распределяет партиции примерно так же, как HDFS распределяет блоки данных по нодам.

Допустим, для топика freblogg мы установили коэффициент репликации 2. Мы получим примерно такое распределение партиций:

Даже если реплицированная партиция находится в другом брокере, Kafka не разрешает ее читать, потому что в каждом наборе партиций есть LEADER, то есть лидер, и FOLLOWERS — ведомые, которые остаются в резерве. Ведомые периодически синхронизируются с лидером и ждут своего звездного часа. Когда лидер выйдет из строя, один из in-sync ведомых будет выбран новым лидером, и вы будете получать данные из этой партиции.

Лидер и ведомый одной партиции всегда находятся в разных брокерах. Думаю, не нужно объяснять, почему.

Мы подошли к концу этой длинной статьи. Если вы добрались до этого места — поздравляю. Теперь вы знаете почти все о хранении данных в Kafka. Давайте повторим, чтобы ничего не забыть.

Итоги

  • В Kafka данные хранятся в топиках.
  • Топики разделены на партиции.
  • Каждая партиция разделена на сегменты.
  • У каждого сегмента есть файл log, где хранится само сообщение, и файл index, где хранится позиция сообщения в файле log.
  • У одного топика могут быть партиции в разных брокерах, но сама партиция всегда привязана к одному брокеру.
  • Реплицированные партиции существуют пассивно. Вы обращаетесь к ним, только если сломался лидер.

От редакции:
Более подробно от работе с Apache Kafka можно узнать на курсе Слёрма. Курс сейчас в разработке, релиз 7 апреля 2021. В программе бесплатные базовые уроки, они уже доступны на Youtube и платная продвинутая часть.

Дисковое хранилище Kafka

ArcGIS GeoEvent Server использует Apache Kafka для управления всем трафиком событий от входов до сервисов GeoEvent, а затем снова от сервисов GeoEvent до выходов. Kafka предоставляет набор отделов (очередей сообщений) для событий, которые будут опубликованы, и для потребителей, подписывающихся на эти сообщения о событиях. Очереди отделов Kafka управляются на диске для постоянного хранения и для восстановления очередей сообщений после падения системы.

Основы GeoEvent Server Kafka

У каждого входа и выхода GeoEvent Server есть отдел Kafka

У каждого входа и выхода есть отдел

Каждый отдел Kafka разбит на несколько разделов. Разделы разбивают события на три отдельные очереди сообщений для распараллеливания. Каждый отдел Kafka настроен по умолчанию создавать три раздела отделов. Подписчик этого отдела раскрутит несколько потребителей событий, которые работают параллельно, чтобы повысить производительность.

Разделы отделов в отделе

Каждый раздел отделов Kafka реплицируется дважды для обеспечения отказоустойчивости и параллелизма. Реплики распределяются между тремя отдельными папками на диске.

Реплики разделов отдела в отделе

Обратите внимание, что Kafka создает и управляет большим набором разделов для потребительских посторонних отделов. Такое большое количество разделов – то, что обеспечивает системе хорошую производительность благодаря распараллеливанию.

Общие рекомендации по размеру диска

Для новой установки GeoEvent Server сервису ArcGIS GeoEvent Gateway требуется как минимум 1 ГБ дискового пространства. Каждый вход или выход, потребует минимум 720 МБ дополнительного дискового пространства перед обработкой каких-либо событий. Обратите внимание, что все размеры являются минимальными оценками и, вероятно, будут увеличиваться по мере увеличения количества элементов, которые вы настраиваете в GeoEvent Server .

Настройки GeoEvent Server Kafka

Вы можете изменить поведение экземпляра Kafka для GeoEvent Server , изменив файл свойств Kafka . Основная причина для изменения файла свойств заключается в изменении расположения файлов на диске. Тем не менее, существуют редкие случаи, когда другие свойства может потребоваться изменить.

Примечание:

Перед редактированием файла свойств ниже вы должны остановить сервисы GeoEvent Server Windows или демоны Linux, в зависимости от вашей операционной системы. После того, как файлы будут сохранены и закрыты, запустите сервисы GeoEvent Server , и будут использоваться обновленные свойства.

Файл свойств Kafka

Файл свойств, содержащий настройки Kafka ( kafka.properties ) для GeoEvent Server , можно найти в следующих местоположениях, в зависимости от вашей операционной системы.

  • Windows (по умолчанию) — C:\Program Files\ArcGIS\server\geoevent\gateway\etc\kafka.properties
  • Linux (по умолчанию) — /home/arcgis/server/GeoEvent/gateway/etc/kafka.properties

Настройки по умолчанию в этом файле установлены на оптимизацию производительности за счет увеличения использования диска.

Хранилище отделов

Отделы Kafka в GeoEvent Server хранятся в одном из следующих местоположений, в зависимости от вашей операционной системы.

  • Windows (по умолчанию) — C:\ProgramData\ESRI\GeoEvent-Gateway\kafka\
  • Linux (по умолчанию)— /home/arcgis/.esri/GeoEvent-Gateway/config.[machine name]/kafka/ (например, /home/arcgis/.esri/GeoEvent-Gateway/config.gesdev01/kafka/ )

В папке kafka\ будут три папки с журналами, где хранятся реплики разделов: log\ , log1\ и log2\ .

Для изменения местоположения отделов Kafka , измените следующие свойства, в зависимости от вашей операционной системы:

  • Свойства по умолчанию для Windows :
    • gateway.data.dir=C://ProgramData//Esri//GeoEvent-Gateway//
    • log.dirs=kafka/logs,kafka/logs1,kafka/logs2
    • gateway.data.dir=/home/arcgis/.esri/GeoEvent-Gateway/config.[machine name] (например, /home/arcgis/.esri/GeoEvent-Gateway/config.gesdev01 )
    • log.dirs=kafka/logs,kafka/logs1,kafka/logs2

    Разделы отделов

    В GeoEvent Server число по умолчанию разделов в теме равно трем. Таким образом, если вы изучите папку журнала, в которой хранятся ваши отделы, вы найдете три папки с одинаковыми именами и индексами в конце (-1, -2 и -3). Внутри каждой папки раздела, Kafka ведет журнал всех данных в разделе отдела на данный момент. Для изменения числа разделов в отделе измените следующее свойство:

    Репликация отдела

    В GeoEvent Server число реплик разделов отделов по умолчанию равно двум. Таким образом, если вы изучите каждую папку журналов, в которых хранятся ваши отделы, вы найдете две папки с одинаковыми именами и индексами в конце (-1, -2 или -3). Каждая из папок log\ , log1\ и log2\ отвечает за хранение одной реплики из двух папок разделов (какие два раздела получит папка выбирается случайным образом). Для изменения числа реплик разделов в отделе измените следующее свойство:

    Размеры файлов раздела в отделе

    По умолчанию каждый файл журнала раздела в отделе Kafka начинается с минимума в 20 МБ и растет до максимума в 100 МБ, прежде чем создается новый файл журнала. Можно иметь несколько файлов журнала в реплике раздела одновременно. Планируйте как минимум 720 МБ [(100 МБ + 20 МБ) x 3 раздела x 2 реплики = 720 МБ] на ввод/вывод. В крайних случаях высокоскоростных потоков событий каждая папка реплики раздела в отделе может увеличиваться в 3–4 раза по сравнению с максимальным размером файла журнала (до 300–400 МБ на реплику раздела). Для одного отдела с тремя разделами общее дисковое пространство может вырасти до 1800 – 2400 МБ в любой момент. Умножив этот максимальный размер на число входов и выходов вы получите необходимый размер диска, нужный для Kafka в GeoEvent Server . Свойство ниже контролирует максимальный размер файла журнала перед переходом на новый файл (по умолчанию 100 МБ):

    Если вы работаете с высокоскоростными данными, можете получить несколько файлов журнала по 100 МБ, в противном случае у вас может быть только один. Для данных с меньшей скоростью, чем меньше размер, установленный для этого свойства, тем лучше. Для данных с большей скоростью, чем больше размер, установленный для этого свойства, тем лучше. Если вы установите слишком малый размер, то в Kafka постоянно будет происходить переход на новые файлы. Если вы установите слишком большой размер, то Kafka будет редко переходить на новые файлы журнала и старые события будут храниться в очереди дольше, чем необходимо.

    Другая настройка, влияющая на количество потребляемого дискового пространства разделами отделов Kafka – байты хранения. Это свойство дает указание Kafka всегда хранить минимальный объем данных. Значение этого свойства по умолчанию составляет 100 МБ. Поэтому даже если Kafka решает, что может и должен удалить старые данные, то размер оставшихся данных никогда не будет ниже 120 МБ (100 МБ для старых файлов журнала и 20 МБ для нового файла журнала). Как и в случае со свойством байтов сегмента, описанным выше, если вы работаете с более низкими скоростями данных, вы можете уменьшить значение этого свойства. Когда работаете с высокоскоростными данными используйте 100 МБ по умолчанию. Свойство ниже контролирует минимальный объем данных, который должен оставаться в файле журнала перед удалением старых данных (по умолчанию 100 МБ):

    Управление разделами отделов

    Поскольку подписчики используют события из очереди разделов, события станут устаревшими, если они отмечены как использованные всеми подписчиками. Время, в течение которого Kafka хранит старые сообщения и частота, с которой Kafka удаляет старые сообщения могут быть настроены при помощи следующих свойств.

    • log.retention.hours=1
    • log.retention.check.interval.ms=30000

    По умолчанию срок хранения – 1 час. Любые файлы данных старше 1 часа и не хранящие в настоящий момент активные данные будут удалены. Если файл все еще активно используется для хранения данных (как это может быть в случае данных с низким объемом / скоростью), он не будет удален. Kafka по умолчанию проверяет старые файлы данных каждые 30 секунд.

    Свойства управления файлами разделов

    Свойства ниже могут быть добавлены в файл свойств Kafka для управления скоростью перехода файлов раздела. Настройка этого свойства может улучшить использование дискового пространства, но также может повлиять на производительность, если установлено слишком низкое значение. Повышение этого свойства увеличит использование дискового пространства.

    • log.roll.ms=1800000
    • log.roll.jitter.ms=180000

    Первое свойство указывает Kafka переходить к новому файлу, заменяя его новым каждые 30 минут. Kafka создаст новый файл данных будет создаваться каждые 30 минут, независимо от размеров старого файла данных. Для потоков данных с низкой скоростью это может устранить необходимость поддерживать старые данные, если файл данных заполняется не очень часто. Второе свойство определяет насколько постоянно Kafka будет выполнять переход к новому файлу данных. Рекомендованное значение – 3 минуты, означающее, что раз в 3 минуты Kafka будет проверять необходимость перехода к новому файлу данных.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *