Using Virtual Clusters for Data Processing and Analysis

Tutorial Overview

Through this tutorial, you will learn how to use a Virtual Cluster to clean, transform, and aggregate the raw data of the New York Taxi public dataset Fhvhv trips, and perform multi-concurrent queries on the result data.

The tutorial content will be completed through the following steps:

  • Environment Preparation: Check the raw data through the sample dataset, create a computing cluster and target Schema
  • Data Transformation: Clean and aggregate data using a general-purpose cluster for subsequent analysis
  • Ad-hoc Analysis: Perform single concurrent SQL query analysis using the Studio Web environment
  • Concurrent Analysis: Simulate continuous multi-concurrency from the Dashboard through Python tasks

Getting Started

What is a Virtual Cluster?

A Virtual Cluster (VC or Cluster) is a computing resource object provided by Singdata Lakehouse for data processing and analysis. Virtual Clusters provide the necessary resources such as CPU, memory, and local temporary storage (SSD medium) to execute SQL jobs in the Lakehouse. Clusters feature quick creation/destruction, scaling up/down, pausing/resuming, and are charged based on resource specifications and usage duration, with no costs incurred when paused or deleted.

Virtual Clusters offer two types of clusters, general-purpose and analytical, to meet the isolation and optimization needs of different workloads for ETL and analysis scenarios. It is recommended to use general-purpose clusters for ETL data processing and analytical clusters for query analysis or supporting data product applications. General-purpose clusters support vertical scaling to meet the needs of different scales of ETL Pipeline tasks. Analytical clusters support horizontal scaling with multiple replicas within the cluster to meet the elastic capabilities for concurrent queries.

This tutorial will use general-purpose clusters for data cleaning and transformation, and analytical clusters for low-latency concurrent analysis.

Tutorial Goals

  • Create and use Virtual Clusters for different business workloads
  • Understand and use the elastic concurrency features of analytical clusters

Step01 Check Raw Data

  1. Create Virtual Clusters

Create both general-purpose (GENERAL PURPOSE) and analytical (ANALYTICS) clusters to achieve load isolation for ETL processing and query analysis.

-- 1.1 Create a virtual cluster for ETL processing CREATE VCLUSTER ETL_VC VCLUSTER_SIZE = MEDIUM VCLUSTER_TYPE = GENERAL AUTO_SUSPEND_IN_SECOND = 60 AUTO_RESUME = TRUE; -- 1.2 Create a virtual cluster for BI analysis, set elastic concurrency to support multiple concurrent queries CREATE VCLUSTER REPORTING_VC VCLUSTER_SIZE = XSMALL VCLUSTER_TYPE = ANALYTICS MIN_REPLICAS = 1 MAX_REPLICAS = 4 MAX_CONCURRENCY = 8 AUTO_SUSPEND_IN_SECOND = 300 AUTO_RESUME = TRUE ; -- 1.3 View cluster resources show vclusters ; -- 1.4 Switch the virtual cluster used in the current session,It will only take effect if selected and executed together with the SQL to be executed.It will only take effect if selected and executed together with the SQL to be executed. USE VCLUSTER REPORTING_VC;

  1. View the original dataset in the public dataset

2.1 View the field information of the original dataset.

--Describes New York City For-Hire-Vehicle trips. desc clickzetta_sample_data.nyc_taxi_tripdata.fhvhv_tripdata;

2.2 Preview Data Details

--Sample Of Trip Record Data select * from clickzetta_sample_data.nyc_taxi_tripdata.fhvhv_tripdata limit 10;

2.3 View the Number of Records in the Dataset

--1.49 billion rows select count(*) from clickzetta_sample_data.nyc_taxi_tripdata.fhvhv_tripdata;

Step02 Use General Cluster to Clean and Transform Data

  1. Specify the use of ETL_VC for data processing, and create the schema where the target table is located,It will only take effect if selected and executed together with the SQL to be executed.

use vcluster ETL_VC; create schema tutorial; use tutorial;

  1. Clean and transform the original dataset using CTAS and write to a new table

--2. Clean and transform the original dataset CREATE table tutorial.int_fhvhv_tripdata as SELECT hvfhs_license_num, CASE WHEN hvfhs_license_num = 'HV0002' THEN 'juno' WHEN hvfhs_license_num = 'HV0003' THEN 'uber' WHEN hvfhs_license_num = 'HV0004' THEN 'via' WHEN hvfhs_license_num = 'HV0005' THEN 'lyft' ELSE null END AS company, ltrim(rtrim(upper(dispatching_base_num))) dispatching_base_num, ltrim(rtrim(upper(originating_base_num))) originating_base_num, request_datetime, on_scene_datetime, pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, trip_miles, trip_time, base_passenger_fare, tolls, bcf, sales_tax, congestion_surcharge, airport_fee, tips, driver_pay, CASE WHEN shared_request_flag = 'Y' THEN true WHEN shared_request_flag IN ('N', ' ') THEN false ELSE null END AS shared_request_flag, CASE WHEN shared_match_flag = 'Y' THEN true WHEN shared_match_flag IN ('N', ' ') THEN false ELSE null END AS shared_match_flag, CASE WHEN access_a_ride_flag = 'Y' THEN true WHEN access_a_ride_flag IN ('N', ' ') THEN false ELSE null END AS access_a_ride_flag, CASE WHEN wav_request_flag = 'Y' THEN true WHEN wav_request_flag IN ('N', ' ') THEN false ELSE null END AS wav_request_flag, CASE WHEN wav_match_flag = 'Y' THEN true WHEN wav_match_flag IN ('N', ' ') THEN false ELSE null END AS wav_match_flag FROM clickzetta_sample_data.nyc_taxi_tripdata.fhvhv_tripdata;

Validate the processed data

SELECT * FROM tutorial.int_fhvhv_tripdata LIMIT 10;

  1. Aggregate the cleaned data according to the analysis topics to generate data tables for analysis

--Scenario 1: Analyze taxi trip patterns by time of day CREATE table tutorial.mart_trips_pattern_by_time AS SELECT EXTRACT(HOUR FROM pickup_datetime) AS hour, COUNT(*) AS trip_count FROM tutorial.int_fhvhv_tripdata GROUP BY hour; --Scenario 2: Analyze taxi trip patterns by day of the week CREATE table tutorial.mart_trips_pattern_by_dayofweek AS SELECT EXTRACT(DAY FROM pickup_datetime) AS day_of_week, COUNT(*) AS trip_count FROM tutorial.int_fhvhv_tripdata GROUP BY day_of_week; --Scenario 3: Analyze taxi trip patterns by pickup location CREATE table tutorial.mart_trips_pattern_by_pickup_location AS SELECT PULocationID, COUNT(*) AS trip_count FROM tutorial.int_fhvhv_tripdata GROUP BY PULocationID; --Scenario 4: Analyze taxi trip patterns by dropoff location CREATE table tutorial.mart_trips_pattern_by_dropoff_location AS SELECT DOLocationID, COUNT(*) AS trip_count FROM tutorial.int_fhvhv_tripdata GROUP BY DOLocationID; --Scenario 5:Trips per day CREATE table tutorial.mart_trips_per_day AS SELECT pickup_datetime::date AS date, sum(trip_miles) AS trip_miles FROM tutorial.int_fhvhv_tripdata GROUP BY date; --Scenario 6:Total driver pay per company CREATE table tutorial.mart_trips_driver_pay_per_company AS SELECT CONCAT(YEAR(pickup_datetime), '-', MONTH(pickup_datetime)) AS year_month, company, sum(driver_pay) AS driver_pay FROM tutorial.int_fhvhv_tripdata GROUP BY year_month,company;

Check if the data object was created successfully.

-- Check the status of the newly created data model show tables in tutorial;

--Check the data of the newly created data model SELECT * FROM tutorial.mart_trips_driver_pay_per_company WHERE substr(year_month,0,4)='2021' ORDER BY year_month ASC;

Step03 Use Analytical Cluster for Single Concurrency Query

  1. Switch the current Session's virtual cluster to REPORTING_VC

-- 1. Use analytical VC for accelerated query analysis,It will only take effect if selected and executed together with the SQL to be executed. USE VCLUSTER REPORTING_VC; -- Set query job tag for retrieval and filtering,It will only take effect if selected and executed together with the SQL to be executed. SET QUERY_TAG = 'Tutorial02';

  1. Execute 6 Business Analysis Queries Sequentially

--Scenario 1: Analyze taxi trip patterns by time of day SELECT * FROM tutorial.mart_trips_pattern_by_time ORDER BY HOUR ASC; --Scenario 2: Analyze taxi trip patterns by day of the week SELECT * FROM tutorial.mart_trips_pattern_by_dayofweek ORDER BY day_of_week ASC; --Scenario 3: Analyze taxi trip patterns by pickup location SELECT * FROM tutorial.mart_trips_pattern_by_pickup_location ORDER BY trip_count DESC LIMIT 10; --Scenario 4: Analyze taxi trip patterns by dropoff location SELECT * FROM tutorial.mart_trips_pattern_by_dropoff_location ORDER BY trip_count DESC LIMIT 10; --Scenario 5:Trips per day SELECT * FROM tutorial.mart_trips_per_day WHERE CONCAT(YEAR(date) , MONTH(date)) = '202110' ORDER BY date; --Scenario 6:Total driver pay per company SELECT * FROM tutorial.mart_trips_driver_pay_per_company WHERE substr(year_month,0,4)='2021' ORDER BY year_month ASC;

  1. Observe the Latency Results of the Query

-- Clear QUERY_TAG SET QUERY_TAG = ''; -- View the execution results of running jobs SHOW JOBS WHERE QUERY_TAG='Tutorial02' LIMIT 10;

You can also use the job history on the Studio page to view the execution status of query jobs, where each job entry shows its runtime, status, and scan metrics.

Step04 Use Python Tasks for Concurrent Queries

Test concurrent queries using Python tasks to observe the query performance and elastic concurrency expansion capabilities of the analytical cluster under continuous dynamic concurrency.

  1. In the Studio development module, create a Python task by clicking the "+" button in the task list and selecting "Python Task".

  2. Write a concurrent test script using the Lakehouse Python SDK

The script in this tutorial implements the following processing logic

  • Create a Lakehouse service instance connection, specifying the workspace and compute cluster name
  • Submit continuous, incrementally increasing concurrent queries initiated by multiple users to the Dashboard
  • Collect and print the latency under concurrent queries and the changes in the elastic expansion state of computing resources

# Step 04: Use Studio Python task for concurrent queries # Steps: # 1. Create a Lakehouse service instance connection, specifying the workspace and compute cluster name # 2. Submit continuous, gradient-increasing concurrent queries initiated by multiple users to the Dashboard # 3. Observe the dynamic elastic concurrency capability of the analytical compute cluster under continuous concurrent queries # from clickzetta import connect import random import time import concurrent.futures import threading from queue import Queue from datetime import datetime # Establish connection conn = connect( username='xxx', # Replace with the current login username password='xxx', # Replace with the login password service='api.singdata.com', instance='xxx', # Replace with the current service instance name. You can check the browser domain address, the format is: <instance-name>.<Region_ID>.app.singdata.com. For example: in 19d58db8.cn-shanghai-alicloud.app.singdata.com, 19d58db8 represents the service instance name. workspace='xxx', # Replace with the workspace name schema='tutorial', vcluster='reporting_vc' ) queries = [ """ SELECT * FROM tutorial.mart_trips_pattern_by_time ORDER BY HOUR ASC; """, """ SELECT * FROM tutorial.mart_trips_pattern_by_dayofweek ORDER BY day_of_week ASC; """, """ SELECT * FROM tutorial.mart_trips_pattern_by_pickup_location ORDER BY trip_count DESC LIMIT 10; """, """ SELECT * FROM tutorial.mart_trips_pattern_by_dropoff_location ORDER BY trip_count DESC LIMIT 10; """, """ SELECT * FROM tutorial.mart_trips_per_day WHERE CONCAT(YEAR(date) , MONTH(date)) = '202110' ORDER BY date; """, """ SELECT * FROM tutorial.mart_trips_driver_pay_per_company WHERE substr(year_month,0,4)='2021' ORDER BY year_month ASC; """ ] # Submit query and measure latency def submit_query_and_measure_latency(query): # Create cursor object cursor = conn.cursor() start_time = time.time() # Execute SQL query cursor.execute(query) # Fetch query results results = cursor.fetchall() latency = time.time() - start_time return latency # Query task def query_task(barrier, query_queue, all_latencies): while True: # Wait for all threads to be ready barrier.wait() # Submit query task query = query_queue.get() if query is None: break latency = submit_query_and_measure_latency(query) all_latencies.append(latency) query_queue.task_done() # Check the dynamic changes in the elastic concurrency configuration of the compute cluster def check_cluster_concurrency_scaling(): cursor = conn.cursor() # Execute SQL query cursor.execute('desc vcluster reporting_vc;') # Fetch query results results = cursor.fetchall() for row in results: if row[0] == 'current_replicas': print(row) # Main function if __name__ == "__main__": num_concurrent_list = [4, 8, 12, 16] # Different concurrency levels rounds = 30 for num_threads in num_concurrent_list: print(f"---Running with {num_threads} concurrent queries:---") # Shared list to store results from all threads all_latencies = [] # Create query queue query_queue = Queue() # Put query tasks into the queue for _ in range(num_threads): for _ in range(rounds): query = random.choice(queries) query_queue.put(query) # Create a Barrier to wait for all threads to be ready simultaneously barrier = threading.Barrier(num_threads) # Create and start threads threads = [] results = [] start_times = [] for _ in range(num_threads): thread = threading.Thread(target=query_task, args=(barrier, query_queue, all_latencies)) thread.start() threads.append(thread) # Wait for all query tasks to complete query_queue.join() # Stop threads for _ in range(num_threads): query_queue.put(None) for thread in threads: thread.join() # Calculate metrics all_latencies.sort() avg_latency = sum(all_latencies) / len(all_latencies) p95_index = int(len(all_latencies) * 0.95) p95_latency = all_latencies[p95_index] p99_index = int(len(all_latencies) * 0.99) p99_latency = all_latencies[p99_index] qps = len(all_latencies) / sum(all_latencies) # Print results print("Total Queries:", len(all_latencies)) print("Average Latency:", avg_latency) print("P95 Latency:", p95_latency) print("P99 Latency:", p99_latency) print("Queries per Second (QPS):", qps) check_cluster_concurrency_scaling()

When setting the maximum concurrency of a single Replica for reporting_vc to 4, the printed results are as follows:

  • --Running with 4 concurrent queries:--- Total Queries: 120 Average Latency: 0.2201933761437734 P95 Latency: 0.43064022064208984 P99 Latency: 0.683488130569458 Queries per Second (QPS): 4.5414626793635176 ('current_replicas', '1') ---Running with 8 concurrent queries:--- Total Queries: 240 Average Latency: 0.20615292688210804 P95 Latency: 0.2397170066833496 P99 Latency: 0.4295358657836914 Queries per Second (QPS): 4.850767898977571 ('current_replicas', '2') ---Running with 12 concurrent queries:--- Total Queries: 360 Average Latency: 0.2232776681582133 P95 Latency: 0.27333879470825195 P99 Latency: 0.46774768829345703 Queries per Second (QPS): 4.478728250115035 ('current_replicas', '3') ---Running with 16 concurrent queries:--- Total Queries: 480 Average Latency: 0.23430742422739664 P95 Latency: 0.25676393508911133 P99 Latency: 0.4392051696777344 Queries per Second (QPS): 4.267897200856488 ('current_replicas', '4')

The client simulates four rounds of concurrent queries with 4, 8, 12, and 16 concurrent queries respectively, with each concurrency level submitting 30 queries consecutively per round. It can be observed that reporting_vc dynamically increases the number of Replicas based on the client's concurrency level, ensuring that the average latency, P95, P99, and QPS metrics remain stable under different concurrency levels without the user being aware of the cluster's dynamic scaling.