Pipe

A Pipe is the Lakehouse object for continuous data ingestion. Once created with a SQL DDL statement, it runs automatically, continuously reading data from object storage (OSS, COS, or S3) or Kafka and writing it into a target table.

For a detailed introduction, see Pipe Object Model.


Chapter Contents

Page               Description
CREATE PIPE        Create an object storage Pipe or a Kafka Pipe
ALTER PIPE         Pause or resume a Pipe, or modify its batch interval and other properties
DROP PIPE          Drop a Pipe (target table data is not affected)
SHOW PIPES         List all Pipes in the current schema
SHOW CREATE PIPE   View the statement that created a Pipe
DESC PIPE          View Pipe details, including status, source, target, and latency

Common Operations

Create an Object Storage Pipe

-- LIST_PURGE mode: periodically polls the volume and deletes source files after import
CREATE PIPE orders_pipe
  VIRTUAL_CLUSTER = 'DEFAULT'
  INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO orders
FROM VOLUME orders_vol
USING CSV
OPTIONS ('header' = 'true');
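A minimal check that the Pipe is picking up files, using DESC PIPE from this page plus an ordinary query on the target table. Because LIST_PURGE deletes source files after import, the rows in orders (not the contents of orders_vol) are where loaded data should be confirmed. This is a sketch; the exact fields DESC PIPE returns are described on its own page.

```sql
-- Inspect status, source, target, and latency of the Pipe
DESC PIPE orders_pipe;
-- Source files are purged after import, so confirm loaded data in the target table
SELECT COUNT(*) FROM orders;
```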

Create a Kafka Pipe

CREATE PIPE kafka_orders_pipe
  VIRTUAL_CLUSTER = 'DEFAULT'
  BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO orders_raw
FROM (
  SELECT CAST(value AS STRING) AS raw_msg
  FROM TABLE(READ_KAFKA(
    'kafka-host:9092',    -- Kafka broker address
    'orders_topic',       -- topic to consume
    '',
    'pipe_orders_group',  -- consumer group
    '', '', '', '',
    'raw', 'raw',
    0,
    map()                 -- for the meaning of every argument, see the READ_KAFKA parameter reference
  ))
);
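The Pipe above stores each Kafka message value as a single string column, raw_msg. A plain query is enough to spot-check that payloads are arriving. This sketch assumes that BATCH_INTERVAL_IN_SECONDS = '60' controls how often a batch is written, so new messages may take up to roughly one interval to become visible.

```sql
-- Look at a few raw message payloads written by kafka_orders_pipe
SELECT raw_msg
FROM orders_raw
LIMIT 10;
```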

Pause and Resume

-- Pause
ALTER PIPE orders_pipe SET PIPE_EXECUTION_PAUSED = TRUE;

-- Resume
ALTER PIPE orders_pipe SET PIPE_EXECUTION_PAUSED = FALSE;

-- Trigger an immediate scan
ALTER PIPE orders_pipe REFRESH;
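ALTER PIPE can also modify the batch interval. The sequence below is a hypothetical sketch: it assumes ALTER PIPE ... SET accepts BATCH_INTERVAL_IN_SECONDS with the same value format used in CREATE PIPE; check the ALTER PIPE page for the exact property list. Pausing first is a conservative choice, not something this page states is required.

```sql
-- Assumption: SET accepts BATCH_INTERVAL_IN_SECONDS, as in CREATE PIPE
ALTER PIPE kafka_orders_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE kafka_orders_pipe SET BATCH_INTERVAL_IN_SECONDS = '120';
-- Resume ingestion with the new interval
ALTER PIPE kafka_orders_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
```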

View and Drop

-- View all Pipes
SHOW PIPES;

-- View Pipe details
DESC PIPE orders_pipe;

-- Drop a Pipe
DROP PIPE orders_pipe;
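Before dropping a Pipe, it can help to save its definition with SHOW CREATE PIPE so it can be recreated later. A minimal sketch using only statements listed in this chapter; since DROP PIPE does not affect target table data, the final query should still see previously loaded rows.

```sql
-- Save the creation statement before dropping
SHOW CREATE PIPE orders_pipe;
DROP PIPE orders_pipe;
-- Target table data is not affected by DROP PIPE
SELECT COUNT(*) FROM orders;
```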


Related Documents

Document                                      Description
Pipe Object Model                             Core concepts, comparison of the two modes, deduplication mechanism, complete parameter reference
Object Storage Pipe Detailed Configuration    Complete configuration for EVENT_NOTIFICATION mode
Kafka Pipe Detailed Configuration             READ_KAFKA parameter reference, consumer offset management
Real-time Pipeline Selection Guide            Comparison and selection guide for Pipe, Stream, and Dynamic Table