Using Table Stream and Pipe to Import Kafka Data into Lakehouse
1. Background
In big data processing, efficiently ingesting streaming data from Kafka into a Lakehouse is a common requirement. Singdata Lakehouse provides Table Stream and Pipe functionality to simplify this process. This article describes how to use Table Stream and Pipe to import Kafka data into the Lakehouse: creating a Kafka external table, a Table Stream on that table, a target table, and a Pipe, and then verifying, monitoring, and managing the Pipe.
2. Steps
Create a Kafka External Table
Before using Table Stream and Pipe, create an external table integrated with Kafka to access data in Kafka.
Create a Table Stream
Create a Table Stream on the Kafka external table to capture real-time data changes from Kafka.
- kafka_table_stream_pipe1: Name of the Table Stream.
- ON TABLE external_table_kafka: Specifies that the Table Stream is created based on the previously created Kafka external table.
- table_stream_mode='append_only': Sets the mode to append-only, meaning only newly added data rows are captured.
After creation, verify the data in the Table Stream by querying it, for example with SELECT CAST(value AS STRING) FROM kafka_table_stream_pipe1.
This query converts the value field in the Table Stream to a string type and returns it for subsequent processing.
Create a Target Table
Create a target table to store data imported from Kafka.
- kafka_sink_table_1: Name of the target table.
- a TIMESTAMP: First field, for storing timestamp data.
- b STRING: Second field, for storing string data.
Create a Pipe
Use a Pipe to continuously import data from the Table Stream into the target table.
- kafka_pipe_stream: Name of the Pipe.
- VIRTUAL_CLUSTER = 'test_alter': Specifies the Virtual Cluster to use.
- COPY INTO kafka_sink_table_1: Copies data into the target table kafka_sink_table_1.
- SELECT CURRENT_TIMESTAMP(), CAST(value AS STRING) FROM kafka_table_stream_pipe1: Selects data from the Table Stream, using the current timestamp and the converted value field as the two columns for the target table.
Other configurable properties:
- INITIAL_DELAY_IN_SECONDS: (Optional) Initial job scheduling delay, default 0 seconds.
- BATCH_INTERVAL_IN_SECONDS: (Optional) Batch processing interval, default 60 seconds.
- BATCH_SIZE_PER_KAFKA_PARTITION: (Optional) Batch size per Kafka partition, default 500,000 records.
- MAX_SKIP_BATCH_COUNT_ON_ERROR: (Optional) Maximum number of batches to skip on error, default 30.
- RESET_KAFKA_GROUP_OFFSETS: (Optional) Initial Kafka offset when starting the Pipe. Cannot be modified after creation. Possible values: latest, earliest, none, valid, ${TIMESTAMP_MILLISECONDS}
  - none: No action (default).
  - valid: Checks whether the current group offset has expired and resets expired partition offsets to the current earliest.
  - earliest: Resets to the current earliest.
  - latest: Resets to the current latest.
  - ${TIMESTAMP_MILLISECONDS}: Resets to the offset corresponding to the millisecond timestamp, e.g., 1737789688000 (2025-01-25 15:21:28 UTC+8).
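The ${TIMESTAMP_MILLISECONDS} form of RESET_KAFKA_GROUP_OFFSETS takes a Unix epoch value in milliseconds. The following Python sketch, which is not part of the Lakehouse tooling, converts a timezone-aware datetime to such a value and checks a candidate setting against the values listed above. The helper names are hypothetical, and treating any all-digit string as a valid timestamp is an assumption.

```python
from datetime import datetime, timedelta, timezone

# Keyword values listed for RESET_KAFKA_GROUP_OFFSETS in this article.
RESET_KEYWORDS = {"latest", "earliest", "none", "valid"}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to Unix epoch milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer division by a 1 ms timedelta avoids float rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def is_reset_value(value: str) -> bool:
    """True for a listed keyword or an all-digit timestamp (assumed format)."""
    return value in RESET_KEYWORDS or (value.isascii() and value.isdigit())


# The article's example time, read as UTC+8 wall-clock time.
utc_plus_8 = timezone(timedelta(hours=8))
example_millis = to_epoch_millis(
    datetime(2025, 1, 25, 15, 21, 28, tzinfo=utc_plus_8)
)
print(example_millis)  # 1737789688000
```

Requiring a timezone-aware datetime avoids silently using the local timezone of whichever machine computes the value.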
3. Verify Results
Verify whether data has been successfully imported by querying the target table:
Check the running status of the Pipe to ensure it is working properly:
This command lists all created Pipes and their status information, including whether they are running and the last run time.
4. Status Monitoring and Management
Check Kafka Consumption Latency
Use the DESC PIPE command. The JSON string in pipe_latency contains the following fields:
- lastConsumeTimestamp: Timestamp of the last consumed offset.
- offsetLag: The backlog of Kafka data.
- timeLag: Consumption latency, calculated as the current time minus the timestamp of the last consumed offset. When Kafka consumption is abnormal, the value is -1.
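As one way to act on these fields, the sketch below parses a pipe_latency JSON string in Python and classifies it. The sample payload, the function name, and the threshold are hypothetical; only the three field names and the -1 convention come from the description above. The unit of timeLag is not stated here, so the threshold is assumed to use the same unit as the reported value.

```python
import json


def classify_pipe_latency(pipe_latency: str, max_time_lag: int) -> str:
    """Classify a pipe_latency JSON string as 'abnormal', 'lagging', or 'ok'."""
    latency = json.loads(pipe_latency)
    time_lag = latency["timeLag"]
    if time_lag == -1:
        # -1 signals that Kafka consumption is abnormal.
        return "abnormal"
    if time_lag > max_time_lag:
        return "lagging"
    return "ok"


# Hypothetical payload using the documented field names.
sample = '{"lastConsumeTimestamp": 1737789688000, "offsetLag": 1200, "timeLag": 4500}'
print(classify_pipe_latency(sample, max_time_lag=60000))  # ok
```

Checking for -1 before comparing against the threshold matters, because -1 would otherwise look like a very low latency.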
View Pipe Execution History
Since each Pipe execution is a COPY operation, you can view all operations in the job history. Filter by query_tag in Job History. All Pipe COPY jobs are tagged in the format pipe.workspace_name.schema_name.pipe_name for easy tracking.
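When filtering exported job records outside the Job History page, the tag can be assembled from its three parts. This is a minimal Python sketch; the function names, the sample workspace and schema, and the job records are hypothetical, and it assumes the tag is the literal prefix pipe followed by the three names joined with dots.

```python
def pipe_query_tag(workspace_name: str, schema_name: str, pipe_name: str) -> str:
    """Build the query_tag that identifies a Pipe's COPY jobs."""
    return ".".join(["pipe", workspace_name, schema_name, pipe_name])


def jobs_for_pipe(jobs: list, tag: str) -> list:
    """Keep only the job records whose query_tag matches the given tag."""
    return [job for job in jobs if job.get("query_tag") == tag]


# Hypothetical workspace and schema; kafka_pipe_stream is the Pipe from this article.
tag = pipe_query_tag("my_workspace", "public", "kafka_pipe_stream")
jobs = [
    {"job_id": "1", "query_tag": "pipe.my_workspace.public.kafka_pipe_stream"},
    {"job_id": "2", "query_tag": "adhoc"},
    {"job_id": "3"},
]
print(tag)
print([job["job_id"] for job in jobs_for_pipe(jobs, tag)])
```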
Stop and Start a Pipe
- Pause a Pipe:
- Resume a Pipe:
Modify Pipe Properties
You can modify Pipe properties one at a time. If multiple properties need to be changed, run the ALTER command multiple times. Below are the modifiable properties and their syntax:
Examples:
Notes
- Modifying the COPY statement logic is not supported. If you need to modify it, delete the Pipe and recreate it.
- When modifying the COPY_JOB_HINT of a Pipe, the new settings overwrite all existing hints. If your Pipe already has hints such as {"cz.sql.split.kafka.strategy":"size"}, include every hint you still need together with the new ones; any hint left out is lost. Separate multiple parameters with commas.
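Because a new COPY_JOB_HINT value replaces the old one entirely, it helps to merge the existing hints with the new ones before issuing the change. The Python sketch below performs that merge and renders a single comma-separated JSON object. The function name and the second hint key are hypothetical placeholders; only cz.sql.split.kafka.strategy appears in this article.

```python
import json


def merge_copy_job_hints(existing: dict, updates: dict) -> str:
    """Merge existing and new hints into one JSON string; updates win on conflicts."""
    merged = {**existing, **updates}
    # Compact separators give the comma-separated form shown in the article.
    return json.dumps(merged, separators=(",", ":"))


existing_hints = {"cz.sql.split.kafka.strategy": "size"}
new_hints = {"example.hypothetical.hint": "value"}  # placeholder key, not a real hint
combined = merge_copy_job_hints(existing_hints, new_hints)
print(combined)
# {"cz.sql.split.kafka.strategy":"size","example.hypothetical.hint":"value"}
```

The resulting string is what would be supplied as the complete COPY_JOB_HINT value, so no previously configured hint is dropped.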
