Continuous Data Import from Object Storage Using Pipe
Pipe is a powerful data import feature in the Singdata Lakehouse platform. It allows users to read data directly from object storage at a fixed frequency and import it into the Lakehouse. By implementing a file detection mechanism, Pipe supports micro-batch file loading, enabling quick access to the latest data. It is particularly suitable for scenarios requiring real-time or near-real-time data processing.
Think of an object storage Pipe as a program that continuously scans an inbox — you simply upload files to OSS/S3/COS, and it automatically detects and imports them without any manual trigger.
How Pipe Works
- File Detection:
  - EVENT_NOTIFICATION_MODE: Requires enabling a message service. Uses Alibaba Cloud Message Service to notify the Lakehouse of new file uploads. Currently only Alibaba Cloud OSS and AWS S3 are supported.
  - LIST_PURGE mode: Periodically scans directories, synchronizes unrecorded files, and deletes source files after synchronization.
- COPY Statement: Defines the source location of data files and the target table, supporting multiple file formats.
- Automated Loading: Automatically detects new files and executes the COPY statement.
- Duplicate Import Prevention: The load_history function records the COPY import history for the current table. When Pipe executes, it deduplicates on the table name and import file name recorded in load_history, so already-imported files are not re-imported. If you need to re-import a recorded file, manually execute a COPY command. Records in load_history are retained for 7 days.
- Pipe Import Job History: Since each execution is a Pipe-issued COPY, you can view all operations in the job history. Filter by query_tag in the job history. All COPY jobs executed by a Pipe are tagged with the format pipe.workspace_name.schema_name.pipe_name, making it easy to track and manage.
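The duplicate-prevention step can be pictured with a small simulation. This is a minimal sketch in Python, not Lakehouse code: the LoadHistory class and run_pipe_once function are hypothetical names, the history is a plain in-memory set keyed by table name and file name, and the 7-day retention window is not modeled.

```python
class LoadHistory:
    """Toy in-memory stand-in for load_history (hypothetical, not the Lakehouse API)."""

    def __init__(self):
        # Each entry is a (table_name, file_name) pair that COPY has already imported.
        self._records = set()

    def is_loaded(self, table_name, file_name):
        return (table_name, file_name) in self._records

    def record(self, table_name, file_name):
        self._records.add((table_name, file_name))


def run_pipe_once(table_name, listed_files, history):
    """Return the files one Pipe run would pass to COPY, skipping recorded ones."""
    to_load = [f for f in listed_files if not history.is_loaded(table_name, f)]
    for file_name in to_load:
        history.record(table_name, file_name)
    return to_load


history = LoadHistory()
first_run = run_pipe_once("orders", ["a.csv", "b.csv"], history)
second_run = run_pipe_once("orders", ["a.csv", "b.csv", "c.csv"], history)
print(first_run)   # ['a.csv', 'b.csv']
print(second_run)  # ['c.csv']
```

The second run sees all three files in the listing but only passes c.csv on, because the other two are already recorded for the same table. A file with the same name loaded into a different table would not be skipped, since the key includes the table name.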
Use Cases
- Real-time Data Synchronization: Suitable when data lands in object storage and must be synchronized frequently so that the Lakehouse always has the latest data.
- Cost Optimization: Importing and exporting data via object storage avoids public network traffic costs. Within the same region, you can specify intranet transfer for object storage to further reduce costs.
Notes
- When using EVENT_NOTIFICATION_MODE, you must use the role ARN authorization method to create the storage connection.
- LIST_PURGE mode supports both access key and role ARN authorization.
- Recommended File Size: gzip compressed files should be around 50 MB. Uncompressed CSV and Parquet files should be between 128 MB and 256 MB.
- Data Loading Order: Pipe does not guarantee that data is loaded in any strict order.
- Pipe Latency: Pipe loading time is affected by various factors, including file format, size, and the complexity of the COPY statement.
- Pipe and Volume Mapping: Each Pipe requires a dedicated Volume and cannot be shared.
- Modifying the COPY statement logic is not supported. If you need to change it, delete the Pipe and recreate it.
- When modifying a Pipe's COPY_JOB_HINT, the new settings overwrite existing hints. If your Pipe already has hints such as {"cz.sql.split.kafka.strategy":"size"}, you must include all required hints together when setting new ones; otherwise existing hints will be overwritten. Separate multiple parameters with commas.
- The COPY statement inside a PIPE does not support the files, regexp, or subdirectory parameters.
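Because a new COPY_JOB_HINT value replaces the old one wholesale, it can help to build the combined value before issuing the change. The following is a minimal Python sketch of that bookkeeping only; merged_copy_job_hint is a hypothetical helper, and the key example.hypothetical.hint is a made-up placeholder, not a real Lakehouse hint.

```python
import json


def merged_copy_job_hint(existing_hint, new_hints):
    """Return one JSON string holding the existing hints plus the new ones."""
    hints = json.loads(existing_hint) if existing_hint else {}
    hints.update(new_hints)  # on a key collision the new value wins
    # Compact separators produce the comma-separated form used in the note above.
    return json.dumps(hints, separators=(",", ":"))


existing = '{"cz.sql.split.kafka.strategy":"size"}'
combined = merged_copy_job_hint(existing, {"example.hypothetical.hint": "value"})
print(combined)
# {"cz.sql.split.kafka.strategy":"size","example.hypothetical.hint":"value"}
```

The resulting string carries both the original hint and the new one, so setting it as the Pipe's COPY_JOB_HINT does not silently drop the existing setting.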
Cost
Pipe is charged based on the computing resources used when loading files.
PIPE Syntax
- <pipe_name>: The name of the Pipe object to create.
- VIRTUAL_CLUSTER: Specifies the Virtual Cluster name.
- INGEST_MODE: Determines the data ingestion mode. Choose one:
  - LIST_PURGE: Periodically polls and scans the directory. Simple to configure and suitable for most scenarios; deletes source files after a successful import (irreversible).
  - EVENT_NOTIFICATION: Triggers immediately upon receiving an object storage event notification. Suitable for near-real-time ingestion where source files must be retained; requires additional MNS queue configuration.
- COPY_JOB_HINT: Optional, reserved parameter for Lakehouse.
- copy_statement: <copy_statement> supports all file parameters. When the ON_ERROR=CONTINUE|ABORT parameter is set, it controls error handling during data loading, and the list of imported files is returned:
  - CONTINUE: Skips error rows and continues loading subsequent data. Suitable for tolerating partial errors while maximizing data loading completion. Currently, ignorable errors are limited to file format mismatches, for example when the command specifies zip compression but the file uses zstd.
  - ABORT: Immediately terminates the entire COPY operation. Suitable for strict data quality requirements where any error requires manual inspection.
Supported File Formats
Refer to COPY INTO import.
PIPE Load Examples
Using Scan File Mode (LIST_PURGE)
Step-by-step instructions
Step 1: Create a connection and volume
Step 2: Run the COPY command standalone to verify it imports successfully
Step 3: Use the above statement to build the Pipe object
Step 4: View Pipe execution history and imported files
- View the execution status of Pipe COPY jobs
Filter by query_tag in the job history. All COPY jobs executed by the Pipe are tagged with the format: pipe.workspace_name.schema_name.pipe_name
- View the history of files imported by COPY jobs
Using Event Notification Mode (Alibaba Cloud OSS and AWS S3 only)
Step 1: Enable Alibaba Cloud Message Service (MNS)
- Enable Message Service MNS in the Alibaba Cloud console.
- Configure MNS to listen to the OSS folder you want to synchronize. See the documentation
Step 2: Authorize Lakehouse to read OSS
Refer to the role ARN method in Alibaba Cloud Storage Connection Creation to authorize Lakehouse to read the corresponding OSS bucket.
Step 3: Grant MNS access to Lakehouse
Step 4: Create a Storage Connection
Step 5: Create a Volume
Step 6: Create a Pipe
Status Monitoring and Management
View Pipe Status
View Pipe Execution History
Since each execution is a Pipe-issued COPY, you can view all operations in the job history. Filter by query_tag in the job history. All COPY jobs executed by a Pipe are tagged with the format pipe.workspace_name.schema_name.pipe_name for easy tracking.
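The tag format can be assembled and used as a filter as follows. This is a minimal Python sketch assuming job history entries have been exported as dictionaries with a query_tag key; that record shape, the function names, and the sample workspace, schema, and pipe names are all hypothetical.

```python
def pipe_query_tag(workspace_name, schema_name, pipe_name):
    """Build the query_tag that a Pipe attaches to its COPY jobs."""
    return f"pipe.{workspace_name}.{schema_name}.{pipe_name}"


def jobs_for_pipe(jobs, workspace_name, schema_name, pipe_name):
    """Keep only the job records whose query_tag matches the given Pipe."""
    tag = pipe_query_tag(workspace_name, schema_name, pipe_name)
    return [job for job in jobs if job.get("query_tag") == tag]


# Hypothetical exported job history records.
jobs = [
    {"job_id": "1", "query_tag": "pipe.my_ws.public.orders_pipe"},
    {"job_id": "2", "query_tag": "pipe.my_ws.public.users_pipe"},
    {"job_id": "3", "query_tag": None},
]
matching = jobs_for_pipe(jobs, "my_ws", "public", "orders_pipe")
print([job["job_id"] for job in matching])  # ['1']
```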
Stop and Start a Pipe
- Pause a Pipe:
- Resume a Pipe:
Modify Pipe Properties
You can modify Pipe properties one at a time. If multiple properties need to be changed, run the ALTER command multiple times. Below are the modifiable properties and their syntax:
Examples:
