
SQL

Stop Writing SQL for AI Agents: Build Direct Database Access with FastMCP

Table of Contents

The Copy-Paste Database Problem
What is Model Context Protocol (MCP)?
What is FastMCP?
Installation and Database Setup

Installation

Database Setup: Sample E-commerce Data
Building Your First Database Tools

Connect to the Database
List Tables
Flexible Query Execution Tool
Data Export Tool

Schema Discovery with Resources

Table Schema Resource with URI Patterns
Sample Data Resource
Table Statistics Resource

Connect to MCP Clients

Create the Server File
Install Dependencies

Connect with MCP Clients

Connect to Claude Code
Connect to Claude Desktop
Connect to other MCP clients

Test the Server
Conclusion

The Copy-Paste Database Problem
AI has revolutionized how we write code, making complex database queries accessible to anyone who can describe what they need. But the workflow becomes frustrating when you’re trapped in this repetitive cycle: ask your AI for SQL, copy the query, paste it into your database tool, run it, copy the results, and paste them back to your AI for analysis.
Here’s what this workflow looks like in practice:
# 1. Ask AI for SQL
"Can you write a query to find high-value customers?"

# 2. Copy AI's response
SELECT customer_id, total_spent FROM customers WHERE total_spent > 1000

# 3. Paste into database tool, run query, copy results
# 4. Paste results back to AI for analysis

This manual approach creates several challenges:

Switching between AI and database tools breaks your analytical flow
Copying and pasting introduces transcription errors
AI can’t explore data independently or learn from previous queries

What if your AI could connect directly to your database, run queries autonomously, and provide insights without you ever copying and pasting SQL? That’s exactly what you’ll build in this article using MCP.
What is Model Context Protocol (MCP)?
Model Context Protocol (MCP) is a standard that allows AI models to connect directly to external systems like databases, APIs, and file systems. Instead of being limited to generating code or text, MCP enables AI models to take actions and access real data.
Think of MCP as a bridge between your AI assistant and your database. When you ask AI to “find high-value customers,” the assistant discovers your database tools, calls the appropriate functions, and provides actionable insights—all without you leaving the conversation.

MCP works through two key components:

Tools: Functions that AI models can call to perform actions and modify system state (like executing SQL queries, inserting data, exporting files)
Resources: Data sources that AI models can access for information only (like table schemas, sample data, or documentation)

This direct connection eliminates the copy-paste cycle and enables truly autonomous data analysis.
What is FastMCP?
FastMCP is a Python framework that makes building MCP servers incredibly simple. While you could build MCP servers from scratch, FastMCP provides decorators and utilities that reduce the complexity to just a few lines of code.
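To give a feel for the style before building the real server, here is a minimal sketch of what a tool and a resource look like in FastMCP. The server name and the two example functions are illustrative only; the actual tools and resources for this article are built in the sections below.

from fastmcp import FastMCP

mcp = FastMCP("Demo Server")

@mcp.tool
def run_report(name: str) -> dict:
    """A tool: the AI can call this to perform an action."""
    return {"success": True, "report": name}

@mcp.resource("docs://readme")
def get_readme() -> str:
    """A resource: read-only information the AI can look up."""
    return "This server exposes reporting tools."
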
In this article, you’ll build a database assistant that:

Connects to SQLite databases and executes queries automatically
Exports results to CSV files with full schema discovery
Provides sample data and calculates database statistics

This complete solution transforms any AI assistant into a powerful database analytics tool. For large-scale analytics workloads, consider our DuckDB deep dive which offers superior performance for analytical queries.

Installation and Database Setup
Installation
To install fastmcp, type the following command:
pip install fastmcp

Other dependencies you’ll need for this article are:
pip install sqlalchemy pandas

SQLAlchemy is the Python SQL toolkit and Object-Relational Mapping (ORM) library we’ll use for database operations.
Database Setup: Sample E-commerce Data
Before building FastMCP tools, let’s create a realistic e-commerce database that we’ll use throughout all examples. The complete database setup code is available in setup_database.py.
from setup_database import create_sample_database

# Create the database we'll use throughout the article
db_path = create_sample_database("ecommerce.db")
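The setup script itself isn't reproduced in this article. As a rough idea of what it does, here is a minimal sketch that creates the two tables used in the examples below; the exact columns, seed rows, and helper internals in the real setup_database.py may differ.

import sqlite3

def create_sample_database(path: str = "ecommerce.db") -> str:
    """Create a small e-commerce SQLite database with users and orders tables."""
    conn = sqlite3.connect(path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT NOT NULL,
            age INTEGER,
            created_at TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY,
            user_id INTEGER REFERENCES users(id),
            product_name TEXT,
            quantity INTEGER,
            price REAL
        );
    """)
    # Insert a couple of sample rows (the real script seeds more data)
    conn.execute(
        "INSERT INTO users (name, email, age, created_at) VALUES (?, ?, ?, ?)",
        ("Alice Johnson", "alice@example.com", 28, "2023-01-15 10:30:00"),
    )
    conn.execute(
        "INSERT INTO orders (user_id, product_name, quantity, price) VALUES (?, ?, ?, ?)",
        (1, "Laptop", 1, 999.99),
    )
    conn.commit()
    conn.close()
    return path
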

Building Your First Database Tools
In this section, you’ll create FastMCP tools that:

Connect to databases and manage connections
Discover tables and database structure
Execute queries with proper transaction handling
Export results to CSV files

Connect to the Database
Start with creating a tool called connect_db to connect to the SQLite database. To create a tool, you need to:

Initialize FastMCP server
Write a Python function with your tool logic
Add the @mcp.tool decorator

from fastmcp import FastMCP
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import pandas as pd

# Global database connection
db_engine = None
db_session_factory = None

# Initialize FastMCP server
mcp = FastMCP("Database Analytics Assistant")

@mcp.tool
def connect_db(database_path: str) -> dict:
    """Connect to an SQLite database file"""
    global db_engine, db_session_factory

    # Create new engine and session factory
    db_engine = create_engine(f"sqlite:///{database_path}")
    db_session_factory = sessionmaker(bind=db_engine)

    return {"success": True, "database_path": database_path}

FastMCP tools return JSON responses that AI assistants convert into user-friendly text. This structured approach ensures reliable data exchange between your database and AI systems. For type-safe AI responses, check out our PydanticAI guide.
Here’s what you’ll see when calling the connect_db tool with the ecommerce.db path:
JSON output:
{
  "success": true,
  "database_path": "ecommerce.db"
}

AI-generated summary:
Connected to the database at ecommerce.db

List Tables
The list_tables tool uses SQLAlchemy’s inspector to discover all tables in the connected database. This gives the AI a complete view of your database structure.
@mcp.tool
def list_tables() -> dict:
    """List all tables in the connected database"""
    global db_engine
    from sqlalchemy import inspect

    inspector = inspect(db_engine)
    table_names = inspector.get_table_names()

    return {"success": True, "tables": table_names}

Here’s the response you’ll get when calling the list_tables tool:
Human input:
Show me all tables in the ecommerce database

AI converts this to tool call: list_tables()
JSON output:
{
  "success": true,
  "tables": ["users", "orders"]
}

AI-generated summary:
The database has two tables: users and orders.

Flexible Query Execution Tool
The execute_query tool executes only SELECT queries on the connected database for security. This read-only approach prevents accidental data modification or deletion while allowing powerful data analysis and exploration.
Start by creating a helper function to check if the query is safe.
def _is_safe_query(sql: str) -> bool:
    """Check if a SQL query is safe to execute. Only SELECT queries are allowed."""
    sql_lower = sql.lower().strip()
    return sql_lower.startswith("select")
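
A quick, illustrative sanity check of the helper:

# Only SELECT statements (case-insensitive, ignoring leading whitespace) pass the check
assert _is_safe_query("SELECT * FROM users")
assert _is_safe_query("  select count(*) from orders")
assert not _is_safe_query("DELETE FROM users")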

Then, create the execute_query tool.
from typing import Any, Dict

from fastmcp import Context

@mcp.tool
def execute_query(sql: str, ctx: Context = None) -> Dict[str, Any]:
    """Execute a SQL query on the connected database."""
    global db_session_factory

    # Check if query is safe before execution
    if not _is_safe_query(sql):
        return {
            "success": False,
            "error": "Potentially dangerous SQL operations are not allowed. Only SELECT queries are permitted.",
        }

    with db_session_factory() as session:
        # Execute the SQL query
        result = session.execute(text(sql))
        rows = result.fetchall()
        return {"success": True, "results": [dict(row._mapping) for row in rows]}

Here’s the response you’ll get when calling the execute_query tool with the SQL query:
Human input:
Show me statistics for the users table

AI converts this to:
SELECT
    COUNT(*) as total_users,
    AVG(age) as average_age,
    MIN(age) as youngest_age,
    MAX(age) as oldest_age,
    COUNT(CASE WHEN age < 30 THEN 1 END) as users_under_30,
    COUNT(CASE WHEN age >= 30 THEN 1 END) as users_30_and_over
FROM users

JSON output:
{
"success": true,
"results": [
{
"total_users": 5,
"average_age": 32.0,
"youngest_age": 22,
"oldest_age": 45,
"users_under_30": 2,
"users_30_and_over": 3
}
]
}

AI-generated summary:
Users table statistics:

Total Users: 5
Average Age: 32.0
Youngest User: 22
Oldest User: 45
Users Under 30: 2
Users 30 and Over: 3

Data Export Tool
The export_to_csv tool executes a SQL query and exports the results to a CSV file. This is useful for creating reports or sharing data with others.
import pandas as pd

@mcp.tool
def export_to_csv(sql: str, filename: str) -> dict:
    """Execute a SQL query and export results to CSV file"""
    global db_engine

    # Execute query and export to CSV using pandas
    df = pd.read_sql(sql, db_engine)
    df.to_csv(filename, index=False)

    return {"success": True, "filename": filename, "rows_exported": len(df)}

Here’s the response you’ll get when calling the export_to_csv tool:
Human input:
Export product sales data to a CSV file called sales_report.csv

AI converts this to:
SELECT product_name, SUM(quantity) as total_sold, SUM(price * quantity) as total_revenue
FROM orders
GROUP BY product_name
ORDER BY total_revenue DESC

JSON output:
{
"success": true,
"filename": "sales_report.csv",
"rows_exported": 7
}

AI-generated summary:
Successfully exported 7 rows of product sales data to sales_report.csv

Schema Discovery with Resources
Tools require complex security validation and can fail due to database errors or malformed queries. Resources eliminate these risks by providing read-only access to metadata like schemas and sample data without query execution overhead.
In this section, you’ll create FastMCP resources that:

Provide schema information for tables
Provide sample data for tables
Provide statistics for tables

Table Schema Resource with URI Patterns
The get_table_schema resource uses URI patterns to provide flexible access to table schemas. The AI can request schema information for any table by using the URI pattern schema://tables/{table_name}.
from sqlalchemy import inspect

@mcp.resource("schema://tables/{table_name}")
def get_table_schema(table_name: str) -> dict:
    """Get column information for a specific table"""
    global db_engine

    # Get database inspector
    inspector = inspect(db_engine)

    # Get column information
    columns = inspector.get_columns(table_name)

    # Build column info list
    column_info = []
    for col in columns:
        column_info.append({
            "name": col["name"],
            "type": str(col["type"]),
            "nullable": col["nullable"],
        })

    return {"table_name": table_name, "columns": column_info}

Here’s the response you’ll get when calling the get_table_schema resource:
Human input:
Show me the schema for the users table

AI accesses the resource using URI: schema://tables/users
JSON output:
{
"table_name": "users",
"columns": [
{
"name": "id",
"type": "INTEGER",
"nullable": false
},
{
"name": "name",
"type": "TEXT",
"nullable": false
},
{
"name": "email",
"type": "TEXT",
"nullable": false
},
{
"name": "age",
"type": "INTEGER",
"nullable": true
},
{
"name": "created_at",
"type": "TIMESTAMP",
"nullable": true
}
]
}

AI-generated summary:
The users table schema:

Column      Type       Required
id          INTEGER    Yes
name        TEXT       Yes
email       TEXT       Yes
age         INTEGER    No
created_at  TIMESTAMP  No

Sample Data Resource
The get_table_data resource provides sample data with pagination support. This helps the AI understand the actual data structure and content without overwhelming it with large datasets.
@mcp.resource("data://tables/{table_name}")
def get_table_data(table_name: str, limit: int = 10, offset: int = 0) -> dict:
    """Get sample rows from a specific table with pagination"""
    global db_session_factory

    with db_session_factory() as session:
        # Get sample data with pagination
        result = session.execute(
            text(f"SELECT * FROM {table_name} LIMIT :limit OFFSET :offset"),
            {"limit": limit, "offset": offset},
        )
        rows = result.fetchall()

        # Convert to dict
        data = [dict(row._mapping) for row in rows]

    return {"table_name": table_name, "sample_data": data, "rows_returned": len(data)}

Here’s the response you’ll get when calling the get_table_data resource:
Human input:
Show me sample data from the users table

AI accesses the resource using URI: data://tables/users
JSON output:
{
"table_name": "users",
"sample_data": [
{
"id": 1,
"name": "Alice Johnson",
"email": "alice@example.com",
"age": 28,
"created_at": "2023-01-15 10:30:00"
},
{
"id": 2,
"name": "Bob Smith",
"email": "bob@example.com",
"age": 35,
"created_at": "2023-02-20 14:15:00"
},
{
"id": 3,
"name": "Charlie Brown",
"email": "charlie@example.com",
"age": 22,
"created_at": "2023-03-10 09:45:00"
}
],
"rows_returned": 3
}

AI-generated summary:
Sample data from the users table:

User ID  Name           Email                Age  Created
1        Alice Johnson  alice@example.com    28   2023-01-15
2        Bob Smith      bob@example.com      35   2023-02-20
3        Charlie Brown  charlie@example.com  22   2023-03-10

Table Statistics Resource
The get_table_stats resource provides comprehensive statistics for a specific table. This helps the AI understand the size and composition of the table.
@mcp.resource("stats://tables/{table_name}")
def get_table_stats(table_name: str) -> dict:
    """Get comprehensive statistics for a specific table"""
    global db_engine, db_session_factory

    with db_session_factory() as session:
        # Get basic table statistics
        total_rows = session.execute(
            text(f"SELECT COUNT(*) FROM {table_name}")
        ).scalar()

        # Get column information
        inspector = inspect(db_engine)
        columns = inspector.get_columns(table_name)

        return {
            "table_name": table_name,
            "total_rows": total_rows,
            "column_count": len(columns),
        }

Here’s the response you’ll get when calling the get_table_stats resource:
Human input:
Show me statistics for the users table

AI accesses the resource using URI: stats://tables/users
JSON output:
{
"table_name": "users",
"total_rows": 5,
"column_count": 5
}

AI-generated summary:
The users table contains 5 rows and has 5 columns (id, name, email, age, created_at).
Connect to MCP Clients
Now that you’ve built all the tools and resources, let’s deploy your FastMCP database server and connect it to an MCP client.
An MCP client is any application that can communicate with MCP servers to access tools and resources, such as Claude Code, Claude Desktop, Cursor, Zed, Continue, and other MCP-compatible development tools.
Create the Server File
First, combine all the code from this article into a single file called database_mcp_server.py. You can find the complete implementation in the example repository.
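The file is essentially the snippets from this article stacked together, plus an entry point that starts the server. Here is a rough outline (the tool and resource bodies are omitted):

# database_mcp_server.py
from fastmcp import FastMCP
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import sessionmaker
import pandas as pd

db_engine = None
db_session_factory = None

mcp = FastMCP("Database Analytics Assistant")

# ... connect_db, list_tables, execute_query, and export_to_csv tools ...
# ... schema://, data://, and stats:// resources ...

if __name__ == "__main__":
    mcp.run()  # starts the server over stdio, which local MCP clients expect
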
Install Dependencies
Install UV (the fast Python package manager) if you haven’t already:
curl -LsSf https://astral.sh/uv/install.sh | sh

Then use uv to install all dependencies:
uv sync

Connect with MCP Clients
This FastMCP server works with any MCP-compatible client. Here’s how to connect it to Claude Code and other MCP clients.
Connect to Claude Code
Add your server to Claude Code’s MCP configuration:
claude mcp add database-analytics -- uv run database_mcp_server.py

Verify the server is registered:
claude mcp list

Connect to Claude Desktop
Add this configuration to your Claude Desktop config file:
{
  "mcpServers": {
    "database-analytics": {
      "command": "uv",
      "args": ["run", "database_mcp_server.py"],
      "cwd": "/path/to/your/project"
    }
  }
}

Connect to other MCP clients
Popular MCP-compatible clients include Cursor, Zed, Continue, and Codeium. Each client has its own configuration format but uses the same server command: uv run database_mcp_server.py. For client-specific setup guides, visit the Model Context Protocol documentation.
Test the Server
Once your FastMCP server is running and connected to an MCP client, you can test all the database functionality through natural language commands. Here are practical examples to verify everything works:
Connect to the database:
Connect to my SQLite database at ./ecommerce.db

Output:
✅ Connected to the database at ./ecommerce.db

Explore the schema:
What tables are available in this database?

Output:
The database has 2 tables: users and orders.

Examine the table structure:
Show me the schema for the users table

Output:
The users table has 5 columns: id (INTEGER, required), name (TEXT, required), email (TEXT, required), age (INTEGER, optional), and created_at (TIMESTAMP, optional).

Preview the data:
Show me some sample data from the users table

Output:
Here are 3 sample users:

- Alice Johnson (alice@example.com, age 28)
- Bob Smith (bob@example.com, age 35)
- Charlie Brown (charlie@example.com, age 22)

Run analytics queries:
Calculate total sales by product category

Output:
Product sales summary:

- Laptop: $999.99 (1 sold)
- Tablet: $499.99 (1 sold)
- Monitor: $299.99 (1 sold)
- Headphones: $149.99 (1 sold)

Export the results:
Export the query "SELECT product_name, SUM(quantity) as total_sold FROM orders GROUP BY product_name" to CSV file called sales_report.csv

Output:
✅ Successfully exported 7 rows of product sales data to sales_report.csv

Get table statistics:
Show me statistics for the users table

Output:
Users table statistics: 5 total rows, 5 columns

Conclusion
You’ve successfully built a complete FastMCP database server that transforms how AI assistants interact with databases. Your server delivers:

Direct database connectivity for AI assistants
Flexible query execution with proper transaction handling
Automated data export to CSV files
Comprehensive schema discovery and data exploration
Real-time database statistics and analysis

The best way to learn is to build. Try building your own FastMCP server with different database types, tools, and resources. Here are some ideas to get you started:

Extend the server with additional database operations (INSERT, UPDATE, DELETE)
Add support for multiple database types (PostgreSQL, MySQL, etc.)
Implement advanced analytics tools such as data visualization, clustering, and anomaly detection
Coordinate multiple AI agents for complex data workflows using LangGraph


Writing Safer PySpark Queries with Parameters

Table of Contents

Setup: Create a Spark Session and Input Data
Traditional PySpark Query Approach
Parameterized Queries with PySpark Custom String Formatting
Parameterized Queries with Parameter Markers
Make PySpark SQL Easier to Reuse
Easier Unit Testing with PySpark Parameterized Queries
Summary: Benefits of Using Parameterized Queries in PySpark

Writing SQL queries in PySpark often involves string formatting, making your code error-prone, difficult to test, and vulnerable to SQL injection. A safer and more maintainable alternative is to use parameterized SQL queries with PySpark’s spark.sql().
This approach allows direct use of DataFrames and Python values in queries without relying on temporary views or manual type conversions.
In this article, you’ll learn how to safely write and reuse SQL queries in PySpark using parameterization. We’ll cover both PySpark’s custom string formatting style and support for named parameter markers, along with examples for reusable logic and unit testing.
The source code of this article can be found here:
Source Code

Setup: Create a Spark Session and Input Data
We’ll begin by creating a Spark session and generating a sample DataFrame using the Pandas-to-Spark conversion method. For other common ways to build DataFrames in PySpark, see this guide on creating PySpark DataFrames.
from datetime import date
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a Spark DataFrame
item_price_pandas = pd.DataFrame(
    {
        "item_id": [1, 2, 3, 4],
        "price": [4, 2, 5, 1],
        "transaction_date": [
            date(2025, 1, 15),
            date(2025, 2, 1),
            date(2025, 3, 10),
            date(2025, 4, 22),
        ],
    }
)

item_price = spark.createDataFrame(item_price_pandas)
item_price.show()

Output
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      1|    4|      2025-01-15|
|      2|    2|      2025-02-01|
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Traditional PySpark Query Approach
The traditional approach uses f-strings to build SQL, which is not ideal because:

Security Risk: Interpolated strings can expose your query to SQL injection.
Limited Flexibility: F-strings can’t handle Python objects like DataFrames directly, so you have to create temporary views and manually quote values like dates to match SQL syntax.

item_price.createOrReplaceTempView("item_price_view")
transaction_date_str = "2025-02-15"

query_with_fstring = f"""SELECT *
FROM item_price_view
WHERE transaction_date > '{transaction_date_str}'
"""

spark.sql(query_with_fstring).show()

Output
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Parameterized Queries with PySpark Custom String Formatting
PySpark supports parameterized SQL with custom string formatting, separating SQL logic from parameter values. During parsing, it safely handles each value as a typed literal and inserts it into the SQL parse tree, preventing injection attacks and ensuring correct data types.
Query
├── SELECT
│   └── *
├── FROM
│   └── {item_price}
└── WHERE
    └── Condition
        ├── Left: transaction_date
        ├── Operator: >
        └── Right: {transaction_date}

Because each value is handled as a typed literal, it is inserted into the SQL query according to its actual data type rather than as raw text, which means:

item_price can be passed directly without creating a temporary view
transaction_date does not need to be manually wrapped in single quotes

parametrized_query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""

spark.sql(
    parametrized_query, item_price=item_price, transaction_date=transaction_date_str
).show()

Output:
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Parameterized Queries with Parameter Markers
Custom string formatting would treat date(2023, 2, 15) as a mathematical expression rather than a date, which would cause a type mismatch error.
# Passing a Python date object through custom string formatting fails
transaction_date = date(2023, 2, 15)

parametrized_query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""

spark.sql(
    parametrized_query, item_price=item_price, transaction_date=transaction_date
).show()

Output:
[DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(transaction_date > ((2023 - 2) - 15))" due to data type mismatch

Parameter markers preserve type information, so date objects are passed as proper SQL DATE literals. This allows you to safely use Python dates without formatting or quoting them manually.
query_with_markers = """SELECT *
FROM {item_price}
WHERE transaction_date > :transaction_date
"""

transaction_date = date(2025, 2, 15)

spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date},
).show()

Make PySpark SQL Easier to Reuse
Parameterized SQL templates are easier to reuse across your codebase. Instead of copying and pasting full SQL strings with values hardcoded inside, you can define flexible query templates that accept different input variables.
Here’s a reusable query to filter using different transaction dates:
transaction_date_1 = date(2025, 3, 9)

spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date_1},
).show()

Output:
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

You can easily change the filter with a different date:
transaction_date_2 = date(2025, 3, 15)

spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date_2},
).show()

Output:
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Easier Unit Testing with PySpark Parameterized Queries
Parameterization also simplifies testing by letting you pass different inputs into a reusable query string.
For example, in the code below, we define a function that takes a DataFrame and a threshold value, then filters rows using a parameterized query.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def filter_by_price_threshold(df, amount):
    return spark.sql(
        "SELECT * from {df} where price > :amount", df=df, args={"amount": amount}
    )

Because the values are passed separately from the SQL logic, we can easily reuse and test this function with different parameters without rewriting the query itself.
def test_query_return_correct_number_of_rows():
    # Create test input DataFrame
    df = spark.createDataFrame(
        [
            ("Product 1", 10.0, 5),
            ("Product 2", 15.0, 3),
            ("Product 3", 8.0, 2),
        ],
        ["name", "price", "quantity"],
    )

    # Execute query with parameters
    assert filter_by_price_threshold(df, 10).count() == 1
    assert filter_by_price_threshold(df, 8).count() == 2

For more tips on validating DataFrame outputs effectively, see best practices for PySpark DataFrame comparison and testing.
Summary: Benefits of Using Parameterized Queries in PySpark
Using parameterized queries in PySpark offers several advantages:

Security: Prevents SQL injection.
Simplicity: Avoids temporary views and quoting hassles.
Testability: Supports reusable, testable query templates.
Readability: Makes queries cleaner and easier to understand.

Adopting this technique leads to more robust and maintainable Spark-based data pipelines.

Simplify SQL Parsing and Transpilation with SQLGlot

Motivation

Writing and maintaining SQL queries across different database systems can be challenging. Each system has its own dialect, syntax, and quirks, making it difficult to ensure compatibility and consistency when working with multiple databases.

For example, consider a scenario where you need to convert a SQL query written in one dialect (e.g., MySQL) to another (e.g., Spark SQL):

SELECT IFNULL(employee_name, 'Unknown') AS employee_status
FROM employees;

This query uses the MySQL-specific IFNULL function to replace null values in employee_name with 'Unknown'. However, Spark SQL uses the COALESCE function for this operation. Converting such queries manually can be tedious and prone to errors, especially when dealing with large-scale migrations.

Introduction to SQLGlot

SQLGlot is a Python-based SQL parser, transpiler, optimizer, and execution engine. It supports translating between 24 different SQL dialects, including MySQL, Spark SQL, Presto, Snowflake, and BigQuery.

To install SQLGlot, use the following command:

pip install sqlglot

In this post, we will demonstrate how SQLGlot can simplify the process of converting SQL queries between dialects, specifically from MySQL to Spark SQL.

SQL Parsing and Transpilation with SQLGlot

SQLGlot makes it easy to transpile SQL queries between different dialects. For example, consider the following MySQL query:

mysql_query = """
SELECT IFNULL(employee_name, 'Unknown') AS employee_status
FROM employees;
"""

This query uses the MySQL-specific IFNULL function. To convert it to Spark SQL, you can use SQLGlot as follows:

import sqlglot

# Transpile the MySQL query to Spark SQL dialect
spark_sql_query = sqlglot.transpile(mysql_query, read="mysql", write="spark")[0]

print(spark_sql_query)

Output:

SELECT COALESCE(employee_name, 'Unknown') AS employee_status
FROM employees;

Here, SQLGlot automatically converts the MySQL IFNULL function to Spark SQL’s COALESCE function. This ensures compatibility and saves time when migrating queries between MySQL and Spark SQL.
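
Transpilation is just one use of the parsed query. Because SQLGlot builds a full syntax tree, you can also inspect or modify a query programmatically before rendering it in another dialect. A small sketch using the same query:

import sqlglot
from sqlglot import exp

tree = sqlglot.parse_one(
    "SELECT IFNULL(employee_name, 'Unknown') AS employee_status FROM employees",
    read="mysql",
)

# Inspect which columns and tables the query touches
print([col.name for col in tree.find_all(exp.Column)])  # ['employee_name']
print([tbl.name for tbl in tree.find_all(exp.Table)])   # ['employees']

# Render the same tree in the Spark SQL dialect
print(tree.sql(dialect="spark"))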

Conclusion

SQLGlot is a versatile tool for SQL parsing and transpilation, making it easier to adapt queries for different database systems. By automating the conversion process, SQLGlot reduces the risk of errors and accelerates database migrations.

Link to SQLGlot.

Use PySpark UDFs to Make SQL Logic Reusable

Motivation

Complex SQL queries often involve repetitive calculations and nested subqueries that make code maintenance difficult and prone to errors. When dealing with large-scale data processing, data engineers frequently need to rewrite the same logic multiple times within their queries.

Consider a scenario where you need to repeat complex CASE statements across different queries:

customers_df = spark.createDataFrame([
    (1, "John", 25, 60000),
    (2, "Jane", 17, 0),
    (3, "Bob", 68, 45000)
], ["customer_id", "name", "age", "income"])

# Register the DataFrame as a temporary table
customers_df.createOrReplaceTempView("customers")

# Duplicated CASE logic across queries
query1 = spark.sql("""
    SELECT customer_id,
        CASE
            WHEN age < 18 THEN 'minor'
            WHEN age > 65 THEN 'senior'
            WHEN income > 50000 THEN 'prime'
            ELSE 'standard'
        END as segment
    FROM customers
""")

query2 = spark.sql("""
    SELECT CASE
            WHEN age < 18 THEN 'minor'
            WHEN age > 65 THEN 'senior'
            WHEN income > 50000 THEN 'prime'
            ELSE 'standard'
        END as segment,
        COUNT(*) as count
    FROM customers
    GROUP BY CASE
        WHEN age < 18 THEN 'minor'
        WHEN age > 65 THEN 'senior'
        WHEN income > 50000 THEN 'prime'
        ELSE 'standard'
    END
""")

query1.show()

Output:

+-----------+-------+
|customer_id|segment|
+-----------+-------+
|          1|  prime|
|          2|  minor|
|          3| senior|
+-----------+-------+

query2.show()

Output:

+-------+-----+
|segment|count|
+-------+-----+
|  prime|    1|
|  minor|    1|
| senior|    1|
+-------+-----+

Introduction to PySpark

PySpark is Apache Spark’s Python API. With user-defined functions (UDFs), you can write reusable Python functions and call them from SQL queries. Install PySpark:

pip install pyspark[sql]

Reducing Duplication with UDFs

Instead of repeating complex CASE statements, create a single UDF:

from pyspark.sql.types import StringType

# Define the segmentation logic once
def segment_customers(age, income):
    if age is None or income is None:
        return None
    if age < 18:
        return "minor"
    elif age > 65:
        return "senior"
    elif income > 50000:
        return "prime"
    return "standard"

# Register UDF with explicit return type
spark.udf.register("segment_customers", segment_customers, StringType())

Now you can reuse this logic across multiple queries:

# Query 1: Simple segmentation
query1 = spark.sql("""
SELECT
customer_id,
segment_customers(age, income) AS segment
FROM customers
""")

# Query 2: Segment counts
query2 = spark.sql("""
SELECT
segment_customers(age, income) AS segment,
COUNT(*) as count
FROM customers
GROUP BY segment_customers(age, income)
""")

Conclusion

PySpark UDFs provide a powerful way to reduce code duplication and maintain consistency in complex SQL queries. By centralizing business logic in well-documented, reusable functions, you can write clearer, more maintainable code while ensuring consistent implementation across your entire application.

Link to PySpark

DuckDB: Simplify DataFrame Analysis with Serverless SQL

Using SQL with pandas empowers data scientists to leverage SQL’s powerful querying capabilities alongside the data manipulation functionalities of pandas.

However, traditional database systems often demand the management of a separate DBMS server, introducing additional complexity to the workflow.

With DuckDB, you can efficiently run SQL operations on pandas DataFrames without the need to manage a separate DBMS server.
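
For example, DuckDB can query a pandas DataFrame in the current scope by name, with no server, schema definition, or import step; the DataFrame below is made up for illustration:

import duckdb
import pandas as pd

df = pd.DataFrame({"item": ["apple", "banana", "orange"], "price": [4, 2, 5]})

# DuckDB finds the local DataFrame `df` by name and runs SQL directly on it
duckdb.sql("SELECT item, price FROM df WHERE price > 3").show()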


SQLPage: Generate Web UIs From SQL Queries

Motivation

Data scientists and analysts often need to build web interfaces to share their data and analyses, but most lack web development expertise. They end up either creating static reports or relying on web developers to build interactive dashboards.

Example:

# Traditional way: Need to learn Flask/FastAPI, HTML, CSS, JavaScript
from flask import Flask, render_template
import pandas as pd

app = Flask(__name__)

@app.route('/')
def show_websites():
    # Need to handle database connection
    # Need to create HTML templates
    # Need to style the output
    websites_df = pd.read_sql("""
        SELECT name, url, type, description
        FROM website
    """, db_connection)
    return render_template('websites.html', websites=websites_df.to_dict('records'))

Introduction to SQLPage

SQLPage is a tool that automatically builds web interfaces directly from SQL queries. It eliminates the need to write any traditional web programming code by providing pre-built components that render your data.

SQL-Based Web Interface

SQLPage solves the web interface problem by letting you:

Create web pages using only SQL queries
Automatically generate styled components
Handle interactive elements without JavaScript

Create a file named dashboard.sql:

-- Create a list component with a title
SELECT
    'list' as component,
    'Popular websites' as title;

-- Query data and specify how to display each item
SELECT
    name as title,
    url as link,
    CASE type
        WHEN 1 THEN 'blue'
        ELSE 'red'
    END as color,
    description,
    icon,
    active
FROM website;

The SQL code above will generate:

A styled list component with a header
Each website displayed as a list item with:
- The name as a clickable title
- Color-coded items based on type
- Description and icon for each website
- Active state indication

Conclusion

SQLPage enables data professionals to create interactive web interfaces using only SQL, making it easier to share data insights with stakeholders. Instead of learning multiple web technologies, analysts can leverage their SQL skills to build professional-looking web applications.

Link to SQLPage

Simplify CSV Data Management with DuckDB

The Traditional Way

Traditional database systems require a predefined table schema and a subsequent data import process when working with CSV data. This can be a tedious and time-consuming process.

To demonstrate this, let’s create a CSV file called customers.csv.

import pandas as pd

# Create a sample dataframe
data = {
    "name": ["Alice", "Bob", "Charlie", "David", "Eve"],
    "age": [25, 32, 45, 19, 38],
    "city": ["New York", "London", "Paris", "Berlin", "Tokyo"],
}

df = pd.DataFrame(data)

# Save the dataframe as a CSV file
df.to_csv("customers.csv", index=False)

To load this CSV file in Postgres, you need to run the following query:

-- Create the table
CREATE TABLE customers (
    name VARCHAR(100),
    age INT,
    city VARCHAR(100)
);

-- Load data from CSV
COPY customers
FROM 'customers.csv'
DELIMITER ','
CSV HEADER;

The DuckDB Way

In contrast, DuckDB allows for direct reading of CSV files from disk, eliminating the need for explicit table creation and data loading.

import duckdb

duckdb.sql("SELECT * FROM 'customers.csv'")

┌─────────┬───────┬──────────┐
│  name   │  age  │   city   │
│ varchar │ int64 │ varchar  │
├─────────┼───────┼──────────┤
│ Alice   │    25 │ New York │
│ Bob     │    32 │ London   │
│ Charlie │    45 │ Paris    │
│ David   │    19 │ Berlin   │
│ Eve     │    38 │ Tokyo    │
└─────────┴───────┴──────────┘

By using DuckDB, you can simplify your data import process and focus on analyzing your data.
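
If you want to continue the analysis in pandas, the query result converts straight back into a DataFrame:

import duckdb

# Filter the CSV with SQL, then pull the result into pandas for further work
adults = duckdb.sql("SELECT * FROM 'customers.csv' WHERE age >= 30").df()
print(adults)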

Installation

To use DuckDB, you can install it using pip:

pip install duckdb

Link to DuckDB.

DuckDB + PyArrow: 2900x Faster Than pandas for Large Dataset Processing

DuckDB optimizes query execution with multiple optimizations, while PyArrow efficiently manages in-memory data processing and storage. Combining DuckDB and PyArrow allows you to efficiently process datasets larger than memory on a single machine.

In the benchmark, we convert a Delta Lake table with over 6 million rows to a pandas DataFrame and a PyArrow dataset, which are then queried with DuckDB.

Running DuckDB on a PyArrow dataset is approximately 2906 times faster than running DuckDB on a pandas DataFrame.
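
The benchmark code itself isn't shown in this excerpt, but the pattern looks roughly like the sketch below: point PyArrow at the data as a lazily scanned dataset and let DuckDB query it by name. The file path and column names here are hypothetical.

import duckdb
import pyarrow.dataset as ds

# Hypothetical directory of Parquet files; the dataset is scanned lazily,
# so it never has to fit in memory all at once
arrow_ds = ds.dataset("data/transactions/", format="parquet")

# DuckDB queries the Arrow dataset by variable name and pushes
# filters and projections down into the scan
result = duckdb.sql(
    "SELECT category, SUM(amount) AS total FROM arrow_ds GROUP BY category"
).df()
print(result)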

