Generic selectors
Exact matches only
Search in title
Search in content
Post Type Selectors
Filter by Categories
About Article
Analyze Data
Archive
Best Practices
Better Outputs
Blog
Code Optimization
Code Quality
Command Line
Daily tips
Dashboard
Data Analysis & Manipulation
Data Engineer
Data Visualization
DataFrame
Delta Lake
DevOps
DuckDB
Environment Management
Feature Engineer
Git
Jupyter Notebook
LLM
LLM Tools
Machine Learning
Machine Learning & AI
Machine Learning Tools
Manage Data
MLOps
Natural Language Processing
NumPy
Pandas
Polars
PySpark
Python Helpers
Python Tips
Python Utilities
Scrape Data
SQL
Testing
Time Series
Tools
Visualization
Visualization & Reporting
Workflow & Automation
Workflow Automation

Writing Safer PySpark Queries with Parameters

Table of Contents

Writing Safer PySpark Queries with Parameters

Table of Contents

Introduction

Writing SQL queries in PySpark often involves string formatting, making your code error-prone, difficult to test, and vulnerable to SQL injection. A safer and more maintainable alternative is to use parameterized SQL queries with PySpark’s spark.sql().

This approach allows direct use of DataFrames and Python values in queries without relying on temporary views or manual type conversions.

Key Takeaways

Here’s what you’ll learn:

  • Eliminate SQL injection vulnerabilities by using parameterized queries instead of string formatting
  • Pass DataFrames directly as parameters without creating temporary views or manual type conversion
  • Build reusable query templates that work across different datasets and environments
  • Simplify unit testing with parameterized functions that separate logic from data
  • Handle complex data types like dates automatically without manual quoting or formatting

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

Setup: Create a Spark Session and Input Data

We’ll begin by creating a Spark session and generating a sample DataFrame using the Pandas-to-Spark conversion method. For other common ways to build DataFrames in PySpark, see this guide on creating PySpark DataFrames.

from datetime import date
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a Spark DataFrame
item_price_pandas = pd.DataFrame(
    {
        "item_id": [1, 2, 3, 4],
        "price": [4, 2, 5, 1],
        "transaction_date": [
            date(2025, 1, 15),
            date(2025, 2, 1),
            date(2025, 3, 10),
            date(2025, 4, 22),
        ],
    }
)

item_price = spark.createDataFrame(item_price_pandas)
item_price.show()

Output

+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      1|    4|      2025-01-15|
|      2|    2|      2025-02-01|
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Traditional PySpark Query Approach

The traditional approach uses f-strings to build SQL, which is not ideal because:

  • Security Risk: Interpolated strings can expose your query to SQL injection.
  • Limited Flexibility: F-strings can’t handle Python objects like DataFrames directly, so you have to create temporary views and manually quote values like dates to match SQL syntax.
item_price.createOrReplaceTempView("item_price_view")
transaction_date_str = "2025-02-15"

query_with_fstring = f"""SELECT *
FROM item_price_view
WHERE transaction_date > '{transaction_date_str}'
"""

spark.sql(query_with_fstring).show()

Output

+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Parameterized Queries with PySpark Custom String Formatting

PySpark supports parameterized SQL with custom string formatting, separating SQL logic from parameter values. During parsing, it safely handles each value as a typed literal and inserts it into the SQL parse tree, preventing injection attacks and ensuring correct data types.

Query
├── SELECT
│   └── *
├── FROM
│   └── {item_price}
└── WHERE
    └── Condition
        ├── Left: transaction_date
        ├── Operator: >
        └── Right: {transaction_date}

Because it handles each value as a typed literal, it treats the value according to its actual data type, not as raw text, when inserting it into a SQL query, meaning:

  • item_price can be passed directly without creating a temporary view
  • transaction_date does not need to be manually wrapped in single quotes
parametrized_query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""

spark.sql(
    parametrized_query, item_price=item_price, transaction_date=transaction_date_str
).show()

Output:

+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Parameterized Queries with Parameter Markers

Custom string formatting would treat date(2023, 2, 15) as a mathematical expression rather than a date, which would cause a type mismatch error.

parametrized_query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""

spark.sql(parametrized_query, item_price=item_price, transaction_date=transaction_date).show()

Output:

[DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(transaction_date > ((2023 - 2) - 15))" due to data type mismatch

Parameter markers preserve type information, so date objects are passed as proper SQL DATE literals. This allows you to safely use Python dates without formatting or quoting them manually.

query_with_markers = """SELECT *
FROM {item_price}
WHERE transaction_date > :transaction_date
"""

transaction_date = date(2025, 2, 15)

spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date},
).show()

Make PySpark SQL Easier to Reuse

Parameterized SQL templates are easier to reuse across your codebase. Instead of copying and pasting full SQL strings with values hardcoded inside, you can define flexible query templates that accept different input variables.

Here’s a reusable query to filter using different transaction dates:

transaction_date_1 = date(2025, 3, 9)

spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date_1},
).show()

Output:

+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+

You can easily change the filter with a different date:

transaction_date_2 = date(2025, 3, 15)

spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date_2},
).show()

Output:

+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      4|    1|      2025-04-22|
+-------+-----+----------------+

Easier Unit Testing with PySpark Parameterized Queries

Parameterization also simplifies testing by letting you pass different inputs into a reusable query string.

For example, in the code below, we define a function that takes a DataFrame and a threshold value, then filters rows using a parameterized query.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def filter_by_price_threshold(df, amount):
    return spark.sql(
        "SELECT * from {df} where price > :amount", df=df, args={"amount": amount}
    )

Because the values are passed separately from the SQL logic, we can easily reuse and test this function with different parameters without rewriting the query itself.

def test_query_return_correct_number_of_rows():
    # Create test input DataFrame
    df = spark.createDataFrame(
        [
            ("Product 1", 10.0, 5),
            ("Product 2", 15.0, 3),
            ("Product 3", 8.0, 2),
        ],
        ["name", "price", "quantity"],
    )

    # Execute query with parameters
    assert filter_by_price_threshold(df, 10).count() == 1
    assert filter_by_price_threshold(df, 8).count() == 2

For more tips on validating DataFrame outputs effectively, see best practices for PySpark DataFrame comparison and testing.

Summary: Benefits of Using Parameterized Queries in PySpark

Using parameterized queries in PySpark offers several advantages:

  • Security: Prevents SQL injection.
  • Simplicity: Avoids temporary views and quoting hassles.
  • Testability: Supports reusable, testable query templates.
  • Readability: Makes queries cleaner and easier to understand.

Adopting this technique leads to more robust and maintainable Spark-based data pipelines.

4 thoughts on “Writing Safer PySpark Queries with Parameters”

  1. russell winterbotham

    could I use these templates to build a chatbot to answer questions about data in my database?

Leave a Comment

Your email address will not be published. Required fields are marked *

0
    0
    Your Cart
    Your cart is empty
    Scroll to Top

    Work with Khuyen Tran

    Work with Khuyen Tran