Перейти к основному содержанию
Перейти к основному содержанию

JDBC connector

Примечание

Этот коннектор следует использовать только в том случае, если ваши данные простые и состоят из примитивных типов данных, например int. Специфичные для ClickHouse типы, такие как map, не поддерживаются.

В наших примерах мы используем дистрибутив Kafka Connect от Confluent.

Ниже мы описываем простую установку, при которой сообщения считываются из одного Kafka-топика и вставляются в таблицу ClickHouse. Мы рекомендуем Confluent Cloud, который предоставляет щедрый бесплатный тариф для тех, у кого нет собственного Kafka-окружения.

Обратите внимание, что для коннектора JDBC требуется схема (нельзя использовать обычный JSON или CSV с JDBC-коннектором). Хотя схема может быть закодирована в каждом сообщении, настойчиво рекомендуется использовать Confluent Schema Registry, чтобы избежать связанных накладных расходов. Предоставленный скрипт вставки автоматически выводит схему из сообщений и добавляет её в реестр — таким образом, этот скрипт может быть повторно использован для других наборов данных. Предполагается, что ключи Kafka имеют тип String. Дополнительные сведения о схемах Kafka можно найти здесь.

Лицензия

JDBC Connector распространяется под Confluent Community License

Шаги

Соберите данные для подключения

To connect to ClickHouse with HTTP(S) you need this information:

Parameter(s)Description
HOST and PORTTypically, the port is 8443 when using TLS or 8123 when not using TLS.
DATABASE NAMEOut of the box, there is a database named default, use the name of the database that you want to connect to.
USERNAME and PASSWORDOut of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS. Connection details are displayed in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.

1. Установите Kafka Connect и коннектор

Предполагается, что вы скачали пакет Confluent и установили его локально. Следуйте инструкциям по установке коннектора, описанным здесь.

Если вы используете метод установки через confluent-hub, ваши локальные конфигурационные файлы будут обновлены.

Для отправки данных из Kafka в ClickHouse мы используем sink-компонент коннектора.

2. Скачайте и установите JDBC-драйвер

Скачайте и установите JDBC-драйвер ClickHouse clickhouse-jdbc-<version>-shaded.jar отсюда. Установите его в Kafka Connect, следуя инструкциям здесь. Другие драйверы могут работать, но не были протестированы.

Примечание

Типичная проблема: в документации предлагается скопировать JAR в share/java/kafka-connect-jdbc/. Если вы сталкиваетесь с проблемами при обнаружении драйвера Connect, скопируйте драйвер в share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/. Либо измените plugin.path, чтобы включить драйвер — см. ниже.

3. Подготовьте конфигурацию

Следуйте этим инструкциям для настройки Connect, соответствующей вашему типу установки, учитывая различия между автономным и распределённым кластером. При использовании Confluent Cloud актуальна распределённая конфигурация.

Следующие параметры важны для использования JDBC-коннектора с ClickHouse. Полный список параметров можно найти здесь:

  • _connection.url_ - должен иметь формат jdbc:clickhouse://&lt;clickhouse host>:&lt;clickhouse http port>/&lt;target database>
  • connection.user - пользователь с правами записи в целевую базу данных
  • table.name.format - таблица ClickHouse, в которую выполняется вставка данных. Таблица должна уже существовать.
  • batch.size - количество строк, отправляемых в одном пакете. Убедитесь, что задано достаточно большое значение. Согласно рекомендациям ClickHouse минимальным следует считать значение 1000.
  • tasks.max - коннектор JDBC Sink поддерживает запуск одной или нескольких задач. Это можно использовать для увеличения производительности. Вместе с размером пакета это ваши основные средства повышения производительности.
  • value.converter.schemas.enable - установите в false при использовании реестра схем, в true — если вы встраиваете схемы в сообщения.
  • value.converter - установите в соответствии с вашим типом данных, например, для JSON — io.confluent.connect.json.JsonSchemaConverter.
  • key.converter - установите org.apache.kafka.connect.storage.StringConverter. Мы используем строковые ключи.
  • pk.mode - не актуален для ClickHouse. Установите none.
  • auto.create - не поддерживается и должен иметь значение false.
  • auto.evolve - мы рекомендуем значение false для этого параметра, хотя поддержка может появиться в будущем.
  • insert.mode - установите "insert". Другие режимы в настоящее время не поддерживаются.
  • key.converter - укажите в соответствии с типами ваших ключей.
  • value.converter - укажите на основе типа данных в вашем топике. Эти данные должны иметь поддерживаемую схему — форматы JSON, Avro или Protobuf.

Если вы используете наш пример набора данных для тестирования, убедитесь, что заданы следующие параметры:

  • value.converter.schemas.enable - установите в false, так как мы используем реестр схем. Установите в true, если вы встраиваете схему в каждое сообщение.
  • key.converter - установите "org.apache.kafka.connect.storage.StringConverter". Мы используем строковые ключи.
  • value.converter - установите "io.confluent.connect.json.JsonSchemaConverter".
  • value.converter.schema.registry.url - укажите URL сервера реестра схем, а также учетные данные для этого сервера через параметр value.converter.schema.registry.basic.auth.user.info.

Примеры файлов конфигурации для тестового набора данных GitHub можно найти здесь, при условии, что Connect запускается в режиме standalone, а Kafka размещена в Confluent Cloud.

4. Создание таблицы ClickHouse

Убедитесь, что таблица создана, удалив её, если она уже существует из предыдущих примеров. Ниже приведён пример, совместимый с уменьшенным набором данных GitHub. Обратите внимание на отсутствие каких-либо типов Array или Map, которые в настоящее время не поддерживаются:

CREATE TABLE github
(
    file_time DateTime,
    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime,
    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
    comment_id UInt64,
    path String,
    ref LowCardinality(String),
    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
    creator_user_login LowCardinality(String),
    number UInt32,
    title String,
    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
    assignee LowCardinality(String),
    closed_at DateTime,
    merged_at DateTime,
    merge_commit_sha String,
    merged_by LowCardinality(String),
    review_comments UInt32,
    member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)

5. Запустите Kafka Connect

Запустите Kafka Connect в автономном или распределённом режиме.

./bin/connect-standalone connect.properties.ini github-jdbc-sink.properties.ini

6. Добавьте данные в Kafka

Отправьте сообщения в Kafka, используя предоставленные скрипт и конфигурацию. Вам нужно будет изменить файл конфигурации github.config, чтобы указать свои учетные данные Kafka. По умолчанию скрипт настроен для работы с Confluent Cloud.

python producer.py -c github.config

Этот скрипт можно использовать для записи любого файла ndjson в топик Kafka. Он попытается автоматически вывести схему. Приведённая примерная конфигурация запишет только 10 000 сообщений — при необходимости измените здесь. Эта конфигурация также удаляет все несовместимые поля типа Array из набора данных при отправке в Kafka.

Это необходимо для того, чтобы коннектор JDBC мог преобразовывать сообщения в операторы INSERT. Если вы используете собственные данные, убедитесь, что вы либо добавляете схему к каждому сообщению (установив _value.converter.schemas.enable _в true), либо что ваш клиент публикует сообщения, ссылающиеся на схему в реестре.

Kafka Connect должен начать считывать сообщения и вставлять строки в ClickHouse. Обратите внимание, что предупреждения вида "[JDBC Compliant Mode] Transaction is not supported." являются ожидаемыми и их можно игнорировать.

Простой запрос к целевой таблице "Github" должен подтвердить, что данные были вставлены.

SELECT count() FROM default.github;
| count\(\) |
| :--- |
| 10000 |

Рекомендуемые дополнительные материалы