Kafka Connect¶
Kafka Connect – компонент Apache Kafka с открытым исходным кодом, является основой для подключения Kafka к внешним системам, таким как базы данных, хранилища key-value, поисковые индексы и файловые системы. С Kafka Connect можно использовать существующие реализации коннекторов для перемещения данных в сервис Kafka и из него:
- Source Connector – принимает базы данных и обновляет таблицы потоков для топиков Kafka. Также собирает метрики со всех серверов приложений в топики Kafka, делая данные доступными для потоковой обработки с низкой задержкой;
- Sink Connector – доставляет данные из топиков Kafka во вторичные индексы, такие как Elasticsearch, или в пакетные системы для автономного анализа, такие как Hadoop.
Kafka Connect ориентирован на потоковую передачу данных из сервиса Kafka и в него, что упрощает написание высококачественных, надежных и высокопроизводительных плагинов. Это также позволяет фреймворку давать гарантии, которые трудно достичь с помощью других структур. Kafka Connect является неотъемлемым компонентом конвейера ETL в сочетании с сервисом Kafka и потоковой обработкой.
Kafka Connect может работать либо как автономный процесс для выполнения заданий на одной машине (например, сбор журналов), либо как распределенный, масштабируемый, отказоустойчивый сервис, поддерживающий всю структуру. Это позволяет сократить масштаб до разработки, тестирования и небольших продуктовых развертываний с низким барьером для входа и низкими эксплуатационными накладными расходами, а также увеличить масштаб поддержки конвейера данных большой организации.
Основные преимущества использования Kafka Connect:
- Data Centric Pipeline – использование значимых абстракций данных для извлечения или передачи данных в Kafka;
- Flexibility and Scalability (гибкость и масштабируемость) – работа с потоковыми и пакетно-ориентированными системами на одном узле или масштабирование до сервиса по всей ширине организации;
- Reusability and Extensibility (повторное использование и расширяемость) – использование существующих коннекторов и возможность расширения их для адаптации к конкретным потребностям и сокращения времени на разработку.
Connectors & Tasks¶
Копирование данных между сервисом Kafka и сторонней системой осуществляется посредством создаваемых пользователями инстансов Kafka Connectors. Коннекторы бывают двух видов: SourceConnectors – импортируют данные из другой системы, и SinkConnectors – экспортируют данные в другую систему. Например, JDBCSourceConnector импортирует реляционную базу данных в Kafka, а HDFSSinkConnector экспортирует содержимое топика Kafka в файлы HDFS.
Реализации класса Connector не выполняют копирование данных самостоятельно: их конфигурация описывает набор данных для копирования, и Connector отвечает за разбиение этого задания на набор задач – Tasks, которые могут быть распределены между объектами Kafka Connect. Tasks также бывают двух видов: SourceTask и SinkTask. При необходимости реализация класса Connector может отслеживать изменения данных внешних систем и запрашивать реконфигурацию задачи.
С назначением данных, которые должны быть скопированы, каждая задача Task должна скопировать свое подмножество данных в сервис Kafka или из него. Данные, которые копирует коннектор, должны быть представлены как партиционированный поток, аналогично модели топика Kafka, где каждая партиция представляет собой упорядоченную последовательность записей со смещениями. Каждой задаче назначается подмножество партиций для обработки. Порой это сопоставление очевидно: каждый файл в наборе файлов журнала можно считать партицией, каждую строку в файле – записью, а смещения – просто позициии в файле. В иных случаях сопоставление с моделью требует больше усилий: коннектор JDBC может сопоставить каждую таблицу с партицией, но смещение менее ясно. Один из возможных вариантов сопоставления это использовать в качестве смещения последнюю запрашиваемую отметку времени при генерации запросов.
Source Connector, создавший две задачи, которые копируют данные из входных партиций и записывают в сервис Kafka, приведен на Рис.102. .
Рис. 102. Пример реализации Source Connector
Partitions & Records¶
Каждая партиция представляет собой упорядоченную последовательность записей ключ-значение, где и ключи, и значения могут иметь сложные структуры. Поддерживаются многие примитивные типы, а также массивы, структуры и вложенные структуры данных. Для большинства типов можно напрямую использовать стандартные типы Java, такие как java.lang.Integer, java.lang.Map и java.lang.Collection. Для структурированных записей следует использовать класс Struct.
На Рис.103. представлен партиционированный поток: модель данных, в которой коннекторы сопоставляют все системы source и sink. Каждая запись содержит ключи и значения (со схемами), идентификатор партиции и смещения в ней.
Рис. 103. Пример партиционированного потока
Для отслеживания структуры и совместимости записей в партициях схемы (Schemas) могут быть включены в каждую запись. Поскольку схемы обычно генерируются “на лету” на основе источника данных, класс SchemaBuilder включен, что делает их построение очень простым.
Schemas: | Определение абстрактного типа данных. Типы данных могут быть примитивными типами (целочисленные типы, типы с плавающей запятой, логические, строки и байты) или сложными типами (типизированные массивы, карты с одной схемой ключей и схемами значений, а также структурами, которые имеют фиксированный набор имен полей, каждый из которых имеет схема связанных значений). Любой тип может быть указан как необязательный, что позволяет его опускать (в результате чего значения отсутствуют) и может указывать значение по умолчанию. |
---|
Такой формат данных среды выполнения не предполагает какого-либо конкретного формата сериализации; это преобразование осуществляется с помощью Converter, которые обрабатывают формат времени выполнения org.apache.kafka.connect.data и сериализованные данные byte[].
Converter: | Интерфейс конвертера обеспечивает поддержку перевода между форматом данных выполнения Kafka Connect и byte[]. Внутренне это включает промежуточный шаг к формату, используемому слоем сериализации (например, JsonNode, GenericRecord, Message). |
---|
В дополнение к ключу и значению записи имеют идентификаторы партиций и смещения, которые используются фреймворком для периодической фиксации смещений обработанных данных. В случае сбоя обработка может возобновиться с последнего зафиксированного смещения, что позволяет избежать повторной обработки и дублирования событий.
© Copyright 2022, Arenadata.io.
Ивентная модель данных с использованием Kafka и Kafka Connect: Построение гибкой и распределенной архитектуры
Привет, Хабр! В наше время при постоянном росте объемов данных и необходимостью более быстрой и надежной обработки информации, мы сталкиваемся с требованием к эффективному обмену и синхронизации данных между различными системами. Отслеживание и обработка данных в реальном времени стало жизненно необходимым для современных приложений.
В этой статье мы рассмотрим, как Kafka Connect – мощный инструмент из экосистемы Apache Kafka – приходит на помощь при решении сложной задачи синхронизации данных между базами данных. Мы рассмотрим, как используя Kafka Connect, мы можем эффективно следить за изменениями в одной базе данных, обрабатывать их в нашем Java приложении и мгновенно записывать их в другую базу данных, обеспечивая надежность и безопасность данных.
Построим гибкую и масштабируемую архитектуру, которая позволит нам забыть о проблемах, связанных с несогласованными данными, и наслаждаться мгновенным доступом к актуальной информации для наших бизнес-процессов.
Что мы будем использовать?
Предполагается, что всё уже установлено и работает в штатном режиме. В связи с использованием minikube мы будем жить с некоторыми ограничениями.
Комментарии к коду приведены непосредственно в коде
Какой флоу мы построим?
Мы будем асинхронно передавать сообщения. Kafka — шина событий, в топик которой попадают ивенты, происходящие в другом сервисе. Мы сможем получить:
- Независимую обработку данных
- Масштабируемость (Использование нескольких брокеров)
- Отказоустойчивость (В случае недоступности сервиса, данные в топики будут ждать, пока сервис не восстановится)
- Гибкость архитектуры (Возможность обрабатывать ивенты в нескольких местах для разных целей)
За передачу ивентов будет отвечать Kafka Connect , предназначенный для интеграции между источниками данных и Kafka. Он имеет:
- Коннекторы для подключения к базам данных, файловой системе, облачным сервисам, а также пользовательские коннекторы
- Горизонтальное масштабирование
- Обработку данных в реальном времени
Разработка первого Java приложения
Для начала мы напишем Java приложение, которое будет писать в нашу базу данных:
Код отправителя типовой. Мы лишь вставляем несколько записей в базу. Полный код доступен на GitHub. Единственное, что могу отметить, это сущность, которую мы будем записывать:
public class PersonalData < @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String bankBic; private String bankName; @Column(name = "last_update") private LocalDateTime lastUpdate = LocalDateTime.now(); public PersonalData(String bankBic) < this.bankBic = bankBic; >>
bankBic будем получать, используя рандомайзер, bankName всегда будет null, в качестве даты последнего обновления берём текущую.
Деплой в DockerHub
После окончания написания кода, необходимо загрузить наш сервис в DockerHub.
Для этого напишем Dockerfile для подготовки образа:
FROM maven:3.8.4-openjdk-17-slim AS builder COPY pom.xml /build/ COPY src /build/src/ WORKDIR /build RUN mvn -B -e -C -T 1C -DskipTests clean package \ && rm -rf ~/.m2 FROM openjdk:17-slim COPY --from=builder /build/target/producer-*.jar /app/application.jar WORKDIR /app EXPOSE 8080 ENTRYPOINT ["java", "-jar", "application.jar"]
Теперь, когда всё готово мы можем собрать наше приложение с присвоением репозитория, имени и тега.
После успешной сборки образа, отправим его в DockerHub:
Теперь, когда образ загружен, необходимо подготовить для него деплоймент
Но перед этим подготовим нашу базу PostgreSQL, куда будет писать наш сервис
Подготовка окружения
Первым делом мы запустим PostgreSQL. Для этого нам понадобится создать 3 файла: storage.yaml, pv.yaml, pvc.yaml. Содержимое приведено ниже, подробнее о файлах ещё ниже:
kind: StorageClass apiVersion: storage.k8s.io/v1 metadata: name: local-storage provisioner: kubernetes.io/no-provisioner volumeBindingMode: WaitForFirstConsumer
apiVersion: v1 kind: PersistentVolume metadata: name: "pv-pg" labels: type: local spec: capacity: storage: "4Gi" volumeMode: Filesystem accessModes: - ReadWriteOnce persistentVolumeReclaimPolicy: Retain storageClassName: local-storage local: path: "/opt/" nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: kubernetes.io/hostname operator: In values: - minikube
kind: PersistentVolumeClaim apiVersion: v1 metadata: name: "pvc-pg" spec: storageClassName: "local-storage" accessModes: - ReadWriteOnce resources: requests: storage: "4Gi"
StorageClass — хранилище, где мы определяем, что:
provisioner: kubernetes.io/no-provisioner — мы не будем динамически создавать локальные хранилища поскольку в kubernetes нет такой возможности;
volumeBindingMode: WaitForFirstConsumer — предоставлять хранилище будем лишь при необходимости в этом.
PV (Persistent Volume) — постоянные тома, которые предоставляют непосредственно хранилище для данных.
capacity: storage: «4Gi» — определяем размер.
accessModes: — ReadWriteOnce — режим доступа. В нашем случае хранилище доступно только для одного пода. Возможно варианты:
- ReadOnlyMany — множество подов, но только на чтение;
- ReadWriteMany — чтение и запись для множества подов;
persistentVolumeReclaimPolicy: Retain — определяет политику восстановления тома после того, как он перестанет использоваться. В нашем случае том будет освобождён и останется существовать. Также доступны:
- Delete — удаление данных и PV;
- Recycle — устаревшая политика. Удаляет данные, но не PV;
storageClassName: local-storage — указываем какой StorageClass используем.
local: path: «/opt/» — определяем путь к локальному хранилищу на узле (путь должен существовать).
PVC (PersistentVolumeClaim) — запрос к PV на предоставление ресурсов определённого размера.
storageClassName: «local-storage» — указываем какой StorageClass используем.
accessModes: — ReadWriteOnce — определяем режим доступа к PVC аналогично PV
resources: requests: storage: «4Gi» — определяем запрашиваемый размер хранилища в 4 гигабайта.
StorageClass определяет тип и параметры хранилища, но не физическое хранилище. Это делает PersistentVolume. PersistentVolumeClaim запрашивает предоставление хранилища и если есть доступный PV с подходящим размером и классом хранилища, то PVC к нему привяжется и поды смогут использовать это хранилище.
Теперь, когда всё готово, необходимо установить PostgreSQL. Для этого используем репозиторий bitnami.
Добавим репозиторий bitnami:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
И теперь выполним команду для установки:
helm install postgresql-dev bitnami/postgresql --set primary.persistence.existingClaim=pvc-pg,auth.postgresPassword=pgpass
Выполним команду kubectl get pods -w с флагом -w , чтобы отслеживать состояние подов в прямом эфире. Проследим за запуском нашей базы и в случае успеха можно переходить дальше.
Отправитель
Теперь мы можем подготовить деплоймент, чтобы развернуть наше приложение, которое уже сможет выполнять свои функции — писать в базу данных. Создадим файл producer-deploy.yaml со следующим содержимым:
apiVersion: apps/v1 kind: Deployment metadata: name: producer-dep labels: app: marmarks-dep spec: replicas: 1 selector: matchLabels: app: marmarks-dep template: metadata: labels: app: marmarks-dep spec: containers: - name: producer-dep image: "marmarks/producer:0.2" imagePullPolicy: IfNotPresent envFrom: - configMapRef: name: producer-config-map - secretRef: name: producer-secrets ports: - containerPort: 8080 protocol: TCP readinessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 5 periodSeconds: 3 --- apiVersion: v1 kind: ConfigMap metadata: name: producer-config-map labels: app: marmarks-dep data: SERVER_PORT: "8080" SPRING_PROFILES_ACTIVE: "dev" SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres" --- apiVersion: v1 kind: Secret metadata: name: producer-secrets labels: app: marmarks-dep type: Opaque stringData: SPRING_DATASOURCE_USERNAME: "postgres" SPRING_DATASOURCE_PASSWORD: "pgpass"
Здесь, помимо деплоймнета, мы создадим ConfigMap и хранилище секретов, куда передадим необходимые переменные окружения для подключения. Более подробный разбор конфигураций будет приведён в следующей части статьи, где мы рассмотрим Helm Chart.
Запустим наш деплоймент, используя kubectl apply -f producer-deploy.yaml
Выполним команду kubectl get pods -w .
Когда образ будет загружен и под станет доступным, можем зайти внутрь пода PostgreSQL выполнив команду kubectl exec -i -t -n default postgresql-dev-0 -c postgresql — sh -c «clear; (bash || ash || sh)»
Находясь внутри подключимся к базе выполнив команду: psql -h postgresql-dev -p 5432 -U postgres -d postgres
Следующим шагом у нас запросят пароль. В моём случае я введу pgpass
Теперь, когда мы внутри, выполним SELECT * FROM personal_data; для получения содержащихся внутри записей. Получим что-то типа такого:
| id | bank_bic | bank_name | last_update |
|——|——————-|———————|———————————————————|
| 1 | 1234567 | | 2023-07-27 17:06:44.052581 |
| 2 | 1234568 | | 2023-07-27 17:06:44.266662 |
| 3 | 1234569 | | 2023-07-27 17:06:44.272390 |
| 4 | 1234510 | | 2023-07-27 17:06:44.276271 |
| 5 | 1234511 | | 2023-07-27 17:06:44.339826 |
Kafka
Теперь, когда наш образ и база данных готовы для работы и мы убедились, что они работают, мы поднимим Kafka Cluster. Теперь загрузим Zookeeper и Kafka из репозитория bitnami:
helm install zookeeper bitnami/zookeeper
helm install kafka bitnami/kafka
Так мы используем настройки по умолчанию. Для того, чтобы установить число реплик равным 3, необходимо добавить —set replicaCount=3 в команду для чарта Kafka. Если чарт уже запущен, сделаем helm upgrade kafka bitnami/kafka —set replicaCount=3
Альтернативным путём будет запустить чарты из их репозитория на GitHub.
Kafka Connect
Теперь необходимо подготовить необходимый образ kafka-connect. Для этого напишем Dockerfile, где установим в наш образ jdbc connect для чтения из базы данных, а также avro converter для конвертации данных в формат Avro:
FROM confluentinc/cp-kafka-connect-base:6.2.1 RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.4 RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.1.1
После этого соберем наш образ и загрузим в DockerHub, как мы делали это ранее.
Когда наш образ собран и загружен, обратимся к confluentinc для получения helm чарта — GitHub. Клонируем себе репозиторий и выполним команду:
helm dependency update charts/cp-kafka/
После этого используем наш образ для запуска Kafka Connect:
helm install kafka-connect \ --set image="marmarks/cp-kafka-connect-jdbc" \ --set imageTag="6.2.1" \ --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \ --set prometheus.jmx.enabled=false \ ./charts/cp-kafka-connect
где Image, imageTag, необходимо заменить на свои параметры.
kafka.bootstrapServers — путь до брокера. Необходимо заменить на свой путь, если запускали kafka отличным от моего способа и в этом есть необходимость.
prometheus.jmx.enabled=false чтобы отключить сбор метрик.
Если реплик Kafka не три, то следует дополнить команду сменив число реплик топиков, поскольку по умолчанию их 3. Итоговая команда (для 1 реплики) будет выглядеть так:
helm install kafka-connect \ --set image="marmarks/cp-kafka-connect-jdbc" \ --set imageTag="6.2.1" \ --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \ --set prometheus.jmx.enabled=false \ --set config.storage.replication.factor="1" \ --set offset.storage.replication.factor="1" \ --set status.storage.replication.factor="1" \ ./charts/cp-kafka-connect
Schema Regestry
Следующим шагом следует запустить Schema Regestry. Для этого используем следующую команду:
helm install schema-registry \ --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \ --set prometheus.jmx.enabled=false ./charts/cp-schema-registry
Здесь мы снова указываем путь до брокера (при необходимости изменить) и отключаем сбор метрик.
Для работы с Kafka Connect и брокером, мы поднимем под Kafka Client. Создадим файл kafka-client.yaml с содержимым:
apiVersion: v1 kind: Pod metadata: name: kafka-client namespace: default spec: containers: - name: kafka-client image: confluentinc/cp-enterprise-kafka:5.4.1 command: - sh - -c - "sleep infinity"
Применим с помощью kubectl apply -f kafka-client.yaml
Когда kafka-client запущен, мы для удобства зайдём в него с помощью команды:
kubectl exec -i -t -n default kafka-client -c kafka-client — sh -c «clear; (bash || ash || sh)»
Проверим наличие коннекторов:
curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors
На текущем этапы мы получим [] , что будет означать, что коннекторов не создано. Создадим наш коннектор:
curl -X POST \ -H "Content-Type: application/json" \ --data ' < "name": "kafka-connector", "config": < "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081", "tasks.max": 1, "connection.url": "jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass", "table.whitelist": "personal_data", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "kafka-connect-", "poll.interval.ms": 1000 >>' \ http://kafka-connect-cp-kafka-connect:8083/connectors
- «name»: «kafka-connector» — укажем имя для коннектора.
- «connector.class»: «io.confluent.connect.jdbc.JdbcSourceConnector» — определяет класс коннектора, позволяющий читать данные из базы данных с использованием JDBC.
- «key.converter»: «io.confluent.connect.avro.AvroConverter» — указываем конвертер для ключей сообщений, который будет использоваться при записи данных в Kafka. Используем AvroConverter, который позволяет сериализовать ключи сообщений в Avro формат перед отправкой в Kafka.
- «value.converter»: «io.confluent.connect.avro.AvroConverter» — задаем конвертер для значений сообщений, который будет использоваться при записи данных в Kafka. Аналогично ключам, значения также сериализуются в Avro формат.
- «key.converter.schema.registry.url»: «http://schema-registry-cp-schema-registry:8081« — указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы ключей сообщений в Avro формате.
- «value.converter.schema.registry.url»: «http://schema-registry-cp-schema-registry:8081« — указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы значений сообщений в Avro формате.
- «tasks.max»: 1 — указываем количество потоков для данного коннектора, когда у нас больше 1 пода Kafka Connect, имеет смысл ставить больше потоков.
- «connection.url»: «jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass» — задаем URL-подключения к базе данных PostgreSQL, которая будет использоваться для чтения данных.
- «table.whitelist»: «personal_data» — определяем список таблиц, которые будут считываться из базы данных. Если не перечислить здесь таблицы, то данные будут считываться из всех таблиц доступных для пользователя переданного в connection.url
- «mode»: «incrementing» — указываем режим считывания данных, это инкрементный режим, который использует значение столбца для определения обновлений.
- «incrementing.column.name»: «id» — указываем имя столбца, используемого для определения обновлений в инкрементном режиме.
- «timestamp.column.name»: «last_update» — указываем имя столбца, для получения обновлений уже существующих записей
- «topic.prefix»: «kafka-connect-« — задаем префикс для имен топиков, которые будут создаваться в Kafka при записи данных из базы данных.
- «poll.interval.ms»: 1000: Задает интервал между опросами базы данных на предмет обновлений данных. В данном случае, это 1000 миллисекунд (1 секунда).
Дополнительно отмечу, что вручную мы никаких схем регистрировать не будем. Kafka Connect сам создаст необходимые схемы и запишет их в хранилище. Помимо приведённых выше конфигураций, в документации можно найти ещё множество различных настроек.
А работает ли?
После отправки выполним curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/status для получения статуса созданного коннектора. В норме статус будет выглядеть следующим образом:
< "name": "kafka-connector", "connector": < "state": "RUNNING", "worker_id": "10.244.2.104:8083" >, "tasks": [ < "id": 0, "state": "RUNNING", "worker_id": "10.244.2.104:8083" >], "type": "source" >
Чтобы получить конфигурацию коннектора необходимо выполнить команду:
curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/config
Теперь получим список доступных топиков в Kafka, используя команду:
kafka-topics —bootstrap-server kafka:9092 —list
Здесь мы увидим топик с нашими записями из таблицы personal data -> kafka-connect-personal_data. Топик с конфигурациями, оффсетом, статусами -> kafka-connect-cp-kafka-connect-config, kafka-connect-cp-kafka-connect-offset, kafka-connect-cp-kafka-connect-status, соответственно.
Теперь проверим наличие записей в нашем топике:
kafka-console-consumer —bootstrap-server kafka:9092 —topic kafka-connect-personal_data —from-beginning
В случае , если всё прошло успешно, это будет выглядеть как-то так:
Теперь посмотрим на схемы в Schema Regestry. Для получения списка доступных схем выполним команду:
curl -X GET http://schema-registry-cp-schema-registry:8081/subjects
В моём случае ответ выглядит так: [«kafka-connect-personal_data-value»]
Теперь узнаем версию схемы:
curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions
В моём случае она первая, и теперь посмотрим саму схему:
curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions/1
< "subject": "kafka-connect-personal_data-value", "version": 1, "id": 1, "schema": < "type": "record", "name": "personal_data", "fields": [ < "name": "id", "type": "long" >, < "name": "bank_bic", "type": ["null", "string"], "default": null >, < "name": "bank_name", "type": ["null", "string"], "default": null >, < "name": "last_update", "type": ["null", < "type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp", "logicalType": "timestamp-millis" >], "default": null > ], "connect.name": "personal_data" > >
Мы развернули множество сервисов, убедились, что каждый из них общается друг с другом, все данные успешно доставляются от одного сервису к другому, и всё работает в штатном режиме. Повторим наш флоу: данные попадают в базу данных, откуда их считывает опираясь на поля id (для новых записей) и last_update (для уже существующих) Kafka Connect, после этого он отправляет их брокеру Kafka в отдельный топик, а также генерирует схему данных, которую отправляет в Schema Regestry. Теперь осталось подготовить сервис, который будет получать схемы из реестра, считывать данные из топика, модифицировать их по своему усмотрению и записывать в базу данных. Также подготовим Helm Chart для удобного развёртывания нашего отправителя и получателя. Всё это будет проделано в следующей части статьи.
Как устроена Kafka Connect: основы интеграции Kafka с системами Big Data
В прошлый раз мы говорили про соединение потоков в Kafka. Сегодня рассмотрим базовые принципы устройства еще одной утилиты брокера Kafka для интеграции с внешними системами Kafka Connect. Читайте далее про архитектуру интеграционной библиотеки, благодаря которой Apache Kafka легко взаимодействует с различными Big Data хранилищами и базами данных.
Плагины-коннекторы в Kafka Connect
Плагины-коннекторы — это исполняемые библиотеки утилиты Kafka Connect, отвечающие за перемещение данных между брокером Kafka и каким-либо Big Data хранилищем (например, Amazon AWS S3, Elasticsearch и т.д.) [1].
Особенности работы утилиты Kafka Connect мы подробнее рассмотрим далее.
Утилита Kafka Connect
Kafka Connect — это утилита брокера сообщений Apache Kafka, которая отвечает за перемещение данных между Kafka и другими хранилищами больших данных. Кафка Коннект выполняется в виде кластера процессов-исполнителей (worker processes). На каждом процессе-исполнителе устанавливаются коннекторы, которые запускают задачи (tasks) для параллельного перемещения больших объемов данных и эффективного использования доступных ресурсов рабочих узлов. Задачам коннектора-источника необходимо прочитывать данные из системы-источника и передавать эти данные процессам-исполнителям. Задачи коннектора-приемника получают данные от процессов-исполнителей и записывают их в целевую информационную систему (хранилище) [1].
Кафка Коннект не требует дополнительной установки. Она находится в том же пакете, что что и Apache Kafka. Запуск Кафка Коннект происходит командной строке через файл connect-distributed.bat :
bin/connect-distributed.bat config/connect-distributed.properties
Рассмотрим пример подключения Кафка Коннект к хранилищу Amazon AWS S3 для загрузки данных из корзины (bucket) в топики (topic) Apache Kafka. Для этого необходимо прежде всего прописать учетные данные (открытый и закрытый ключи) в следующем формате:
aws_access_key_id = AKIAIOSFODNN7EXAMPLE aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Далее через Strimzi (менеджер пакетов для платформы развертывания Kubernetes) необходимо настроить Docker-файл для коннектора S3:
FROM strimzi/kafka:0.16.1-kafka-2.4.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
Имея настроенный Docker-образ можно развернуть Кафка Коннект путем создания следующего ресурса в Kubernetes (платформа для развертывания веб-приложений):
apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaConnect metadata: name: my-connect-cluster spec: image: docker.io/scholzj/kafka:camel-kafka-2.4.0 replicas: 3 bootstrapServers: my-cluster-kafka-bootstrap:9092 externalConfiguration: volumes: - name: aws-credentials secret: secretName: aws-credentials config: config.providers: file config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: false value.converter.schemas.enable: false
После развертывания экземпляра Кафка Коннект необходимо создать коннектор S3 c помощью Strimzi:
apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaConnector metadata: name: s3-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.camel.kafkaconnector.CamelSourceConnector tasksMax: 1 config: key.converter: org.apache.kafka.connect.storage.StringConverter value.converter: org.apache.camel.kafkaconnector.converters.S3ObjectConverter camel.source.kafka.topic: s3-topic camel.source.url: aws-s3://camel-connector-test?autocloseBody=false camel.source.maxPollDuration: 10000 camel.component.aws-s3.configuration.access-key: $ camel.component.aws-s3.configuration.secret-key: $ camel.component.aws-s3.configuration.region: US_EAST_1
Теперь Кафка Коннект готова для загрузки данных из хранилища Amazon AWS S3 в брокер Apache Kafka [2].
Kafka Connect по умолчанию включает большинство готовых коннекторов для интеграции с наиболее популярными хранилищами Big Data (HDFS, Cassandra, Amazon S3, Elasticsearch и пр.), которые требуется только настроить для дальнейшего использования. Однако, если необходимо, можно создать собственный коннектор, например, для интеграции с уникальной системой, применяемой в вашем случае.
Таким образом, благодаря утилите Kafka Connect у брокера Apache Kafka есть возможность загрузки данных из любого хранилища больших данных, включая также облачные хранилища (Amazon AWS, Drophbox, ICloud). Благодаря этому Apache Kafka является универсальным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот фреймворк в задачах Data Science и разработке распределенных приложений. В следующей статье мы рассмотрим движок Apache Kafka KSQL.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Kafka Streams для разработчиков
- Интеграция Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных
- https://habr.com/ru/company/redhatrussia/blog/508056/
Зачем вам Kafka Connect: разбираем на примере интеграции Elasticsearch с Кафка
Сегодня поговорим, как связать Elasticsearch с Apache Kafka: рассмотрим, зачем нужны коннекторы, когда их следует использовать и какие особенности популярных в Big Data форматов JSON и AVRO стоит при этом учитывать. Также читайте в нашей статье, что такое Logstash Shipper, чем он отличается от FileBeat и при чем тут Kafka Connect.
Когда и зачем нужна интеграция Elasticsearch с Apache Kafka: 3 практических примера
Напомним, в ELK Stack компонент Logstash отвечает за сбор, преобразование и сохранение в общем хранилище данных из разных файлов, СУБД, логов и прочих мест в режиме реального времени. Это похоже на основное назначение Apache Kafka — распределенной стриминговой платформы, которая собирает и агрегирует большие данные разных форматов из множества источников. Возникает вопрос: зачем добавлять Kafka в ELK-стек, используя дополнительное средство сбора потоковых данных? Здесь можно выделить несколько сценариев [1]:
· временная остановка кластера Elasticsearch (ES) с целью обновления версии или внесения других изменений. Чтобы не потерять данные, приходящие из разных систем, Kafka можно использовать в качестве временного буфера для сохранения информации. Когда ELK-кластер возобновит работу, Logstash продолжит собирать, преобразовывать и отправлять в ES пропущенные данные, считывая их из топиков Apache Kafka с того места, где случился останов.
· Выравнивание пропускной способности компонентов ELK, чтобы, например, при внезапном увеличении объема данных ES-кластер не «захлебнулся» от высокой скорости поступления новой информации. На практике такая ситуация может возникнуть в случае ошибки в обновленном Big Data приложении или аномальном непредвиденном росте пользовательской активности.
Таким образом, добавление Кафка в Эластик-стек позволяет не зависеть от мониторинга событий. Совместное использование FileBeat с Kafka дает возможность создавать разные топики для каждого сервиса, улучшая «реактивность» всей Big Data системы. Напомним, FileBeat — это легковесный серверный агент для отправки определенных типов рабочих данных в Elasticsearch. Он занимает использует гораздо меньше системных ресурсов, чем Logstash. Хотя функциональные возможности Logstash по вводу, фильтрации и выводу для сбора, обогащения и преобразования данных из различных источников гораздо больше, чем у FileBeat. Можно сказать, что Logstash «дороже», чем FileBeat.
Возвращаясь к преимуществам включения Kafka в ELK Stack, отметим, что любая команда разработчиков или администраторов Big Data систем может подписаться на топики Kafka для сбора метрик или выдачи сигналов тревоги, уведомляющих о случившихся или потенциальных авариях. Это весьма востребована, поскольку на практике, в основном, Elasticsearch с Kibana используются для информирования, а не для оповещения или мониторинга [2].
Например, в этом случае можно использовать 2 экземпляра Logstash — для отправки и индексации данных соответственно. Отправитель (Logstash Shipper) будет немедленно сохранять данные в топиках Kafka. А индексатор (Logstash Indexer) считывает из Кафка данные со своей собственной скоростью, выполняя при этом дорогостоящие преобразования, включая поиск и индексацию в Elasticsearch. Также FileBeat может отслеживать файлы и отправлять их в Kafka через приемник Logstash [1].
Что такое Kafka Connect и как это работает
Разумеется, есть еще множество других кейсов по совместному использованию Kafka с компонентами ELK Stack. Причем Apache Kafka также может выступать приемником данных из Elasticsearch. Для всех вариантов отлично подходит Kafka Connect — компонент Кафка, который обеспечивает потоковую интеграцию с внешними хранилищами данных, включая JDBC, Elasticsearch, IBM MQ, S3, BigQuery и другие. Наличие расширенного API позволяет дополнить Kafka Connect собственными коннекторами. А REST API облегчает их настройку и управления. Модульная природа Kafka Connect делает возможным гибко удовлетворить все интеграционные потребности [3]:
· коннекторы (connectors) — это файлы JAR, которые определяют, как интегрироваться с внешним хранилищем данных;
· конвертеры (converters) используются для сериализации и десериализации данных;
· преобразования (transforms) отвечают за дополнительную обработку сообщений «на лету».
Обычно для каждой внешней системы используются свой коннектор. В частности, за интеграцию Apache Kafka с ELK отвечает коннектор Kafka Connect Elasticsearch. Он позволяет перемещать данные, записывая их из топика Кафка в индекс Elasticsearch с приведением к одному типу. Например, в кейсах по аналитике больших данных каждое сообщение в Kafka рассматривается как событие, которое коннектор идентифицирует по топику (topic), разделу (partition) и смещению (offset), чтобы преобразовать в уникальные ES-документы. При использовании Elasticsearch в качестве key-value хранилища ключи из сообщений Kafka будут идентификаторами ES-документов, гарантируя упорядоченное обновления. Оба рассмотренные варианта использования поддерживают идемпотентную семантику записи Elasticsearch, т.е. точно однократную доставку (exactly once). Подробнее о гарантиях доставки сообщений в Apache Kafka мы рассказывали здесь.
Также стоит упомянуть про маппирование или отображение данных, которое определяет, как документ и содержащиеся в нем поля хранятся и индексируются в ES. Пользователи могут явно определять сопоставления типов в индексах. Если отображение не задано явно, Elasticsearch может определять имена и типы полей из данных. Однако такие типы, как метка времени (timestamp) и десятичная дробь, могут быть выведены некорректно. Kafka Connect Elasticsearch позволяет выводить сопоставления из схем сообщений Кафка. Таким образом, благодаря эволюционной поддержке схем данных, коннектор может обрабатывать изменения схемы в обратной, прямой и полностью совместимой конфигурации. В ряде случаев доступны некоторые несовместимые изменения схемы, например, конвертация поля из целого числа в строку [4].
5 особенностей интеграции ES с Кафка, о которых нужно знать
Следует помнить несколько важных моментов, при использовании Kafka Connect Elasticsearch [5]:
· ·· данные сериализуются на основе значений по умолчанию, указанных в ваших worker’ах Kafka Connect, например, Avro. Если нужно что-то другое, следует вручную добавить переопределения.
· · При передаче данных в Elasticsearch из KSQL (Kafka SQL), необходимо установить для преобразователя ключей значение STRING. Пока, все что относится к поддержке ключей выражается так: “Key.converter”: “org.apache.kafka.connect.storage.StringConverter”
· · Коннектор автоматически изменяет имена топиков в верхнем регистре на имена индексов в нижнем регистре в Elasticsearch, вручную сопоставлять не нужно.
· · Можно использовать регулярные выражения для сопоставления нескольких топиков, определив themes.regex в конфигурации топика.
· · отдельно стоит сказать про параметр schema.ignore. Если он равен True, можно просто передать JSON-документ в Elasticsearch — сопоставление типов полей выполнится автоматически. Это актуально, если в данных отсутствует явная схема, например, формат JSON, CSV и пр. При использовании формата AVRO или JSON со встроенной схемой следует установить schema.ignore = false. Это позволит Kafka Connect явно создать сопоставление типов в Elasticsearch при предаче данных. На практике в большинстве случаев используется schema.ignore = true, что позволяет передать данные, не вдаваясь в технические подробности.
В следующей статье мы продолжим разговор про коннекторы Apache Kafka Connect и рассмотрим наиболее распространенные ошибки интеграции с Elasticsearch. А практические детали по связыванию Apache Kafka с другими внешними источниками для потоковой обработки больших данных вы узнаете на практических курсах по Кафка в нашем лицензированном учебном центре повышения квалификации и обучения руководителей и ИТ-специалистов (разработчиков, архитекторов, инженеров и аналитиков Big Data) в Москве:
Источники