Writing SQL queries in PySpark often involves string formatting, which makes your code error-prone, difficult to test, and vulnerable to SQL injection. A safer and more maintainable alternative is to use parameterized SQL queries with PySpark’s spark.sql().
This approach allows direct use of DataFrames and Python values in queries without relying on temporary views or manual type conversions.
In this article, you’ll learn how to safely write and reuse SQL queries in PySpark using parameterization. We’ll cover both PySpark’s custom string formatting style and support for named parameter markers, along with examples for reusable logic and unit testing.
Setup: Create a Spark Session and Input Data
We’ll begin by creating a Spark session and generating a sample DataFrame using the Pandas-to-Spark conversion method. For other common ways to build DataFrames in PySpark, see this guide on creating PySpark DataFrames.
from datetime import date

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a Spark DataFrame
item_price_pandas = pd.DataFrame(
    {
        "item_id": [1, 2, 3, 4],
        "price": [4, 2, 5, 1],
        "transaction_date": [
            date(2025, 1, 15),
            date(2025, 2, 1),
            date(2025, 3, 10),
            date(2025, 4, 22),
        ],
    }
)
item_price = spark.createDataFrame(item_price_pandas)
item_price.show()
Output
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      1|    4|      2025-01-15|
|      2|    2|      2025-02-01|
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+
Traditional PySpark Query Approach
The traditional approach uses f-strings to build SQL, which is not ideal because:
- Security Risk: Interpolated strings can expose your query to SQL injection.
- Limited Flexibility: F-strings can’t handle Python objects like DataFrames directly, so you have to create temporary views and manually quote values like dates to match SQL syntax.
item_price.createOrReplaceTempView("item_price_view")
transaction_date_str = "2025-02-15"
query_with_fstring = f"""SELECT *
FROM item_price_view
WHERE transaction_date > '{transaction_date_str}'
"""
spark.sql(query_with_fstring).show()
Output
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+
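To make the injection risk concrete, here is a small sketch in plain Python (no Spark session needed). The build_query helper is hypothetical and only mirrors the f-string pattern above; it shows how a crafted input changes the structure of the SQL text rather than just its value.

```python
def build_query(transaction_date_str: str) -> str:
    """Hypothetical helper that builds SQL by interpolation, as in the f-string example."""
    return (
        "SELECT * FROM item_price_view "
        f"WHERE transaction_date > '{transaction_date_str}'"
    )


# A well-behaved input produces the intended filter.
safe = build_query("2025-02-15")

# A crafted input closes the quote early and appends its own condition.
malicious = build_query("2025-02-15' OR '1'='1")

print(safe)
print(malicious)
```

The second string ends with OR '1'='1', a condition that is always true, so the date filter would no longer restrict any rows.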
Parameterized Queries with PySpark Custom String Formatting
PySpark supports parameterized SQL with custom string formatting, separating SQL logic from parameter values. During parsing, it safely handles each value as a typed literal and inserts it into the SQL parse tree, preventing injection attacks and ensuring correct data types.
Query
├── SELECT
│   └── *
├── FROM
│   └── {item_price}
└── WHERE
    └── Condition
        ├── Left: transaction_date
        ├── Operator: >
        └── Right: {transaction_date}
Because each value is handled according to its actual data type rather than as raw text:
- item_price can be passed directly without creating a temporary view
- transaction_date does not need to be manually wrapped in single quotes
parametrized_query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""
spark.sql(
    parametrized_query, item_price=item_price, transaction_date=transaction_date_str
).show()
Output:
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+
Parameterized Queries with Parameter Markers
Custom string formatting does not handle Python date objects well. It would treat date(2023, 2, 15) as the arithmetic expression 2023 - 2 - 15 rather than a date, which causes a type mismatch error.
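# transaction_date is a Python date object here, not a string
transaction_date = date(2023, 2, 15)
parametrized_query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""
spark.sql(
    parametrized_query, item_price=item_price, transaction_date=transaction_date
).show()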
Output:
[DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(transaction_date > ((2023 - 2) - 15))" due to data type mismatch
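The arithmetic in the error message has a simple explanation if we assume that the formatter quotes Python strings but falls back to Python’s default string rendering for other values. The sketch below is a toy model of that assumed behavior in plain Python, not PySpark’s implementation; ToyFormatter is a hypothetical name.

```python
import string
from datetime import date


class ToyFormatter(string.Formatter):
    """Toy stand-in for a SQL string formatter (hypothetical, not PySpark's code)."""

    def get_value(self, key, args, kwargs):
        value = super().get_value(key, args, kwargs)
        if isinstance(value, str):
            # Strings become quoted literals (toy escaping, for illustration only).
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            return f"'{escaped}'"
        # Anything else is rendered with Python's default str() conversion.
        return value


template = "WHERE transaction_date > {transaction_date}"

as_string = ToyFormatter().format(template, transaction_date="2023-02-15")
as_date = ToyFormatter().format(template, transaction_date=date(2023, 2, 15))

print(as_string)  # the value is quoted, so SQL sees a string literal
print(as_date)    # the value is unquoted, so SQL sees 2023 minus 2 minus 15
```

Under this assumption, the date arrives in the query as the unquoted text 2023-02-15, which a SQL parser reads as subtraction between integers. Comparing a DATE column with that integer result is what triggers the mismatch.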
Parameter markers preserve type information, so date objects are passed as proper SQL DATE literals. This allows you to safely use Python dates without formatting or quoting them manually.
query_with_markers = """SELECT *
FROM {item_price}
WHERE transaction_date > :transaction_date
"""
transaction_date = date(2025, 2, 15)
spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date},
).show()
Make PySpark SQL Easier to Reuse
Parameterized SQL templates are easier to reuse across your codebase. Instead of copying and pasting full SQL strings with values hardcoded inside, you can define flexible query templates that accept different input variables.
Here’s a reusable query to filter using different transaction dates:
transaction_date_1 = date(2025, 3, 9)
spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date_1},
).show()
Output:
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      3|    5|      2025-03-10|
|      4|    1|      2025-04-22|
+-------+-----+----------------+
transaction_date_2 = date(2025, 3, 15)
spark.sql(
    query_with_markers,
    item_price=item_price,
    args={"transaction_date": transaction_date_2},
).show()
Output:
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
|      4|    1|      2025-04-22|
+-------+-----+----------------+
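One way to package this pattern is to keep the template as a module-level constant and wrap the call in a small function. This is a minimal sketch, not a prescribed structure: the names ITEMS_AFTER_DATE and items_after are hypothetical, and the function expects a Spark session that accepts the same args and keyword arguments used in the calls above.

```python
from datetime import date

# Query template: {item_price} is filled with a DataFrame,
# :transaction_date is bound as a typed parameter.
ITEMS_AFTER_DATE = """SELECT *
FROM {item_price}
WHERE transaction_date > :transaction_date
"""


def items_after(spark, item_price, cutoff: date):
    """Return the rows of item_price whose transaction_date is later than cutoff."""
    return spark.sql(
        ITEMS_AFTER_DATE,
        item_price=item_price,
        args={"transaction_date": cutoff},
    )
```

With the session and DataFrame from the setup, items_after(spark, item_price, date(2025, 3, 15)) should return the same single row as the last call above. Passing the session in as an argument also keeps the function easy to exercise in tests.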
Easier Unit Testing with PySpark Parameterized Queries
Parameterization also simplifies testing by letting you pass different inputs into a reusable query string.
For example, in the code below, we define a function that takes a DataFrame and a threshold value, then filters rows using a parameterized query.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def filter_by_price_threshold(df, amount):
    return spark.sql(
        "SELECT * from {df} where price > :amount", df=df, args={"amount": amount}
    )
Because the values are passed separately from the SQL logic, we can easily reuse and test this function with different parameters without rewriting the query itself.
def test_query_return_correct_number_of_rows():
    # Create test input DataFrame
    df = spark.createDataFrame(
        [
            ("Product 1", 10.0, 5),
            ("Product 2", 15.0, 3),
            ("Product 3", 8.0, 2),
        ],
        ["name", "price", "quantity"],
    )

    # Execute query with parameters
    assert filter_by_price_threshold(df, 10).count() == 1
    assert filter_by_price_threshold(df, 8).count() == 2
For more tips on validating DataFrame outputs effectively, see best practices for PySpark DataFrame comparison and testing.
Summary: Benefits of Using Parameterized Queries in PySpark
Using parameterized queries in PySpark offers several advantages:
- Security: Prevents SQL injection.
- Simplicity: Avoids temporary views and quoting hassles.
- Testability: Supports reusable, testable query templates.
- Readability: Makes queries cleaner and easier to understand.
Adopting this technique leads to more robust and maintainable Spark-based data pipelines.