Kafka External Table
Overview
[Preview Release] This feature is currently in public preview.
This document describes how to create an external table in SQL that connects to a Kafka message queue. Once the external table is defined, you can read data streams from Kafka and query and analyze them as tables.
Create Storage Connection
First, create a storage connection to the Kafka server. Connections that require certificates are not currently supported.
Syntax
Parameter Description
- connection_name: The name of the connection, used for subsequent references.
- TYPE: The type of connection; here it is kafka.
- BOOTSTRAP_SERVERS: The address list of the Kafka cluster, formatted as ['host1:port1', 'host2:port2', ...].
- SECURITY_PROTOCOL: Security protocol, can be PLAINTEXT, etc.
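A minimal sketch of how the parameters above might fit together, assuming a CREATE STORAGE CONNECTION statement that takes them as keyword clauses. The statement shape, the name my_kafka_conn, and the broker addresses are assumptions for illustration, not confirmed syntax.

```sql
-- Hypothetical sketch: the statement shape is assumed, not confirmed syntax.
-- my_kafka_conn and the broker addresses are placeholders.
CREATE STORAGE CONNECTION my_kafka_conn
    TYPE kafka
    BOOTSTRAP_SERVERS = ['broker1.example.com:9092', 'broker2.example.com:9092']
    SECURITY_PROTOCOL = 'PLAINTEXT';
```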
Create External Table
After creating a storage connection, you can define an external table to read data from Kafka.
Syntax
Parameter Description
- external_table_name: The name of the external table.
- Field Description
| Field | Meaning | Type |
|---|---|---|
| topic | Kafka topic name | STRING |
| partition | Data partition ID | INT |
| offset | Offset in the Kafka partition | BIGINT |
| timestamp | Kafka message timestamp | TIMESTAMP_LTZ |
| timestamp_type | Kafka message timestamp type | STRING |
| headers | Kafka message headers | MAP<STRING, BINARY> |
| key | Message key, stored as binary. Convert it to a readable string with a type conversion such as cast(key_column as string) | BINARY |
| value | Message body, stored as binary. Convert it to a readable string with a type conversion such as cast(value_column as string) | BINARY |
- USING kafka: Specify using Kafka as the data source.
- OPTIONS:
  - group_id: Kafka consumer group ID.
  - topics: Kafka topic name.
  - starting_offset: Starting offset, default is earliest, can be earliest or latest.
  - ending_offset: Ending offset, default is latest, can be earliest or latest.
  - cz.kafka.seek.timeout.ms: Kafka seek timeout (milliseconds).
  - cz.kafka.request.retry.times: Kafka request retry count.
  - cz.kafka.request.retry.intervalMs: Kafka request retry interval (milliseconds).
- CONNECTION: Specify the previously created storage connection name.
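A sketch of how the pieces above might be assembled, followed by a query that decodes the binary columns. It assumes that the column list mirrors the field table, that options are passed as quoted key and value pairs, and that reserved words such as partition, offset, and timestamp need backtick quoting. The names kafka_events, my_group, my_topic, and my_kafka_conn are hypothetical. Treat the statement shape as an assumption rather than confirmed syntax.

```sql
-- Hypothetical sketch: clause order and option quoting are assumptions.
CREATE EXTERNAL TABLE kafka_events (
    `topic`          STRING,
    `partition`      INT,
    `offset`         BIGINT,
    `timestamp`      TIMESTAMP_LTZ,
    `timestamp_type` STRING,
    `headers`        MAP<STRING, BINARY>,
    `key`            BINARY,
    `value`          BINARY
)
USING kafka
OPTIONS (
    'group_id'        = 'my_group',   -- placeholder consumer group
    'topics'          = 'my_topic',   -- placeholder topic
    'starting_offset' = 'earliest',
    'ending_offset'   = 'latest'
)
CONNECTION my_kafka_conn;             -- placeholder connection name

-- Read a few messages, casting the binary key and value to strings.
SELECT `topic`, `partition`, `offset`, `timestamp`,
       CAST(`key` AS STRING)   AS key_text,
       CAST(`value` AS STRING) AS value_text
FROM kafka_events
LIMIT 10;
```

The casts follow the conversion described in the field table. If the message body is JSON or another structured encoding, further parsing would be needed after the cast.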
