Table Stream
A Table Stream is a real-time data stream that records data manipulation language (DML) changes made to a table, including insert, update, and delete operations. It also provides metadata about each change, so you can take appropriate actions based on the changed data. A Table Stream can be created on a Table, Dynamic Table, Materialized View, or External Table (Kafka). You can also create a Volume Stream on an External Volume to monitor file change events in external object storage (such as OSS).
Table Stream Syntax
Syntax for Creating Table Stream on Table, Dynamic Table, and Materialized View
- <name>: The name of the Table Stream.
- <table_name>: The base table from which to get the incremental data. Views are not supported.
- TIMESTAMP AS OF timestamp_expression: (Optional) Specifies the point in time from which the Table Stream starts receiving updates from the underlying table. If this parameter is omitted, the Table Stream starts receiving updates from the current time.
  - timestamp_expression must evaluate to a standard timestamp. The earliest timestamp that TIMESTAMP AS OF accepts depends on the TIME TRAVEL(data_retention_days) parameter. If the specified version does not exist, an error is reported. Examples of valid expressions:
    - '2023-11-07 14:49:18', a string that can be cast to a timestamp.
    - cast('2023-11-07 14:49:18 Asia/Shanghai' as timestamp).
    - current_timestamp() - interval '12' hours.
    - Any other expression that is itself a timestamp or can be cast to a timestamp.
- COMMENT: (Optional) Comment for the Table Stream.
- 'TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD': (Required) Choose one of the two values, APPEND_ONLY or STANDARD.
  - APPEND_ONLY: Records only the INSERT operations on the object. Update and delete operations are not recorded. For example, if 10 rows are inserted into the table and 5 of them are then deleted without the offset moving, the Table Stream still records 10 rows.
  - STANDARD: Tracks all DML changes to the source object, including inserts, updates, and deletes (including table truncation). Row-level changes are derived by joining and processing all delta changes, where the delta is the set of data changes that occur between two transaction points. For example, if a row is inserted and then updated after the Table Stream's offset, the delta change is a new row. If a row is inserted and then deleted after the stream's offset, the delta change is no row. In other words, the delta change reflects the latest state of the source object, not the historical changes.
- SHOW_INITIAL_ROWS: (Optional) Controls the visibility of data that already exists in the table at the time of Stream creation:
  - 'FALSE' (default): Existing data in the table at the time of Stream creation is not visible; only changes that occur after Stream creation are captured.
  - 'TRUE': Existing data in the table at the time of Stream creation is exposed as INSERT records in the first consumption. After the initial snapshot is consumed, subsequent changes produce UPDATE_BEFORE / UPDATE_AFTER / DELETE as normal.

  Note: SHOW_INITIAL_ROWS only affects whether historical data is visible at the time of Stream creation; it does not affect the values of __change_type. Regardless of the value of this parameter, in STANDARD mode, UPDATE always produces both UPDATE_BEFORE and UPDATE_AFTER rows, and DELETE always produces DELETE records.
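The difference between the two modes can be illustrated with a small Python toy model. This is only a sketch of the semantics described above, not how the Lakehouse computes deltas: the event tuples ("insert" / "update" / "delete", key, value) and the function names apply, standard_delta, and append_only_rows are assumptions made for illustration. Only the change-type labels come from the text.

```python
def apply(state, events):
    """Replay DML events on a copy of the table state (dict: key -> value)."""
    state = dict(state)
    for op, key, value in events:
        if op == "delete":
            state.pop(key, None)
        else:  # "insert" or "update"
            state[key] = value
    return state


def standard_delta(before, events):
    """STANDARD mode (toy): net row-level change between the offset and now."""
    after = apply(before, events)
    delta = []
    for key in sorted(before.keys() | after.keys()):
        if key not in before:
            delta.append(("INSERT", key, after[key]))
        elif key not in after:
            delta.append(("DELETE", key, before[key]))
        elif before[key] != after[key]:
            delta.append(("UPDATE_BEFORE", key, before[key]))
            delta.append(("UPDATE_AFTER", key, after[key]))
    return delta


def append_only_rows(events):
    """APPEND_ONLY mode (toy): keep every inserted row, ignore updates and deletes."""
    return [(key, value) for op, key, value in events if op == "insert"]


# 10 rows inserted, then 5 of them deleted, with the offset left untouched.
events = [("insert", i, "v") for i in range(10)]
events += [("delete", i, None) for i in range(5)]
print(len(append_only_rows(events)))    # 10: the deletes are not recorded
print(len(standard_delta({}, events)))  # 5: only the surviving rows appear
```

In this model, a row that is inserted and then updated after the offset yields a single INSERT carrying the updated value, and a row that is inserted and then deleted yields nothing, which matches the STANDARD mode examples above.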
Notes
- The earliest timestamp that TIMESTAMP AS OF accepts depends on the TIME TRAVEL(data_retention_days) parameter. If the specified version does not exist, an error is reported. The data_retention_days parameter defines how long deleted data is retained. By default, Lakehouse retains data for one day. Depending on your business needs, you can extend or shorten the retention period by adjusting data_retention_days. Note that extending the retention period increases storage requirements, which may increase related costs.
- Table Stream can only read committed data. Data written through real-time uploads takes 1 minute to be confirmed, so Table Stream also needs to wait 1 minute before it can see that data.
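The two timing rules in these notes can be pictured with a short Python sketch. It is a simplification under stated assumptions: it treats a version as available exactly when its timestamp falls within the last data_retention_days days, whereas the Lakehouse checks whether the version actually exists. The function names validate_as_of and visible_to_stream are hypothetical.

```python
from datetime import datetime, timedelta


def validate_as_of(ts, now, data_retention_days=1):
    """Return ts if it lies inside the retention window, else raise ValueError."""
    earliest = now - timedelta(days=data_retention_days)
    if ts < earliest:
        raise ValueError(f"{ts} is older than the retained history (earliest: {earliest})")
    return ts


def visible_to_stream(written_at, now, commit_delay=timedelta(minutes=1)):
    """Real-time writes become readable once the commit delay has elapsed."""
    return now - written_at >= commit_delay


now = datetime(2023, 11, 7, 14, 49, 18)

# Twelve hours back is accepted with the default one-day retention.
start = validate_as_of(now - timedelta(hours=12), now)
print(start)

# A row written 30 seconds ago is not yet visible to the stream.
print(visible_to_stream(now - timedelta(seconds=30), now))  # False
```

Under the same assumptions, a timestamp two days in the past is rejected with the default retention and accepted once data_retention_days is raised to 7.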
Usage Examples
Case 1: Create a Table Stream in APPEND_ONLY mode
Case 2: Create a Table Stream in STANDARD mode
Kafka Table Stream
You can create a Table Stream on a Kafka external table to consume Topic data in real time.
Syntax
Parameter Description
- TIMESTAMP AS OF: (Optional) Specifies the point in time from which consumption starts. Supported formats:
  - Explicit timestamp: '2023-01-01 12:00:00'
  - Time function: CURRENT_TIMESTAMP() - INTERVAL '1' HOUR
- show_initial_rows: Controls initial data loading behavior:
  - true: Loads historical data from the offset specified when the external table was created to the offset specified by the Table Stream.
  - false (default): Starts consuming from the latest offset (latest) and does not load historical data.
- table_stream_mode: Fixed to append_only. Only new Kafka data is processed; update and delete operations are not supported.
Examples
- Creating a Kafka External Table
First, you need to create a Kafka external table. This is the foundation for creating a Table Stream. The following is the syntax for creating a Kafka external table:
Create a Kafka Connection
Create a Kafka External Table
- external_table_kafka is the name of the external table.
- key_column and value_column represent the key and value of the Kafka message respectively, where value_column is required.
- USING kafka specifies that Kafka is used as the data source.
- The OPTIONS section contains Kafka consumer configuration, such as the consumer group ID (group_id) and the topic to subscribe to (topics).
- CONNECTION pipe_kafka specifies the connection configuration to Kafka, which typically includes the Kafka cluster address and other connection parameters.
- Creating a Table Stream
Based on the Kafka external table, you can create a Table Stream for real-time processing of the Kafka data stream. The following is the syntax for creating a Table Stream:
Volume Stream
CREATE VOLUME STREAM Syntax
CREATE VOLUME STREAM creates a Volume Stream on the Directory Table of an External Volume to monitor file creation and deletion events in external object storage.
Parameter Description
<name>: The name of the Volume Stream.
<volume_name>: The name of the External Volume to monitor. The Volume must meet the following conditions:
- The Directory feature must be enabled, i.e., specify DIRECTORY = (ENABLE = TRUE) when creating the Volume;
- Recursive monitoring must be enabled, i.e., specify RECURSIVE = TRUE when creating the Volume.
Example
Notes
- Before creating a Volume Stream, you must enable DIRECTORY = (ENABLE = TRUE) and RECURSIVE = TRUE.
- Volume Stream is essentially a Table Stream built on top of a Directory Table, and its consumption mechanism is fully consistent with Table Stream: only DML statements (such as INSERT INTO ... SELECT FROM stream) advance the consumption offset; pure SELECT queries do not advance it.
- Since object storage events are delivered through message queues with approximately 1 minute of delay, you need to wait about 1 minute after uploading a file before the corresponding change record can be queried in the Volume Stream.
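
The consumption rule in these notes (only DML advances the offset, a plain SELECT does not) can be sketched with a small Python toy model. It is an illustration of the described behavior, not the Lakehouse implementation: the class ToyStream, its methods, and the file names are all hypothetical.

```python
class ToyStream:
    """Toy stream: an ordered change log plus a consumption offset."""

    def __init__(self):
        self._log = []    # change records in arrival order
        self._offset = 0  # index of the first unconsumed record

    def record(self, change):
        """A new change event arrives (for example, a file was created)."""
        self._log.append(change)

    def select(self):
        """Plain SELECT: return pending changes and leave the offset alone."""
        return list(self._log[self._offset:])

    def insert_into(self, target):
        """INSERT INTO target SELECT ... FROM stream: consume and advance."""
        pending = self.select()
        target.extend(pending)
        self._offset = len(self._log)
        return len(pending)


stream = ToyStream()
stream.record("created: data/a.csv")

print(stream.select())  # the pending record is returned
print(stream.select())  # same result again: SELECT did not move the offset

sink = []
stream.insert_into(sink)  # DML consumption moves the offset forward
print(stream.select())    # [] until a new change arrives
```

The same pattern applies to a Table Stream on a regular table, since the text states that the two share one consumption mechanism.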
