Pipe Syntax
This document describes the syntax for creating a Pipe object to automate the import of data from object storage or Kafka into the Lakehouse. A Pipe simplifies the import process and keeps data flowing in continuously.
CREATE PIPE Syntax
Use the following syntax to create a Pipe object that automates the import of data into the Lakehouse.
Importing Data from Object Storage
To create a Pipe object to import data from object storage, you can use the following syntax:
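Below is a minimal sketch of what such a statement can look like; the pipe name `my_pipe`, virtual cluster `my_vc`, target table `my_schema.my_table`, and volume `my_volume` are all hypothetical placeholders, the `COPY INTO` body is abbreviated, and exact clause spellings may vary. The properties it sets are described in the list that follows.

```sql
-- Minimal sketch; identifiers are placeholders and clause spellings may differ.
CREATE PIPE IF NOT EXISTS my_pipe
  VIRTUAL_CLUSTER = 'my_vc'        -- virtual cluster that runs the import jobs
  INGEST_MODE = 'LIST_PURGE'       -- or 'EVENT_NOTIFICATION'
  IGNORE_TMP_FILE = true           -- skip files/dirs starting with '.' or '_temporary'
AS
COPY INTO my_schema.my_table
FROM VOLUME my_volume;             -- file format and copy options omitted here
```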
- `<pipe_name>`: The name of the Pipe object to create.
- `VIRTUAL_CLUSTER`: Specify the name of the virtual cluster.
- `INGEST_MODE`: Set to `LIST_PURGE` or `EVENT_NOTIFICATION` to determine the data ingestion mode.
- `COPY_JOB_HINT`: Optional; a Lakehouse-reserved parameter.
- `IGNORE_TMP_FILE`: Can be `true` or `false`; the default is `true`. When enabled, files or directories whose names start with a dot (`.`) or with `_temporary` are filtered out, for example `s3://my_bucket/a/b/.SUCCESS`, `oss://my_bucket/a/b/_temporary/`, or `oss://my_bucket/a/b/_temporary_123/`.
Instructions
Importing Data from Kafka
To create a Pipe object to import data from Kafka, you can use the following syntax:
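A minimal sketch of such a statement follows; the identifiers are placeholders, and the `read_kafka` argument list is omitted here (see the linked `read_kafka` documentation under Instructions for its actual parameters). The properties it sets are described in the list below.

```sql
-- Minimal sketch; identifiers are placeholders and option spellings may differ.
CREATE PIPE IF NOT EXISTS my_kafka_pipe
  VIRTUAL_CLUSTER = 'my_vc'
  BATCH_INTERVAL_IN_SECONDS = 60              -- default
  BATCH_SIZE_PER_KAFKA_PARTITION = 500000     -- default
  MAX_SKIP_BATCH_COUNT_ON_ERROR = 30          -- default
  RESET_KAFKA_GROUP_OFFSETS = 'earliest'      -- cannot be changed after creation
AS
COPY INTO my_schema.my_table
FROM (
  SELECT *                 -- projection over the Kafka records
  FROM read_kafka(...)     -- connection, topic, and group arguments omitted
);
```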
- `<pipe_name>`: The name of the Pipe object to create.
- `VIRTUAL_CLUSTER`: Specify the name of the virtual cluster.
- `BATCH_INTERVAL_IN_SECONDS`: Optional. Set the batch interval; the default is 60 seconds.
- `BATCH_SIZE_PER_KAFKA_PARTITION`: Optional. Set the batch size per Kafka partition; the default is 500,000 records.
- `MAX_SKIP_BATCH_COUNT_ON_ERROR`: Optional. Set the maximum retry count for skipped batches on error; the default is 30.
- `RESET_KAFKA_GROUP_OFFSETS`: Optional. Sets the initial Kafka offset used when the pipe starts. This property cannot be modified after the pipe is created. Possible values are `latest`, `earliest`, `none`, `valid`, and `${TIMESTAMP_MILLISECONDS}`:
  - `none`: No action; this is the default behavior.
  - `valid`: Checks whether the group's current offsets have expired and resets expired partitions to the current earliest offset.
  - `earliest`: Resets to the current earliest offset.
  - `latest`: Resets to the current latest offset.
  - `${TIMESTAMP_MILLISECONDS}`: Resets to the offset corresponding to the given millisecond timestamp, for example `'1737789688000'` (which corresponds to January 25, 2025, 15:21:28 UTC+8).
Instructions
- Using read_kafka to Continuously Import Kafka Data
- Using Kafka Table Stream to Continuously Import Kafka Data
Pause and Start PIPE
You can control the execution state of the PIPE to manage the data synchronization process; a sketch of both commands follows the list.
- Pause PIPE: stop the pipe from scheduling new import jobs.
- Start PIPE: resume a paused pipe so ingestion continues.
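The sketch below assumes the pipe's run state is toggled through an `ALTER PIPE` property; the property name `PIPE_EXECUTION_PAUSED` and the pipe name `my_pipe` are assumptions.

```sql
-- Pause: stop the pipe from scheduling new import jobs (property name assumed)
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = true;

-- Start: resume the paused pipe
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = false;
```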
View Pipe Details
View detailed information of a specific Pipe object.
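For example, assuming the usual `DESC` form and a hypothetical pipe named `my_pipe`:

```sql
-- Show the definition and properties of a single pipe
DESC PIPE my_pipe;
```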
View Pipe List and Object Details
List all Pipe objects and their detailed information.
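Assuming the usual `SHOW` form:

```sql
-- List all pipes visible in the current scope
SHOW PIPES;
```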
Delete Pipe Object
When a Pipe object is no longer needed, you can use the following command to delete it.
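Assuming the usual `DROP` form (the `IF EXISTS` clause is an assumption):

```sql
-- Remove the pipe; IF EXISTS avoids an error if it has already been dropped
DROP PIPE IF EXISTS my_pipe;
```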
load_history Function
Function Description: The load_history function returns the COPY job import file history of a table, with a retention period of 7 days. Pipe also consults load_history while it runs, so files already recorded there are not imported again, which guarantees the uniqueness of the data.
Function Syntax:
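Assuming load_history is invoked as a table function, the call looks roughly like:

```sql
SELECT * FROM load_history('schema_name.table_name');
```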
- `schema_name.table_name`: The table whose import history you want to view.
Use Case:
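A sketch of a typical call, checking which files were imported into a hypothetical table over the 7-day retention window:

```sql
-- Inspect the recent COPY import history for my_schema.my_table
SELECT * FROM load_history('my_schema.my_table');
```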
Constraints and Limitations
- When the data source is Kafka, only one `read_kafka` function is allowed in a pipe.
- When the data source is object storage, only one volume object is allowed in a pipe.
