Data Pipelines and Change Capture
This chapter covers two types of objects: Pipe (continuous data ingestion) and Table Stream (change data capture). A Pipe automatically writes data from external files or message streams into a table; a Table Stream records incremental changes on a table for downstream incremental processing.
A Pipe is a continuous data ingestion object in Lakehouse. Once created, it runs automatically, continuously reading data from object storage (OSS/COS/S3) or Kafka and writing it to a target table — no manual triggering required.
Think of a Pipe as a continuously running conveyor belt. After a file is uploaded to an OSS subdirectory, the Pipe detects it and loads it within approximately 30 seconds. Kafka messages are consumed and written in batches at a configured interval. Unlike scheduled jobs, a Pipe runs persistently and processes new data as it arrives.

Pipe Types
| Type | Data Source | Detection Latency | Typical Use Case |
|---|---|---|---|
| Object Storage Pipe | OSS / COS / S3 | ~30 seconds | Automatically ingest periodically uploaded CSV/Parquet/JSON files |
| Kafka Pipe | Kafka Topic | Per batch interval (default 60 seconds) | Real-time ingestion of logs and business events |
A Pipe serves the same purpose as a Studio sync task. The difference is in how each is managed: a Pipe is created and managed via SQL DDL, which suits code-driven pipeline management, while Studio sync tasks are configured through a visual interface and support more data sources (including relational databases).
Continuous Ingestion from Object Storage
Prerequisites: Create a Storage Connection → Create an External Volume (must point to a specific subdirectory, not the bucket root path) → Create the target table → Create the Pipe.
Once created, the Pipe starts running immediately. In polling mode (LIST_PURGE, described below), it checks the Volume for new files approximately every 30 seconds.
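The subdirectory requirement on the External Volume can be checked before any DDL is run. The following is a minimal sketch, not part of Lakehouse: the helper names `is_bucket_root` and `check_volume_location` are hypothetical, and it assumes the Volume location is written as a URL of the form `scheme://bucket/path/`.

```python
from urllib.parse import urlsplit


def is_bucket_root(location: str) -> bool:
    """Return True if the URL has no path below the bucket name."""
    parts = urlsplit(location)
    # parts.netloc is the bucket; parts.path is everything after it.
    return parts.path.strip("/") == ""


def check_volume_location(location: str) -> None:
    """Raise ValueError when the location is a bucket root (hypothetical pre-check)."""
    if is_bucket_root(location):
        raise ValueError(
            f"{location!r} is a bucket root; point the Volume at a subdirectory"
        )


check_volume_location("oss://my-bucket/landing/orders/")  # passes silently
```

A check like this only catches the root-path mistake early; Pipe creation remains the authoritative validation.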
Two Ingestion Modes
| Mode | Trigger | Source File Handling | Use Case |
|---|---|---|---|
| `LIST_PURGE` | Periodic polling scan (~30 seconds) | Source file deleted after ingestion | Simple setup, suitable for most scenarios |
| `EVENT_NOTIFICATION` | Object storage event notification (near real-time) | Source file retained | When you need to keep the original files; OSS and S3 only |
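To make the LIST_PURGE row concrete, here is a small in-memory simulation of one polling cycle: list the files, load them, then delete the source. It is an illustration only. The function `list_purge_cycle`, the dict standing in for a Volume, and the list standing in for the target table are all assumptions of this sketch, not Lakehouse APIs.

```python
def list_purge_cycle(volume: dict[str, list[str]], table: list[str]) -> list[str]:
    """Run one simulated polling cycle and return the paths that were ingested."""
    ingested = []
    # sorted() takes a snapshot of the listing, so deleting entries is safe.
    # The ordering is only for a deterministic demo; real load order is not guaranteed.
    for path in sorted(volume):
        table.extend(volume[path])  # load the file's rows into the target table
        del volume[path]            # purge the source file after ingestion
        ingested.append(path)
    return ingested


volume = {
    "landing/a.csv": ["1,alice"],
    "landing/b.csv": ["2,bob", "3,carol"],
}
table: list[str] = []
loaded = list_purge_cycle(volume, table)
```

After the cycle the simulated Volume is empty and the table holds all three rows, which is why LIST_PURGE is a poor fit when the original files must be kept.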
Deduplication
A Pipe uses load_history to record the paths of already-ingested files. Each unique file path is only ingested once — re-uploading the same file will not trigger a duplicate ingestion. Records in load_history are retained for 7 days.
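The path-based deduplication can be modeled as a lookup keyed by file path with a 7-day retention window. This is a sketch under stated assumptions: the class `LoadHistory` and its method are hypothetical, and the text above does not say how a path is treated once its record has expired. In this model an expired path simply becomes eligible again.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=7)  # retention period stated for load_history


class LoadHistory:
    """Toy model of load_history: remembers when each file path was ingested."""

    def __init__(self) -> None:
        self._loaded_at: dict[str, datetime] = {}

    def should_ingest(self, path: str, now: datetime) -> bool:
        # Drop records that have aged out of the retention window.
        self._loaded_at = {
            p: t for p, t in self._loaded_at.items() if now - t < RETENTION
        }
        if path in self._loaded_at:
            return False  # same path seen recently: skip it
        self._loaded_at[path] = now
        return True


history = LoadHistory()
t0 = datetime(2024, 1, 1)
first = history.should_ingest("landing/a.csv", t0)                       # True
again = history.should_ingest("landing/a.csv", t0 + timedelta(days=1))   # False
```

Because the key is the path rather than the file contents, corrected data is best uploaded under a new file name.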
Continuous Consumption from Kafka
A Pipe creates a persistent consumer group that pulls data from a Kafka Topic at a configured batch interval and writes it to a table.
Monitoring and Management
Key fields in DESC PIPE output:
| Field | Description |
|---|---|
| `pipe_status` | RUNNING / PAUSED |
| `pipe_kind` | VOLUME (object storage) or KAFKA |
| `properties` | Configuration such as `ingest_mode` and `vcluster` |
| `input_name` | Data source (Volume or Kafka Topic) |
| `output_name` | Full path of the target table |
| `invalid_reason` | Error reason when the Pipe is in an abnormal state |
To view Pipe execution history, filter job history by `query_tag`, which has the format `pipe.workspace_name.schema_name.pipe_name`.
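The tag format is easy to build programmatically. The sketch below assembles the tag and filters a list of job records by it. The record shape (dicts with a `query_tag` key) and both function names are assumptions made for illustration; how job history is actually exported is not covered here.

```python
def pipe_query_tag(workspace_name: str, schema_name: str, pipe_name: str) -> str:
    """Build the query_tag for a Pipe: pipe.workspace_name.schema_name.pipe_name."""
    return f"pipe.{workspace_name}.{schema_name}.{pipe_name}"


def filter_pipe_jobs(jobs: list[dict], tag: str) -> list[dict]:
    """Keep only the job records whose query_tag matches exactly."""
    return [job for job in jobs if job.get("query_tag") == tag]


# Hypothetical job-history records, for illustration only.
jobs = [
    {"job_id": "j1", "query_tag": "pipe.analytics.public.orders_pipe"},
    {"job_id": "j2", "query_tag": "adhoc"},
    {"job_id": "j3"},
]
tag = pipe_query_tag("analytics", "public", "orders_pipe")
pipe_jobs = filter_pipe_jobs(jobs, tag)
```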
Important Notes
- Volume must point to a subdirectory: `LOCATION` cannot be the bucket root path, or Pipe creation will fail.
- Each Pipe requires its own Volume: Different Pipes cannot share the same Volume.
- The COPY statement cannot be modified: To change the ingestion logic, drop the Pipe and recreate it.
- Data loading order is not strictly guaranteed.
- Recommended file sizes: gzip-compressed files should be under 50 MB; uncompressed CSV/Parquet files should be 128 MB–256 MB.
- `EVENT_NOTIFICATION` mode requires additional MNS message queue configuration, supports only Alibaba Cloud OSS and AWS S3, and requires RoleARN-based authorization.
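The file-size recommendation in the notes above can be turned into a simple pre-upload check. This is a hedged sketch: the function `file_size_advice` is hypothetical, and treating 1 MB as 1024 × 1024 bytes is an assumption, since the notes do not say which convention is meant.

```python
MB = 1024 * 1024  # assumption: binary megabytes


def file_size_advice(size_bytes: int, gzip_compressed: bool) -> str:
    """Classify a file against the recommended sizes for Pipe ingestion."""
    if gzip_compressed:
        # gzip-compressed files: recommended to stay under 50 MB.
        return "ok" if size_bytes < 50 * MB else "too large"
    # Uncompressed CSV/Parquet: recommended range is 128 MB to 256 MB.
    if size_bytes < 128 * MB:
        return "too small"
    if size_bytes > 256 * MB:
        return "too large"
    return "ok"


advice = file_size_advice(200 * MB, gzip_compressed=False)
```

The sizes are recommendations rather than hard limits, so a producer might use a result other than `"ok"` as a cue to merge or split files before upload rather than as a reason to reject them.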
Related Documentation
- Object Storage Pipe — Complete configuration for LIST_PURGE and EVENT_NOTIFICATION
- Kafka Pipe — READ_KAFKA parameter reference and consumer offset management
- Pipe Syntax Reference — Complete DDL syntax
- Table Stream — Change data capture for CDC-driven incremental processing
