Basic Examples
This document provides basic usage examples for LangChain Singdata integration, suitable for beginners to get started quickly.
Database Connection
Basic Connection
from langchain_clickzetta import ClickZettaEngine
Create a database engine:
engine = ClickZettaEngine(
service="your-service",
instance="your-instance",
workspace="your-workspace",
schema="your-schema",
username="your-username",
password="your-password",
vcluster="your-vcluster"
)
Test the connection:
try:
results, columns = engine.execute_query("SELECT 1 as test")
print("✅ Connection successful")
except Exception as e:
print(f"❌ Connection failed: {e}")
Using Environment Variables
import os
from langchain_clickzetta import ClickZettaEngine
Read configuration from environment variables:
engine = ClickZettaEngine(
service=os.getenv("CLICKZETTA_SERVICE"),
instance=os.getenv("CLICKZETTA_INSTANCE"),
workspace=os.getenv("CLICKZETTA_WORKSPACE"),
schema=os.getenv("CLICKZETTA_SCHEMA"),
username=os.getenv("CLICKZETTA_USERNAME"),
password=os.getenv("CLICKZETTA_PASSWORD"),
vcluster=os.getenv("CLICKZETTA_VCLUSTER")
)
Connection with Timeout Configuration
engine = ClickZettaEngine(
service="your-service",
instance="your-instance",
workspace="your-workspace",
schema="your-schema",
username="your-username",
password="your-password",
vcluster="your-vcluster",
connection_timeout=60, # Connection timeout 60 seconds
query_timeout=1800, # Query timeout 30 minutes
hints={
"sdk.job.timeout": 3600, # Job timeout 1 hour
"query_tag": "langchain_demo"
}
)
SQL Queries
Basic Queries
Simple query:
results, columns = engine.execute_query("SELECT COUNT(*) as total FROM users")
print(f"Total users: {results[0]['total']}")
View all tables:
tables = engine.get_table_names()
print(f"Tables in database: {tables}")
Get table structure info:
table_info = engine.get_table_info(table_names=["users"])
print(f"Table structure:\n{table_info}")
Natural Language SQL Queries
from langchain_clickzetta import ClickZettaSQLChain
from langchain_community.llms import Tongyi
Initialize the large language model:
llm = Tongyi(
dashscope_api_key="your-dashscope-api-key",
model_name="qwen-plus"
)
Create SQL chain:
sql_chain = ClickZettaSQLChain.from_engine(
engine=engine,
llm=llm,
return_sql=True
)
Natural language queries:
questions = [
"How many tables are in the database?",
"How many records are in the users table?",
"Count users by age group"
]
for question in questions:
try:
result = sql_chain.invoke({"query": question})
print(f"Question: {question}")
print(f"SQL: {result['sql_query']}")
print(f"Answer: {result['result']}")
print("---")
except Exception as e:
print(f"Query failed: {e}")
Parameterized Queries
Parameterized query to avoid SQL injection:
def get_users_by_age(min_age: int):
sql = "SELECT name, age FROM users WHERE age >= ?"
results, columns = engine.execute_query(sql, parameters={"age": min_age})
return results
Usage example:
adult_users = get_users_by_age(18)
print(f"Adult users: {len(adult_users)}")
Vector Store
Basic Vector Store
from langchain_clickzetta import ClickZettaVectorStore
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_core.documents import Document
Initialize the embedding model:
embeddings = DashScopeEmbeddings(
dashscope_api_key="your-dashscope-api-key",
model="text-embedding-v4"
)
Create vector store:
vector_store = ClickZettaVectorStore(
engine=engine,
embeddings=embeddings,
table_name="example_vectors"
)
Add documents:
documents = [
Document(page_content="Singdata is a high-performance analytical database"),
Document(page_content="LangChain is an AI application development framework"),
Document(page_content="Vector search enables semantic retrieval")
]
vector_store.add_documents(documents)
print("✅ Documents added to vector store")
Similarity Search
Basic similarity search:
query = "What is a database?"
results = vector_store.similarity_search(query, k=3)
print(f"Query: {query}")
for i, doc in enumerate(results, 1):
print(f"{i}. {doc.page_content}")
Similarity Search with Scores
Get similarity scores:
results_with_scores = vector_store.similarity_search_with_score(query, k=3)
print(f"Query: {query}")
for i, (doc, score) in enumerate(results_with_scores, 1):
print(f"{i}. Score: {score:.4f} - {doc.page_content}")
Add documents with metadata:
documents_with_metadata = [
Document(
page_content="Python is a programming language",
metadata={"category": "programming", "level": "beginner"}
),
Document(
page_content="Machine learning is a branch of AI",
metadata={"category": "ai", "level": "intermediate"}
)
]
vector_store.add_documents(documents_with_metadata)
Search with filter conditions:
results = vector_store.similarity_search(
"programming related",
k=5,
filter={"category": "programming"}
)
print("Programming-related documents:")
for doc in results:
print(f"- {doc.page_content}")
Full-Text Search
from langchain_clickzetta.retrievers import ClickZettaFullTextRetriever
Create full-text retriever:
fulltext_retriever = ClickZettaFullTextRetriever(
engine=engine,
table_name="example_documents",
search_type="phrase", # Phrase search
k=5
)
Add documents to full-text index:
documents = [
Document(page_content="Artificial intelligence technology is developing rapidly"),
Document(page_content="Big data analytics applications in business"),
Document(page_content="Cloud computing provides scalable infrastructure")
]
fulltext_retriever.add_documents(documents)
Perform full-text search:
query = "artificial intelligence"
results = fulltext_retriever.get_relevant_documents(query)
print(f"Full-text search results: {query}")
for doc in results:
print(f"- {doc.page_content}")
if "relevance_score" in doc.metadata:
print(f" Relevance score: {doc.metadata['relevance_score']}")
Key-Value Store
Basic Key-Value Operations
from langchain_clickzetta import ClickZettaStore
Create key-value store:
store = ClickZettaStore(
engine=engine,
table_name="example_store"
)
Store data:
data = [
("user:123", b"Zhang San"),
("user:456", b"Li Si"),
("config:theme", b"dark"),
("config:language", b"zh-CN")
]
store.mset(data)
print("✅ Data stored")
Retrieve data:
keys = ["user:123", "user:456", "config:theme"]
values = store.mget(keys)
for key, value in zip(keys, values):
if value:
print(f"{key}: {value.decode('utf-8')}")
Prefix Search
Get all user-related keys:
user_keys = list(store.yield_keys(prefix="user:"))
print(f"User keys: {user_keys}")
Get all config-related keys:
config_keys = list(store.yield_keys(prefix="config:"))
print(f"Config keys: {config_keys}")
Delete Operations
Delete specified keys:
store.mdelete(["user:456", "config:theme"])
print("✅ Specified keys deleted")
Verify deletion result:
remaining_values = store.mget(["user:123", "user:456", "config:language"])
for key, value in zip(["user:123", "user:456", "config:language"], remaining_values):
status = "exists" if value else "deleted"
print(f"{key}: {status}")
Chat History
Basic Chat History Management
from langchain_clickzetta import ClickZettaChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage
Create chat history manager:
chat_history = ClickZettaChatMessageHistory(
engine=engine,
session_id="user_123",
table_name="example_chat_history"
)
Add conversation messages:
chat_history.add_message(HumanMessage(content="Hello"))
chat_history.add_message(AIMessage(content="Hello! How can I help you?"))
chat_history.add_message(HumanMessage(content="Tell me about Singdata"))
chat_history.add_message(AIMessage(content="Singdata is a next-generation cloud-native lakehouse platform..."))
print("✅ Chat history saved")
Retrieving Chat History
Get all messages:
messages = chat_history.messages
print(f"Chat history ({len(messages)} messages total):")
for msg in messages:
speaker = "User" if msg.__class__.__name__ == "HumanMessage" else "AI"
print(f"{speaker}: {msg.content}")
Get Recent Conversations
Get the 3 most recent messages:
recent_messages = chat_history.get_messages_by_count(3)
print(f"Recent conversations ({len(recent_messages)} messages total):")
for msg in recent_messages:
speaker = "User" if msg.__class__.__name__ == "HumanMessage" else "AI"
print(f"{speaker}: {msg.content}")
Get Conversations by Time Range
Get today's conversations:
from datetime import datetime, timedelta
today = datetime.now().strftime("%Y-%m-%d 00:00:00")
tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d 00:00:00")
today_messages = chat_history.get_messages_by_time_range(
start_time=today,
end_time=tomorrow
)
print(f"Today's conversations ({len(today_messages)} messages total):")
for msg in today_messages:
speaker = "User" if msg.__class__.__name__ == "HumanMessage" else "AI"
print(f"{speaker}: {msg.content}")
Clear Chat History
Get conversation statistics:
message_count = chat_history.get_session_count()
print(f"Session has {message_count} messages total")
Clear all messages in current session:
chat_history.clear()
print("✅ Chat history cleared")
Verify cleanup result:
remaining_count = chat_history.get_session_count()
print(f"Messages remaining after cleanup: {remaining_count}")
Batch Operations Examples
Batch Add Documents
def batch_add_documents(vector_store, document_texts, batch_size=10):
"""Batch add documents to vector store"""
documents = [Document(page_content=text) for text in document_texts]
# Process in batches
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
vector_store.add_documents(batch)
print(f"Processed {min(i + batch_size, len(documents))}/{len(documents)} documents")
Usage example:
sample_texts = [
f"This is the {i}th sample document, containing some test content"
for i in range(1, 51) # 50 documents
]
batch_add_documents(vector_store, sample_texts, batch_size=10)
Batch Queries
def batch_search(vector_store, queries, k=3):
"""Batch execute search queries"""
results = {}
for query in queries:
try:
docs = vector_store.similarity_search(query, k=k)
results[query] = [doc.page_content for doc in docs]
except Exception as e:
results[query] = f"Query failed: {e}"
return results
Usage example:
queries = [
"What is a database?",
"How to do machine learning?",
"What are the advantages of cloud computing?"
]
search_results = batch_search(vector_store, queries)
for query, results in search_results.items():
print(f"Query: {query}")
if isinstance(results, list):
for i, result in enumerate(results, 1):
print(f" {i}. {result[:50]}...")
else:
print(f" {results}")
print()
Utility Functions
def test_connection(engine):
"""Test database connection"""
try:
# Test basic query
results, _ = engine.execute_query("SELECT CURRENT_TIMESTAMP as now")
print(f"✅ Connection successful, current time: {results[0]['now']}")
# Test table access
tables = engine.get_table_names()
print(f"✅ Accessible tables: {len(tables)}")
return True
except Exception as e:
print(f"❌ Connection test failed: {e}")
return False
Usage example:
if test_connection(engine):
print("Database connection is normal, you can continue")
else:
print("Please check connection configuration")
def get_document_stats(vector_store):
"""Get document storage statistics"""
try:
# Query total document count
sql = f"SELECT COUNT(*) as total FROM {vector_store.table_name}"
results, _ = vector_store.engine.execute_query(sql)
total_docs = results[0]['total']
# Query recently added documents
sql = f"""
SELECT COUNT(*) as recent
FROM {vector_store.table_name}
WHERE created_at >= NOW() - INTERVAL 1 DAY
"""
results, _ = vector_store.engine.execute_query(sql)
recent_docs = results[0]['recent']
return {
"total_documents": total_docs,
"recent_documents": recent_docs
}
except Exception as e:
print(f"Failed to get statistics: {e}")
return None
Usage example:
stats = get_document_stats(vector_store)
if stats:
print(f"Total documents: {stats['total_documents']}")
print(f"Added in last 24 hours: {stats['recent_documents']}")
Complete Example: Mini Q&A System
def create_mini_qa_system():
"""Create a mini Q&A system"""
# 1. Initialize components
engine = ClickZettaEngine(
# Your connection parameters...
)
embeddings = DashScopeEmbeddings(
dashscope_api_key="your-api-key",
model="text-embedding-v4"
)
vector_store = ClickZettaVectorStore(
engine=engine,
embeddings=embeddings,
table_name="mini_qa_docs"
)
# 2. Add knowledge base documents
knowledge_docs = [
"Python is a high-level programming language with simple and easy-to-learn syntax",
"Machine learning is an important branch of artificial intelligence",
"Databases are used for storing and managing large amounts of data",
"Cloud computing provides elastic and scalable computing resources"
]
vector_store.add_documents([
Document(page_content=text) for text in knowledge_docs
])
# 3. Q&A function
def ask_question(question: str):
# Retrieve relevant documents
relevant_docs = vector_store.similarity_search(question, k=2)
print(f"Question: {question}")
print("Relevant information:")
for i, doc in enumerate(relevant_docs, 1):
print(f" {i}. {doc.page_content}")
# 4. Test Q&A
test_questions = [
"What is Python?",
"What is machine learning?",
"What is cloud computing used for?"
]
for question in test_questions:
ask_question(question)
print()
Run the mini Q&A system:
create_mini_qa_system()
Best Practices Summary
-
Connection Management
- Reuse the
ClickZettaEngine instance
- Use environment variables to manage configuration
- Set appropriate timeout values
-
Vector Store
- Choose an appropriate embedding model
- Use meaningful table names
- Add metadata for easier filtering
-
Performance Optimization
- Batch process large amounts of data
- Use indexes to accelerate queries
- Set appropriate retrieval count (k value)
-
Error Handling
- Use try-catch to handle exceptions
- Log errors
- Provide user-friendly error messages
These basic examples provide you with a starting point for using LangChain Singdata. You can build more complex AI applications based on these examples.