The Hidden Cost of Python Dictionaries (And 3 Safer Alternatives)

Table of Contents

Introduction
What Are Typed Data Containers?
Using Dictionaries
Using NamedTuple
Using dataclass
Using Pydantic
Final Thoughts
Related Tutorials

Introduction
Imagine you’re processing customer records. The pipeline runs without errors, but customers never receive their welcome emails. After digging through the code, you discover the issue is a simple typo in a dictionary key.
def load_customer(row):
    return {"customer_id": row[0], "name": row[1], "emial": row[2]}  # Typo

def send_welcome_email(customer):
    email = customer.get("email")  # Returns None silently
    if email:
        print(f"Sending email to {email}")
    # No email sent, no error raised

customer = load_customer(["C001", "Alice", "alice@example.com"])
send_welcome_email(customer)  # Nothing happens

Since .get() returns None for a missing key, the bug stays hidden.
This is exactly the type of issue we want to catch earlier. In this article, we’ll look at how typed data containers like NamedTuple, dataclass, and Pydantic surface these bugs sooner — in your IDE, under static analysis, or at object creation.
💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!
What Are Typed Data Containers?
Python offers several ways to structure data, each adding more safety than the last:

dict: No protection. Bugs surface only when you access a missing key.
NamedTuple: Basic safety. Catches typos at write time in your IDE and at runtime.
dataclass: Static analysis support. Tools like mypy catch errors before your code runs.
Pydantic: Full protection. Validates data the moment you create an instance.

Let’s see how each tool handles the same customer data:
Using Dictionaries
Dictionaries are quick to create but provide no safety:
customer = {
    "customer_id": "C001",
    "name": "Alice Smith",
    "email": "alice@example.com",
    "age": 28,
    "is_premium": True,
}

print(customer["name"])

Alice Smith

Typo Bugs
A typo in the key name causes a KeyError at runtime:
customer["emial"] # Typo: should be "email"

KeyError: 'emial'

The error tells you what went wrong but not where. When dictionaries pass through multiple functions, finding the source of a typo can take significant debugging time:
def load_customer(row):
    return {"customer_id": row[0], "name": row[1], "emial": row[2]}  # Typo here

def validate_customer(customer):
    return customer  # Passes through unchanged

def send_email(customer):
    return customer["email"]  # KeyError raised here

customer = load_customer(["C001", "Alice", "alice@example.com"])
validated = validate_customer(customer)
send_email(validated)  # Error points here, but bug is in load_customer

KeyError                          Traceback (most recent call last)
     13 customer = load_customer(["C001", "Alice", "alice@example.com"])
     14 validated = validate_customer(customer)
---> 15 send_email(validated)  # Error points here, but bug is in load_customer

Cell In[6], line 10, in send_email(customer)
      9 def send_email(customer):
---> 10     return customer["email"]

KeyError: 'email'

The stack trace shows where the KeyError was raised, not where "emial" was written. The bug and its symptom are 13 lines apart here, but in production code, they could be in different files entirely.
Using .get() makes it worse by returning None silently:
email = customer.get("email") # Returns None – key is "emial" not "email"
print(f"Sending email to: {email}")

Sending email to: None

This silent failure is dangerous: your notification system might skip thousands of customers, or worse, your code could write None to a database column, corrupting your data pipeline.
Type Confusion
Typos cause crashes, but wrong types can corrupt your data silently. Since dictionaries have no schema, nothing stops you from assigning the wrong type to a field:
customer = {
    "customer_id": "C001",
    "name": 123,  # Should be a string
    "age": "twenty-eight",  # Should be an integer
}

total_age = customer["age"] + 5

TypeError: can only concatenate str (not "int") to str

The error message is misleading: it says “concatenate str” but the real problem is that age should never have been a string in the first place.
Using NamedTuple
NamedTuple is a lightweight way to define a fixed structure with named fields and type hints, like a dictionary with a schema:
from typing import NamedTuple

class Customer(NamedTuple):
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool

customer = Customer(
    customer_id="C001",
    name="Alice Smith",
    email="alice@example.com",
    age=28,
    is_premium=True,
)

print(customer.name)

Alice Smith

IDE Autocomplete Catches Typos
Most IDEs can’t reliably autocomplete dictionary keys, so typing customer[" shows no suggestions. With NamedTuple, typing customer. displays all available fields: customer_id, name, email, age, is_premium.
Even if you skip autocomplete and type manually, typos are flagged instantly with squiggly lines:
customer.emial
         ~~~~~

Running the code will raise an error:
customer.emial

AttributeError: 'Customer' object has no attribute 'emial'

The error names the exact object and missing attribute, so you know immediately what to fix.
Immutability Prevents Accidental Changes
NamedTuples are immutable, meaning once created, their values cannot be changed:
customer.name = "Bob" # Raises an error

AttributeError: can't set attribute

This prevents bugs where data is accidentally modified during processing.
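When you do need a modified copy, NamedTuple provides the standard-library _replace() method, which returns a new instance instead of mutating the original:

```python
from typing import NamedTuple

class Customer(NamedTuple):
    customer_id: str
    name: str
    email: str

customer = Customer("C001", "Alice", "alice@example.com")

# _replace returns a NEW instance; the original is untouched
updated = customer._replace(name="Bob")

print(customer.name)  # Alice
print(updated.name)   # Bob
```

This keeps the safety of immutability while still allowing updates in a controlled, explicit way.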
Limitations: No Runtime Type Validation
Type hints in NamedTuple are not enforced at runtime, so you can still pass in wrong types:
# Wrong types are accepted without error
customer = Customer(
    customer_id="C001",
    name=123,  # Should be str, but int is accepted
    email="alice@example.com",
    age="twenty-eight",  # Should be int, but str is accepted
    is_premium=True,
)

print(f"Name: {customer.name}, Age: {customer.age}")

Name: 123, Age: twenty-eight

The code runs, but with incorrect data types. The bug surfaces later when you try to use the data.
Using dataclass
dataclass reduces the boilerplate of writing classes that mainly hold data. Instead of manually writing __init__ and other methods, you just declare your fields.
It provides the same IDE support as NamedTuple, plus three additional features:

Mutable objects: You can change field values after creation
Mutable defaults: Safe defaults for lists and dicts with field(default_factory=list)
Post-init logic: Run custom validation or compute derived fields with __post_init__

from dataclasses import dataclass

@dataclass
class Customer:
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False  # Default value

customer = Customer(
    customer_id="C001",
    name="Alice Smith",
    email="alice@example.com",
    age=28,
)

print(f"{customer.name}, Premium: {customer.is_premium}")

Alice Smith, Premium: False

Mutability Allows Updates
Dataclass trades NamedTuple’s immutability protection for flexibility. You can modify fields after creation:
customer.name = "Alice Johnson" # Changed after marriage
customer.is_premium = True # Upgraded their account

print(f"{customer.name}, Premium: {customer.is_premium}")

Alice Johnson, Premium: True

For extra safety, use @dataclass(slots=True) to prevent accidentally adding new attributes:
@dataclass(slots=True)
class Customer:
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

customer = Customer(
    customer_id="C001",
    name="Alice",
    email="alice@example.com",
    age=28,
)

customer.nmae = "Bob"  # Typo

AttributeError: 'Customer' object has no attribute 'nmae'

Mutable Defaults with default_factory
Mutable defaults like lists don’t work as expected. You might think each instance gets its own empty list, but Python creates the default [] once and all instances share it:
from typing import NamedTuple

class Order(NamedTuple):
    order_id: str
    items: list = []

order1 = Order("001")
order2 = Order("002")

order1.items.append("apple")
print(f"Order 1: {order1.items}")
print(f"Order 2: {order2.items}")  # Also has "apple"!

Order 1: ['apple']
Order 2: ['apple']

Order 2 has “apple” even though we only added it to Order 1. Modifying one order’s items affects every order.
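This is the same trap as mutable default arguments in functions — Python evaluates the default once, at definition time, not per call:

```python
def add_item(item, items=[]):  # Default list created ONCE, at definition time
    items.append(item)
    return items

print(add_item("apple"))   # ['apple']
print(add_item("banana"))  # ['apple', 'banana'] – same shared list!

def add_item_safe(item, items=None):
    if items is None:
        items = []  # Fresh list per call
    items.append(item)
    return items

print(add_item_safe("apple"))   # ['apple']
print(add_item_safe("banana"))  # ['banana']
```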
Dataclass prevents this mistake by rejecting mutable defaults:
@dataclass
class Order:
    items: list = []
ValueError: mutable default <class 'list'> for field items is not allowed: use default_factory

Dataclass offers field(default_factory=…) as the solution. The factory function runs at instance creation, not class definition, so each object gets its own list:
from dataclasses import dataclass, field

@dataclass
class Order:
    order_id: str
    items: list = field(default_factory=list)  # Each instance gets its own list

order1 = Order("001")
order2 = Order("002")

order1.items.append("apple")
print(f"Order 1: {order1.items}")
print(f"Order 2: {order2.items}")  # Not affected by order1

Order 1: ['apple']
Order 2: []

Unlike the NamedTuple example, Order 2 stays empty because it has its own list.
Post-Init Validation with __post_init__
Without validation, invalid data passes through silently:
@dataclass
class Customer:
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

customer = Customer(
    customer_id="C001",
    name="",  # Empty name
    email="invalid",
    age=-100,
)
print(f"Created: {customer}")  # No error – bad data is in your system

Created: Customer(customer_id='C001', name='', email='invalid', age=-100, is_premium=False)

Dataclass provides __post_init__ to catch these issues at creation time so you can validate fields before the object is used:
@dataclass
class Customer:
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

    def __post_init__(self):
        if self.age < 0:
            raise ValueError(f"Age cannot be negative: {self.age}")
        if "@" not in self.email:
            raise ValueError(f"Invalid email: {self.email}")

customer = Customer(
    customer_id="C001",
    name="Alice",
    email="invalid-email",
    age=28,
)

ValueError: Invalid email: invalid-email

The error message tells you exactly what’s wrong, making the bug easy to fix.
Limitations: Manual Validation Only
__post_init__ requires you to write every validation rule yourself. If you forget to check a field, bad data can still slip through.
In this example, __post_init__ only validates email format, so wrong types for name and age pass undetected:
@dataclass
class Customer:
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

    def __post_init__(self):
        if "@" not in self.email:
            raise ValueError(f"Invalid email: {self.email}")

customer = Customer(
    customer_id="C001",
    name=123,  # No validation for name type
    email="alice@example.com",
    age="twenty-eight",  # No validation for age type
)

print(f"Name: {customer.name}, Age: {customer.age}")

Name: 123, Age: twenty-eight

Type hints alone don’t enforce types at runtime. For automatic validation, you need a library that actually checks types when objects are created.
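To see why hand-rolled checking doesn’t scale, here is a sketch (not the article’s code) of generic type checks in __post_init__ using dataclasses.fields — it only handles plain annotations like str and int, not generics such as list[str] or Optional:

```python
from dataclasses import dataclass, fields

@dataclass
class Customer:
    customer_id: str
    name: str
    age: int

    def __post_init__(self):
        # Compare each value against its annotation; works only for simple types
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(f.type, type) and not isinstance(value, f.type):
                raise TypeError(
                    f"{f.name} expected {f.type.__name__}, got {type(value).__name__}"
                )

Customer(customer_id="C001", name="Alice", age=28)  # OK

try:
    Customer(customer_id="C001", name=123, age=28)
except TypeError as e:
    print(e)  # name expected str, got int
```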
📚 For comprehensive coverage of dataclasses and Pydantic in production workflows, check out Production-Ready Data Science.
Using Pydantic
Pydantic is a data validation library that enforces type hints at runtime. Unlike NamedTuple and dataclass, it actually checks that values match their declared types when objects are created. Install it with:
pip install pydantic

To create a Pydantic model, inherit from BaseModel and declare your fields with type hints:
from pydantic import BaseModel

class Customer(BaseModel):
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

customer = Customer(
    customer_id="C001",
    name="Alice Smith",
    email="alice@example.com",
    age=28,
)

print(f"{customer.name}, Age: {customer.age}")

Alice Smith, Age: 28

For using Pydantic to enforce structured outputs from AI models, see our PydanticAI tutorial.
Runtime Validation
Remember how dataclass accepted name=123 without complaint? Pydantic catches this automatically with a ValidationError:
from pydantic import BaseModel, ValidationError

class Customer(BaseModel):
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

try:
    customer = Customer(
        customer_id="C001",
        name=123,
        email="alice@example.com",
        age="thirty",
    )
except ValidationError as e:
    print(e)

2 validation errors for Customer
name
Input should be a valid string [type=string_type, input_value=123, input_type=int]
age
Input should be a valid integer, unable to parse string as an integer [type=int_parsing, input_value='thirty', input_type=str]

The error message shows:

Which fields failed validation (name, age)
What was expected (valid string, valid integer)
What was received (123 as int, 'thirty' as str)

This tells you everything you need to fix the bug in one place, instead of digging through stack traces.
Type Coercion
Unlike dataclass which stores whatever you pass, Pydantic automatically converts compatible types to match your type hints:
customer = Customer(
    customer_id="C001",
    name="Alice Smith",
    email="alice@example.com",
    age="28",  # String "28" is converted to int 28
    is_premium="true",  # String "true" is converted to bool True
)

print(f"Age: {customer.age} (type: {type(customer.age).__name__})")
print(f"Premium: {customer.is_premium} (type: {type(customer.is_premium).__name__})")

Age: 28 (type: int)
Premium: True (type: bool)

This is useful when reading data from CSV files or APIs where everything comes as strings.
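For example, rows from csv.DictReader arrive with every value as a string, and the Customer model from above (a sketch assuming Pydantic v2, with an in-memory file standing in for a real CSV) coerces them on creation:

```python
import csv
import io

from pydantic import BaseModel

class Customer(BaseModel):
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

# Simulate a CSV file – DictReader yields only strings
raw = io.StringIO(
    "customer_id,name,email,age,is_premium\n"
    "C001,Alice,alice@example.com,28,true\n"
)

customers = [Customer(**row) for row in csv.DictReader(raw)]

print(customers[0].age, type(customers[0].age).__name__)  # 28 int
```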
Constraint Validation
Beyond types, you often need business rules: age must be positive, names can’t be empty, customer IDs must follow a pattern.
In dataclass, you define fields in one place and validate them in __post_init__. The validation logic grows with each constraint:
@dataclass
class Customer:
    customer_id: str
    name: str
    email: str
    age: int
    is_premium: bool = False

    def __post_init__(self):
        if not self.customer_id:
            raise ValueError("Customer ID cannot be empty")
        if not self.name or len(self.name) < 1:
            raise ValueError("Name cannot be empty")
        if "@" not in self.email:
            raise ValueError(f"Invalid email: {self.email}")
        if self.age < 0 or self.age > 150:
            raise ValueError(f"Age must be between 0 and 150: {self.age}")

Pydantic puts constraints directly in Field(), keeping rules next to the data they validate:
from pydantic import BaseModel, Field, ValidationError

class Customer(BaseModel):
    customer_id: str
    name: str = Field(min_length=1)
    email: str
    age: int = Field(ge=0, le=150)  # Age must be between 0 and 150
    is_premium: bool = False

try:
    customer = Customer(
        customer_id="C001",
        name="",  # Empty name
        email="alice@example.com",
        age=-5,  # Negative age
    )
except ValidationError as e:
    print(e)

2 validation errors for Customer
name
String should have at least 1 character [type=string_too_short, input_value='', input_type=str]
age
Input should be greater than or equal to 0 [type=greater_than_equal, input_value=-5, input_type=int]

Nested Validation
Data structures are rarely flat. A customer has an address, an order contains items. When something is wrong inside a nested object, you need to know exactly where.
Pydantic validates each level and reports the full path to any error:
from pydantic import BaseModel, Field, ValidationError

class Address(BaseModel):
    street: str
    city: str
    zip_code: str = Field(pattern=r"^\d{5}$")  # Must be 5 digits

class Customer(BaseModel):
    customer_id: str
    name: str
    address: Address

try:
    customer = Customer(
        customer_id="C001",
        name="Alice Smith",
        address={
            "street": "123 Main St",
            "city": "New York",
            "zip_code": "invalid",  # Invalid zip code
        },
    )
except ValidationError as e:
    print(e)

1 validation error for Customer
address.zip_code
String should match pattern '^\d{5}$' [type=string_pattern_mismatch, input_value='invalid', input_type=str]

The error message shows address.zip_code, pinpointing the exact location in the nested structure.
For extracting structured data from documents using Pydantic, see our LlamaIndex data extraction guide.
Final Thoughts
To summarize what each tool provides:

dict: Quick to create. No structure or validation.
NamedTuple: Fixed structure with IDE autocomplete. Immutable.
dataclass: Mutable fields, safe defaults, custom logic via __post_init__.
Pydantic: Runtime type enforcement, automatic type coercion, built-in constraints.

Personally, I use dict for quick prototyping:
stats = {"rmse": 0.234, "mae": 0.189, "r2": 0.91}

Then Pydantic when the code moves to production. For example, a training config should reject invalid values like negative learning rates:
from pydantic import BaseModel, Field

class TrainingConfig(BaseModel):
    epochs: int = Field(ge=1)
    batch_size: int = Field(ge=1)
    learning_rate: float = Field(gt=0)

config = TrainingConfig(epochs=10, batch_size=32, learning_rate=0.001)

Pick the level of protection that matches your needs. A notebook experiment doesn’t need Pydantic, but a production API does.
Related Tutorials

Database Integration: SQLModel vs psycopg2 for combining Pydantic-style validation with databases
Testing: Pytest for Data Scientists to test your data containers
Configuration: Hydra for Python Configuration for validated configuration management


pandas vs Polars vs DuckDB: A Data Scientist’s Guide to Choosing the Right Tool

Table of Contents

Introduction
Tool Strengths at a Glance
Setup
Syntax Comparison
Data Loading Performance
Query Optimization
GroupBy Performance
Memory Efficiency
Join Operations
Interoperability
Decision Matrix
Final Thoughts

Introduction
pandas has been the standard tool for working with tabular data in Python for over a decade. But as datasets grow larger and performance requirements increase, two modern alternatives have emerged: Polars, a DataFrame library written in Rust, and DuckDB, an embedded SQL database optimized for analytics.
Each tool excels in different scenarios:

Tool     Backend    Execution Model              Best For
pandas   C/Python   Eager, single-threaded       Small datasets, prototyping, ML integration
Polars   Rust       Lazy/Eager, multi-threaded   Large-scale analytics, data pipelines
DuckDB   C++        SQL-first, multi-threaded    SQL workflows, embedded analytics, file queries

This guide compares all three tools with practical examples, helping you choose the right one for your workflow.

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

Tool Strengths at a Glance
pandas
pandas is the original DataFrame library for Python that excels at interactive data exploration and integrates seamlessly with the ML ecosystem. Key capabilities include:

Direct compatibility with scikit-learn, statsmodels, and visualization libraries
Rich ecosystem of extensions (pandas-profiling, pandasql, etc.)
Mature time series functionality
Familiar syntax that most data scientists already know

Polars
Polars is a Rust-powered DataFrame library designed for speed that brings multi-threaded execution and query optimization to Python. Key capabilities include:

Speeds up operations by using all available CPU cores by default
Builds a query plan first, then executes only what’s needed
Streaming mode for processing datasets larger than RAM
Expressive method chaining with a pandas-like API

DuckDB
DuckDB is an embedded SQL database optimized for analytics that brings database-level query optimization to local files. Key capabilities include:

Native SQL syntax with full analytical query support
Queries CSV, Parquet, and JSON files directly without loading
Uses disk storage automatically when data exceeds available memory
Zero-configuration embedded database requiring no server setup

Setup
Install all three libraries:
pip install pandas polars duckdb

Generate sample data for benchmarking:
import pandas as pd
import numpy as np

np.random.seed(42)
n_rows = 5_000_000

data = {
    "category": np.random.choice(["Electronics", "Clothing", "Food", "Books"], size=n_rows),
    "region": np.random.choice(["North", "South", "East", "West"], size=n_rows),
    "amount": np.random.rand(n_rows) * 1000,
    "quantity": np.random.randint(1, 100, size=n_rows),
}

df_pandas = pd.DataFrame(data)
df_pandas.to_csv("sales_data.csv", index=False)
print(f"Created sales_data.csv with {n_rows:,} rows")

Created sales_data.csv with 5,000,000 rows

Syntax Comparison
All three tools can perform the same operations with different syntax. Here’s a side-by-side comparison of common tasks.
Filtering Rows
pandas:
Uses bracket notation with boolean conditions, which is concise but can become hard to read with complex conditions.
import pandas as pd

df_pd = pd.read_csv("sales_data.csv")
result_pd = df_pd[(df_pd["amount"] > 500) & (df_pd["category"] == "Electronics")]
result_pd.head()

       category region      amount  quantity
7   Electronics   West  662.803066        80
15  Electronics  North  826.004963        25
30  Electronics  North  766.081832         7
31  Electronics   West  772.084261        36
37  Electronics   East  527.967145        35

Polars:
Uses method chaining with pl.col() expressions, avoiding the repeated df["column"] references required by pandas.
import polars as pl

df_pl = pl.read_csv("sales_data.csv")
result_pl = df_pl.filter(
    (pl.col("amount") > 500) & (pl.col("category") == "Electronics")
)
result_pl.head()

category       region   amount      quantity
str            str      f64         i64
"Electronics"  "West"   662.803066  80
"Electronics"  "North"  826.004963  25
"Electronics"  "North"  766.081832  7
"Electronics"  "West"   772.084261  36
"Electronics"  "East"   527.967145  35

DuckDB:
Uses standard SQL with a WHERE clause, which is immediately readable for anyone who knows SQL.
import duckdb

result_duckdb = duckdb.sql("""
    SELECT * FROM 'sales_data.csv'
    WHERE amount > 500 AND category = 'Electronics'
""").df()
result_duckdb.head()

      category region      amount  quantity
0  Electronics   West  662.803066        80
1  Electronics  North  826.004963        25
2  Electronics  North  766.081832         7
3  Electronics   West  772.084261        36
4  Electronics   East  527.967145        35

Selecting Columns
pandas:
Double brackets return a DataFrame with selected columns.
result_pd = df_pd[["category", "amount"]]
result_pd.head()

      category      amount
0         Food  516.653322
1        Books  937.337226
2  Electronics  450.941022
3         Food  674.488081
4         Food  188.847906

Polars:
The select() method clearly communicates column selection intent.
result_pl = df_pl.select(["category", "amount"])
result_pl.head()

category       amount
str            f64
"Food"         516.653322
"Books"        937.337226
"Electronics"  450.941022
"Food"         674.488081
"Food"         188.847906

DuckDB:
SQL’s SELECT clause makes column selection intuitive for SQL users.
result_duckdb = duckdb.sql("""
    SELECT category, amount FROM 'sales_data.csv'
""").df()
result_duckdb.head()

      category      amount
0         Food  516.653322
1        Books  937.337226
2  Electronics  450.941022
3         Food  674.488081
4         Food  188.847906

GroupBy Aggregation
pandas:
Uses a dictionary to specify aggregations, but returns multi-level column headers that often require flattening before further use.
result_pd = df_pd.groupby("category").agg({
    "amount": ["sum", "mean"],
    "quantity": "sum"
})
result_pd.head()

                   amount              quantity
                      sum        mean       sum
category
Books        6.247506e+08  499.998897  62463285
Clothing     6.253924e+08  500.139837  62505224
Electronics  6.244453e+08  499.938189  62484265
Food         6.254034e+08  499.916417  62577943
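One common fix (illustrative, using a small made-up DataFrame rather than the article’s benchmark data) is to flatten those multi-level headers before further use:

```python
import pandas as pd

df = pd.DataFrame({
    "category": ["Books", "Books", "Food"],
    "amount": [10.0, 20.0, 5.0],
    "quantity": [1, 2, 3],
})

result = df.groupby("category").agg({"amount": ["sum", "mean"], "quantity": "sum"})

# Join each (column, aggregation) pair into a single flat name
result.columns = ["_".join(col) for col in result.columns]

print(result.columns.tolist())  # ['amount_sum', 'amount_mean', 'quantity_sum']
```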

Polars:
Uses explicit alias() calls for each aggregation, producing flat column names directly without post-processing.
result_pl = df_pl.group_by("category").agg([
    pl.col("amount").sum().alias("amount_sum"),
    pl.col("amount").mean().alias("amount_mean"),
    pl.col("quantity").sum().alias("quantity_sum"),
])
result_pl.head()

category       amount_sum  amount_mean  quantity_sum
str            f64         f64          i64
"Clothing"     6.2539e8    500.139837   62505224
"Books"        6.2475e8    499.998897   62463285
"Electronics"  6.2445e8    499.938189   62484265
"Food"         6.2540e8    499.916417   62577943

DuckDB:
Standard SQL aggregation with column aliases produces clean, flat output ready for downstream use.
result_duckdb = duckdb.sql("""
    SELECT
        category,
        SUM(amount) as amount_sum,
        AVG(amount) as amount_mean,
        SUM(quantity) as quantity_sum
    FROM 'sales_data.csv'
    GROUP BY category
""").df()
result_duckdb.head()

      category    amount_sum  amount_mean  quantity_sum
0         Food  6.254034e+08   499.916417    62577943.0
1  Electronics  6.244453e+08   499.938189    62484265.0
2     Clothing  6.253924e+08   500.139837    62505224.0
3        Books  6.247506e+08   499.998897    62463285.0

Adding Columns
pandas:
The assign() method creates new columns with repeated DataFrame references like df_pd["amount"].
result_pd = df_pd.assign(
    amount_with_tax=df_pd["amount"] * 1.1,
    high_value=df_pd["amount"] > 500
)
result_pd.head()

      category region      amount  quantity  amount_with_tax  high_value
0         Food  South  516.653322        40       568.318654        True
1        Books   East  937.337226        45      1031.070948        True
2  Electronics  North  450.941022        93       496.035124       False
3         Food   East  674.488081        46       741.936889        True
4         Food   East  188.847906        98       207.732697       False

Polars:
The with_columns() method uses composable expressions that chain naturally without repeating the DataFrame name.
result_pl = df_pl.with_columns([
    (pl.col("amount") * 1.1).alias("amount_with_tax"),
    (pl.col("amount") > 500).alias("high_value")
])
result_pl.head()

category       region   amount      quantity  amount_with_tax  high_value
str            str      f64         i64       f64              bool
"Food"         "South"  516.653322  40        568.318654       true
"Books"        "East"   937.337226  45        1031.070948      true
"Electronics"  "North"  450.941022  93        496.035124       false
"Food"         "East"   674.488081  46        741.936889       true
"Food"         "East"   188.847906  98        207.732697       false

DuckDB:
SQL’s SELECT clause defines new columns directly in the query, keeping transformations readable.
result_duckdb = duckdb.sql("""
    SELECT *,
        amount * 1.1 as amount_with_tax,
        amount > 500 as high_value
    FROM df_pd
""").df()
result_duckdb.head()

      category region      amount  quantity  amount_with_tax  high_value
0         Food  South  516.653322        40       568.318654        True
1        Books   East  937.337226        45      1031.070948        True
2  Electronics  North  450.941022        93       496.035124       False
3         Food   East  674.488081        46       741.936889        True
4         Food   East  188.847906        98       207.732697       False

Conditional Logic
pandas:
Requires np.where() for simple conditions or slow apply() for complex logic, which breaks method chaining.
import numpy as np

result_pd = df_pd.assign(
    value_tier=np.where(
        df_pd["amount"] > 700, "high",
        np.where(df_pd["amount"] > 300, "medium", "low")
    )
)
result_pd[["category", "amount", "value_tier"]].head()

      category      amount value_tier
0         Food  516.653322     medium
1        Books  937.337226       high
2  Electronics  450.941022     medium
3         Food  674.488081     medium
4         Food  188.847906        low

Polars:
The when().then().otherwise() chain is readable and integrates naturally with method chaining.
result_pl = df_pl.with_columns(
    pl.when(pl.col("amount") > 700).then(pl.lit("high"))
    .when(pl.col("amount") > 300).then(pl.lit("medium"))
    .otherwise(pl.lit("low"))
    .alias("value_tier")
)
result_pl.select(["category", "amount", "value_tier"]).head()

category       amount      value_tier
str            f64         str
"Food"         516.653322  "medium"
"Books"        937.337226  "high"
"Electronics"  450.941022  "medium"
"Food"         674.488081  "medium"
"Food"         188.847906  "low"

DuckDB:
Standard SQL CASE WHEN syntax is immediately readable for anyone who knows SQL.
result_duckdb = duckdb.sql("""
    SELECT category, amount,
        CASE
            WHEN amount > 700 THEN 'high'
            WHEN amount > 300 THEN 'medium'
            ELSE 'low'
        END as value_tier
    FROM df_pd
""").df()
result_duckdb.head()

      category      amount value_tier
0         Food  516.653322     medium
1        Books  937.337226       high
2  Electronics  450.941022     medium
3         Food  674.488081     medium
4         Food  188.847906        low

Window Functions
pandas:
Uses groupby().transform() which requires repeating the groupby clause for each calculation.
result_pd = df_pd.assign(
    category_avg=df_pd.groupby("category")["amount"].transform("mean"),
    category_rank=df_pd.groupby("category")["amount"].rank(ascending=False)
)
result_pd[["category", "amount", "category_avg", "category_rank"]].head()

      category      amount  category_avg  category_rank
0         Food  516.653322    499.916417       604342.0
1        Books  937.337226    499.998897        78423.0
2  Electronics  450.941022    499.938189       685881.0
3         Food  674.488081    499.916417       407088.0
4         Food  188.847906    499.916417      1015211.0

Polars:
The over() expression appends the partition to any calculation, avoiding repeated group definitions.
result_pl = df_pl.with_columns([
    pl.col("amount").mean().over("category").alias("category_avg"),
    pl.col("amount").rank(descending=True).over("category").alias("category_rank")
])
result_pl.select(["category", "amount", "category_avg", "category_rank"]).head()

category       amount      category_avg  category_rank
str            f64         f64           f64
"Food"         516.653322  499.916417    604342.0
"Books"        937.337226  499.998897    78423.0
"Electronics"  450.941022  499.938189    685881.0
"Food"         674.488081  499.916417    407088.0
"Food"         188.847906  499.916417    1015211.0

DuckDB:
SQL window functions with OVER (PARTITION BY …) are the industry standard for this type of calculation.
result_duckdb = duckdb.sql("""
    SELECT category, amount,
        AVG(amount) OVER (PARTITION BY category) as category_avg,
        RANK() OVER (PARTITION BY category ORDER BY amount DESC) as category_rank
    FROM df_pd
""").df()
result_duckdb.head()

   category      amount  category_avg  category_rank
0  Clothing  513.807166    500.139837         608257
1  Clothing  513.806596    500.139837         608258
2  Clothing  513.806515    500.139837         608259
3  Clothing  513.806063    500.139837         608260
4  Clothing  513.806056    500.139837         608261

Data Loading Performance
pandas reads CSV files on a single CPU core. Polars and DuckDB use multi-threaded execution, distributing the work across all available cores to read different parts of the file simultaneously.
pandas
Single-threaded CSV parsing loads data sequentially.
┌─────────────────────────────────────────────┐
│ CPU Core 1 │
│ ┌─────────────────────────────────────────┐ │
│ │ Chunk 1 → Chunk 2 → Chunk 3 → … → End │ │
│ └─────────────────────────────────────────┘ │
│ CPU Core 2 [idle] │
│ CPU Core 3 [idle] │
│ CPU Core 4 [idle] │
└─────────────────────────────────────────────┘

pandas_time = %timeit -o pd.read_csv("sales_data.csv")

1.05 s ± 26.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Polars
Multi-threaded parsing distributes file reading across all available cores.
┌─────────────────────────────────────────────┐
│ CPU Core 1 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 2 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 3 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 4 ┌────────────────┐ │
│ │ ████████████ │ │
└─────────────────────────────────────────────┘

polars_time = %timeit -o pl.read_csv("sales_data.csv")

137 ms ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

DuckDB
Similar to Polars, file reading is distributed across all available cores.
┌─────────────────────────────────────────────┐
│ CPU Core 1 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 2 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 3 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 4 ┌────────────────┐ │
│ │ ████████████ │ │
└─────────────────────────────────────────────┘

duckdb_time = %timeit -o duckdb.sql("SELECT * FROM 'sales_data.csv'").df()

762 ms ± 77.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

print(f"Polars is {pandas_time.average / polars_time.average:.1f}× faster than pandas")
print(f"DuckDB is {pandas_time.average / duckdb_time.average:.1f}× faster than pandas")

Polars is 7.7× faster than pandas
DuckDB is 1.4× faster than pandas

While Polars leads with a 7.7× speedup in CSV reading, DuckDB’s 1.4× improvement shows parsing isn’t its focus. DuckDB shines when querying files directly or running complex analytical queries.
Query Optimization
pandas: No Optimization
pandas executes operations eagerly, creating intermediate DataFrames at each step. This wastes memory and prevents optimization.
┌─────────────────────────────────────────────────────────────┐
│ Step 1: Load ALL rows → 10M rows in memory │
│ Step 2: Filter (amount > 100) → 5M rows in memory │
│ Step 3: GroupBy → New DataFrame │
│ Step 4: Mean → Final result │
└─────────────────────────────────────────────────────────────┘
Memory: ████████████████████████████████ (high – stores all intermediates)

def pandas_query():
    return (
        pd.read_csv("sales_data.csv")
        .query('amount > 100')
        .groupby('category')['amount']
        .mean()
    )

pandas_opt_time = %timeit -o pandas_query()

1.46 s ± 88.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This approach has three problems:

Full CSV load: All rows are read before filtering
No predicate pushdown: Rows are filtered after loading the entire file into memory
No projection pushdown: All columns are loaded, even unused ones
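You can approximate both pushdowns by hand in pandas — reading only the needed columns and filtering chunk by chunk — though execution stays single-threaded. A sketch using a small hypothetical stand-in file (`sales_data_demo.csv` and its extra columns are invented for illustration):

```python
import pandas as pd

# Hypothetical miniature stand-in for sales_data.csv
pd.DataFrame({
    "category": ["A", "B", "A", "C"],
    "amount": [50.0, 150.0, 300.0, 90.0],
    "unused1": [1, 2, 3, 4],
    "unused2": ["x", "y", "z", "w"],
}).to_csv("sales_data_demo.csv", index=False)

# Manual "projection pushdown": load only the two needed columns;
# manual "predicate pushdown": filter each chunk before keeping it
chunks = [
    chunk.query("amount > 100")
    for chunk in pd.read_csv(
        "sales_data_demo.csv",
        usecols=["category", "amount"],
        chunksize=2,
    )
]
result = pd.concat(chunks).groupby("category")["amount"].mean()
print(result)
```

This keeps peak memory closer to one chunk plus the accumulated matches, rather than the full file.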

Polars: Lazy Evaluation
Polars supports lazy evaluation, which builds a query plan and optimizes it before execution:
┌─────────────────────────────────────────────────────────────┐
│ Query Plan Built: │
│ scan_csv → filter → group_by → agg │
│ │
│ Optimizations Applied: │
│ • Predicate pushdown (filter during scan) │
│ • Projection pushdown (read only needed columns) │
│ • Multi-threaded execution (parallel across CPU cores) │
└─────────────────────────────────────────────────────────────┘
Memory: ████████ (low – no intermediate DataFrames)

query_pl = (
    pl.scan_csv("sales_data.csv")
    .filter(pl.col("amount") > 100)
    .group_by("category")
    .agg(pl.col("amount").mean().alias("avg_amount"))
)

# View the optimized query plan
print(query_pl.explain())

AGGREGATE[maintain_order: false]
    [col("amount").mean().alias("avg_amount")] BY [col("category")]
    FROM
    Csv SCAN [sales_data.csv] [id: 4687118704]
    PROJECT 2/4 COLUMNS
    SELECTION: [(col("amount")) > (100.0)]

The query plan shows these optimizations:

Predicate pushdown: SELECTION filters during scan, not after loading
Projection pushdown: PROJECT 2/4 COLUMNS reads only what’s needed
Operation reordering: Aggregate runs on filtered data, not the full dataset

Execute the optimized query:
def polars_query():
    return (
        pl.scan_csv("sales_data.csv")
        .filter(pl.col("amount") > 100)
        .group_by("category")
        .agg(pl.col("amount").mean().alias("avg_amount"))
        .collect()
    )

polars_opt_time = %timeit -o polars_query()

148 ms ± 32.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

DuckDB: SQL Optimizer
DuckDB’s SQL optimizer applies similar optimizations automatically:
┌─────────────────────────────────────────────────────────────┐
│ Query Plan Built: │
│ SQL → Parser → Optimizer → Execution Plan │
│ │
│ Optimizations Applied: │
│ • Predicate pushdown (WHERE during scan) │
│ • Projection pushdown (SELECT only needed columns) │
│ • Vectorized execution (process 1024 rows per batch) │
└─────────────────────────────────────────────────────────────┘
Memory: ████████ (low – streaming execution)

def duckdb_query():
    return duckdb.sql("""
        SELECT category, AVG(amount) as avg_amount
        FROM 'sales_data.csv'
        WHERE amount > 100
        GROUP BY category
    """).df()

duckdb_opt_time = %timeit -o duckdb_query()

245 ms ± 12.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Let’s compare the performance of the optimized queries:
print(f"Polars is {pandas_opt_time.average / polars_opt_time.average:.1f}× faster than pandas")
print(f"DuckDB is {pandas_opt_time.average / duckdb_opt_time.average:.1f}× faster than pandas")

Polars is 9.9× faster than pandas
DuckDB is 6.0× faster than pandas

Polars outperforms DuckDB (9.9× vs 6.0×) in this benchmark because its Rust-based engine handles the filter-then-aggregate pattern efficiently. DuckDB’s strength lies in complex SQL queries with joins and subqueries.
GroupBy Performance
Computing aggregates requires scanning every row, a workload that scales linearly with CPU cores. This makes groupby operations the clearest test of parallel execution.
Let’s load the data for the groupby benchmarks:
# Load data for fair comparison
df_pd = pd.read_csv("sales_data.csv")
df_pl = pl.read_csv("sales_data.csv")

pandas: Single-Threaded
pandas processes groupby operations on a single CPU core, which becomes a bottleneck on large datasets.
┌─────────────────────────────────────────────────────────────┐
│ CPU Core 1 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Group A → Group B → Group C → Group D → … → Aggregate │ │
│ └─────────────────────────────────────────────────────────┘ │
│ CPU Core 2 [idle] │
│ CPU Core 3 [idle] │
│ CPU Core 4 [idle] │
└─────────────────────────────────────────────────────────────┘

def pandas_groupby():
    return df_pd.groupby("category")["amount"].mean()

pandas_groupby_time = %timeit -o pandas_groupby()

271 ms ± 135 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Polars: Multi-Threaded
Polars splits data across cores, computes partial aggregates in parallel, then merges the results.
┌─────────────────────────────────────────────────────────────┐
│ CPU Core 1 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ CPU Core 2 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ CPU Core 3 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ CPU Core 4 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ ↓ │
│ Final Merge → Result │
└─────────────────────────────────────────────────────────────┘

def polars_groupby():
    return df_pl.group_by("category").agg(pl.col("amount").mean())

polars_groupby_time = %timeit -o polars_groupby()

31.1 ms ± 3.65 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

DuckDB: Multi-Threaded
Similar to Polars, DuckDB splits data across cores, computes partial aggregates in parallel, then merges the results.
┌─────────────────────────────────────────────────────────────┐
│ CPU Core 1 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ CPU Core 2 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ CPU Core 3 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ CPU Core 4 ┌──────────────┐ │
│ │ ████████████ │ → Partial Aggregate │
│ ↓ │
│ Final Merge → Result │
└─────────────────────────────────────────────────────────────┘

def duckdb_groupby():
    return duckdb.sql("""
        SELECT category, AVG(amount)
        FROM df_pd
        GROUP BY category
    """).df()

duckdb_groupby_time = %timeit -o duckdb_groupby()

29 ms ± 3.33 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

print(f"Polars is {pandas_groupby_time.average / polars_groupby_time.average:.1f}× faster than pandas")
print(f"DuckDB is {pandas_groupby_time.average / duckdb_groupby_time.average:.1f}× faster than pandas")

Polars is 8.7× faster than pandas
DuckDB is 9.4× faster than pandas

DuckDB and Polars perform similarly (9.4× vs 8.7×), both leveraging parallel execution. DuckDB’s slight edge comes from late materialization and vector-at-a-time pipelined execution, which avoids creating intermediate results that Polars may still materialize for some operations.
Memory Efficiency
pandas: Full Memory Load
pandas loads the entire dataset into RAM:
┌─────────────────────────────────────────────────────────────┐
│ RAM │
│ ┌────────────────────────────────────────────────────────┐ │
│ │████████████████████████████████████████████████████████│ │
│ │██████████████████ ALL 10M ROWS ████████████████████████│ │
│ │████████████████████████████████████████████████████████│ │
│ └────────────────────────────────────────────────────────┘ │
│ Usage: 707,495 KB (entire dataset in memory) │
└─────────────────────────────────────────────────────────────┘

df_pd_mem = pd.read_csv("sales_data.csv")
pandas_mem = df_pd_mem.memory_usage(deep=True).sum() / 1e3
print(f"pandas memory usage: {pandas_mem:,.0f} KB")

pandas memory usage: 707,495 KB

For larger-than-RAM datasets, pandas throws an out-of-memory error.
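Before the dataset outgrows RAM entirely, you can often shrink pandas' footprint by downcasting dtypes. A sketch with hypothetical data (column names and sizes are invented for illustration):

```python
import numpy as np
import pandas as pd

# Hypothetical data: repeated string labels stored as Python objects
df = pd.DataFrame({
    "category": np.random.choice(["Books", "Food"], size=100_000),
    "amount": np.random.rand(100_000) * 1000,
})

before = df.memory_usage(deep=True).sum()

# object strings -> categorical codes, float64 -> smallest safe float
df["category"] = df["category"].astype("category")
df["amount"] = pd.to_numeric(df["amount"], downcast="float")

after = df.memory_usage(deep=True).sum()
print(f"{before / after:.1f}x smaller")
```

This buys headroom, but it doesn't change pandas' fundamental full-load model — which is where the streaming approaches below come in.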
Polars: Streaming Mode
Polars can process data in streaming mode, handling chunks without loading everything:
┌─────────────────────────────────────────────────────────────┐
│ RAM │
│ ┌────────────────────────────────────────────────────────┐ │
│ │█ │ │
│ │ (result only) │ │
│ │ │ │
│ └────────────────────────────────────────────────────────┘ │
│ Usage: 0.06 KB (streams chunks, keeps only result) │
└─────────────────────────────────────────────────────────────┘

result_pl_stream = (
    pl.scan_csv("sales_data.csv")
    .group_by("category")
    .agg(pl.col("amount").mean())
    .collect(streaming=True)
)

polars_mem = result_pl_stream.estimated_size() / 1e3
print(f"Polars result memory: {polars_mem:.2f} KB")

Polars result memory: 0.06 KB

For larger-than-RAM files, use sink_parquet instead of collect(). It writes results directly to disk as chunks are processed, never holding the full dataset in memory:
(
    pl.scan_csv("sales_data.csv")
    .filter(pl.col("amount") > 500)
    .sink_parquet("filtered_sales.parquet")
)

DuckDB: Automatic Spill-to-Disk
DuckDB automatically writes intermediate results to temporary files when data exceeds available RAM:
┌─────────────────────────────────────────────────────────────┐
│ RAM Disk (if needed) │
│ ┌──────────────────────────┐ ┌──────────────────────┐ │
│ │█ │ │░░░░░░░░░░░░░░░░░░░░░░│ │
│ │ (up to 500MB) │ → │ (overflow here) │ │
│ │ │ │ │ │
│ └──────────────────────────┘ └──────────────────────┘ │
│ Usage: 0.42 KB (spills to disk when RAM full) │
└─────────────────────────────────────────────────────────────┘

# Configure memory limit and temp directory
duckdb.sql("SET memory_limit = '500MB'")
duckdb.sql("SET temp_directory = '/tmp/duckdb_temp'")

# DuckDB handles larger-than-RAM automatically
result_duckdb_mem = duckdb.sql("""
    SELECT category, AVG(amount) as avg_amount
    FROM 'sales_data.csv'
    GROUP BY category
""").df()

duckdb_mem = result_duckdb_mem.memory_usage(deep=True).sum() / 1e3
print(f"DuckDB result memory: {duckdb_mem:.2f} KB")

DuckDB result memory: 0.42 KB

DuckDB’s out-of-core processing makes it ideal for embedded analytics where memory is limited.
print(f"pandas: {pandas_mem:,.0f} KB (full dataset)")
print(f"Polars: {polars_mem:.2f} KB (result only)")
print(f"DuckDB: {duckdb_mem:.2f} KB (result only)")
print(f"\nPolars uses {pandas_mem / polars_mem:,.0f}× less memory than pandas")
print(f"DuckDB uses {pandas_mem / duckdb_mem:,.0f}× less memory than pandas")

pandas: 707,495 KB (full dataset)
Polars: 0.06 KB (result only)
DuckDB: 0.42 KB (result only)

Polars uses 11,791,583× less memory than pandas
DuckDB uses 1,684,512× less memory than pandas

The million-fold reduction comes from streaming: Polars and DuckDB process data in chunks and only keep the 4-row result in memory, while pandas must hold all 10 million rows to compute the same aggregation.
Join Operations
Joining tables is one of the most common operations in data analysis. Let’s compare how each tool handles a left join between 1 million orders and 100K customers.
Let’s create two tables for join benchmarking:
import numpy as np

# Create orders table (1M rows)
orders_pd = pd.DataFrame({
    "order_id": range(1_000_000),
    "customer_id": np.random.randint(1, 100_000, size=1_000_000),
    "amount": np.random.rand(1_000_000) * 500
})

# Create customers table (100K rows)
customers_pd = pd.DataFrame({
    "customer_id": range(100_000),
    "region": np.random.choice(["North", "South", "East", "West"], size=100_000)
})

# Convert to Polars
orders_pl = pl.from_pandas(orders_pd)
customers_pl = pl.from_pandas(customers_pd)

pandas: Single-Threaded
pandas processes the join on a single CPU core.
┌─────────────────────────────────────────────┐
│ CPU Core 1 │
│ ┌─────────────────────────────────────────┐ │
│ │ Row 1 → Row 2 → Row 3 → … → Row 1M │ │
│ └─────────────────────────────────────────┘ │
│ CPU Core 2 [idle] │
│ CPU Core 3 [idle] │
│ CPU Core 4 [idle] │
└─────────────────────────────────────────────┘

def pandas_join():
    return orders_pd.merge(customers_pd, on="customer_id", how="left")

pandas_join_time = %timeit -o pandas_join()

60.4 ms ± 6.98 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Polars: Multi-Threaded
Polars distributes the join across all available CPU cores.
┌─────────────────────────────────────────────┐
│ CPU Core 1 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 2 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 3 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 4 ┌────────────────┐ │
│ │ ████████████ │ │
└─────────────────────────────────────────────┘

def polars_join():
    return orders_pl.join(customers_pl, on="customer_id", how="left")

polars_join_time = %timeit -o polars_join()

11.8 ms ± 6.42 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

DuckDB: Multi-Threaded
Similar to Polars, DuckDB distributes the join across all available CPU cores.
┌─────────────────────────────────────────────┐
│ CPU Core 1 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 2 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 3 ┌────────────────┐ │
│ │ ████████████ │ │
│ CPU Core 4 ┌────────────────┐ │
│ │ ████████████ │ │
└─────────────────────────────────────────────┘

def duckdb_join():
    return duckdb.sql("""
        SELECT o.*, c.region
        FROM orders_pd o
        LEFT JOIN customers_pd c ON o.customer_id = c.customer_id
    """).df()

duckdb_join_time = %timeit -o duckdb_join()

55.7 ms ± 1.14 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Let’s compare the performance of the joins:
print(f"Polars is {pandas_join_time.average / polars_join_time.average:.1f}× faster than pandas")
print(f"DuckDB is {pandas_join_time.average / duckdb_join_time.average:.1f}× faster than pandas")

Polars is 5.1× faster than pandas
DuckDB is 1.1× faster than pandas

Polars delivers a 5.1× speedup while DuckDB shows only 1.1× improvement. Both tools use multi-threading, but Polars’ join algorithm and native DataFrame output avoid the conversion overhead that DuckDB incurs when returning results via .df().
Interoperability
All three tools work together seamlessly. Use each tool for what it does best in a single pipeline.
pandas DataFrame to DuckDB
Query pandas DataFrames directly with SQL:
df = pd.DataFrame({
    "product": ["A", "B", "C"],
    "sales": [100, 200, 150]
})

# DuckDB queries pandas DataFrames by variable name
result = duckdb.sql("SELECT * FROM df WHERE sales > 120").df()
print(result)

product sales
0 B 200
1 C 150

Polars to pandas
Convert Polars DataFrames when ML libraries require pandas:
df_polars = pl.DataFrame({
    "feature1": [1, 2, 3],
    "feature2": [4, 5, 6],
    "target": [0, 1, 0]
})

# Convert to pandas for scikit-learn
df_pandas = df_polars.to_pandas()
print(type(df_pandas))

<class 'pandas.core.frame.DataFrame'>

DuckDB to Polars
Get query results as Polars DataFrames:
result = duckdb.sql("""
    SELECT category, SUM(amount) as total
    FROM 'sales_data.csv'
    GROUP BY category
""").pl()

print(type(result))
print(result)

<class 'polars.dataframe.frame.DataFrame'>
shape: (4, 2)
┌─────────────┬──────────┐
│ category    ┆ total    │
│ ---         ┆ ---      │
│ str         ┆ f64      │
╞═════════════╪══════════╡
│ Electronics ┆ 6.2445e8 │
│ Food        ┆ 6.2540e8 │
│ Clothing    ┆ 6.2539e8 │
│ Books       ┆ 6.2475e8 │
└─────────────┴──────────┘

Combined Pipeline Example
Each tool has a distinct strength: DuckDB optimizes SQL queries, Polars parallelizes transformations, and pandas integrates with ML libraries. Combine them in a single pipeline to leverage all three:
# Step 1: DuckDB for initial SQL query
aggregated = duckdb.sql("""
SELECT category, region,
SUM(amount) as total_amount,
COUNT(*) as order_count
FROM 'sales_data.csv'
GROUP BY category, region
""").pl()

# Step 2: Polars for additional transformations
enriched = (
aggregated
.with_columns([
(pl.col("total_amount") / pl.col("order_count")).alias("avg_order_value"),
pl.col("category").str.to_uppercase().alias("category_upper")
])
.filter(pl.col("order_count") > 100000)
)

# Step 3: Convert to pandas for visualization or ML
final_df = enriched.to_pandas()
print(final_df.head())

category region total_amount order_count avg_order_value category_upper
0 Food East 1.563586e+08 312918 499.679004 FOOD
1 Food North 1.563859e+08 312637 500.215456 FOOD
2 Clothing North 1.560532e+08 311891 500.345286 CLOTHING
3 Clothing East 1.565054e+08 312832 500.285907 CLOTHING
4 Food West 1.560994e+08 312662 499.259318 FOOD

📖 Related: For writing functions that work across pandas, Polars, and PySpark without conversion, see Unified DataFrame Functions.

Decision Matrix
No single tool wins in every scenario. Use these tables to choose the right tool for your workflow.
Performance Summary
Benchmark results from 10 million rows on a single machine:

Operation           | pandas | Polars              | DuckDB
CSV Read (10M rows) | 1.05s  | 137ms               | 762ms
GroupBy             | 271ms  | 31ms                | 29ms
Join (1M rows)      | 60ms   | 12ms                | 56ms
Memory Usage        | 707 MB | 0.06 KB (streaming) | 0.42 KB (spill-to-disk)

Polars leads in CSV reading (7.7× faster than pandas) and joins (5× faster). DuckDB matches Polars in groupby performance and uses the least memory with automatic spill-to-disk.
Feature Comparison
Each tool makes different trade-offs between speed, memory, and ecosystem integration:

Feature            | pandas    | Polars    | DuckDB
Multi-threading    | No        | Yes       | Yes
Lazy evaluation    | No        | Yes       | N/A (SQL)
Query optimization | No        | Yes       | Yes
Larger-than-RAM    | No        | Streaming | Spill-to-disk
SQL interface      | No        | Limited   | Native
ML integration     | Excellent | Good      | Limited

pandas lacks the performance features that make Polars and DuckDB fast, but remains essential for ML workflows. Choose between Polars and DuckDB based on whether you prefer DataFrame chaining or SQL syntax.
Recommendations
The best tool depends on your data size, workflow preferences, and constraints:

Small data (<1M rows): Use pandas for simplicity
Large data (1M-100M rows): Use Polars or DuckDB for 5-10× speedup
SQL-preferred workflow: Use DuckDB
DataFrame-preferred workflow: Use Polars
Memory-constrained: Use Polars (streaming) or DuckDB (spill-to-disk)
ML pipeline integration: Use pandas (convert from Polars/DuckDB as needed)
Production data pipelines: Use Polars (DataFrame) or DuckDB (SQL) based on team preference

Final Thoughts
If your codebase is built on pandas, you don't need to rewrite everything. Migrate where it matters:

Profile first: Find which pandas operations are slow
Replace with Polars: CSV reads, groupbys, and joins see the biggest gains
Add DuckDB: When SQL is cleaner than chained DataFrame operations

Keep pandas for final ML steps. Convert with df.to_pandas() when needed.
Related Resources

Polars vs. Pandas: A Fast, Multi-Core Alternative for DataFrames
A Deep Dive into DuckDB for Data Scientists
Scaling Pandas Workflows with PySpark’s Pandas API
Delta Lake: Transform pandas Prototypes into Production

pandas vs Polars vs DuckDB: A Data Scientist’s Guide to Choosing the Right Tool

Coiled: Scale Python Data Pipeline to the Cloud in Minutes

Table of Contents

Introduction
What is Coiled?
Setup
Serverless Functions: Process Data with Any Framework
When You Need More: Distributed Clusters with Dask
Cost Optimization
Environment Synchronization
Conclusion

Introduction
A common challenge data scientists face is that local machines simply can’t handle large-scale datasets. Once your analysis reaches the 50GB+ range, you’re pushed into difficult choices:

Sample your data and hope patterns hold
Buy more RAM or rent expensive cloud VMs
Learn Kubernetes and spend days configuring clusters
Build Docker images and manage container registries

Each option adds complexity, cost, or compromises your analysis quality.
Coiled eliminates these tradeoffs. It provisions ephemeral compute clusters on AWS, GCP, or Azure using simple Python APIs. You get distributed computing power without DevOps expertise, automatic environment synchronization without Docker, and 70% cost savings through smart spot instance management.
In this article, you’ll learn how to scale Python data workflows to the cloud with Coiled:

Serverless functions: Run pandas, Polars, or DuckDB code on cloud VMs with a simple decorator
Parallel processing: Process multiple files simultaneously across cloud machines
Distributed clusters: Aggregate data across files using managed Dask clusters
Environment sync: Replicate your local packages to the cloud without Docker
Cost optimization: Reduce cloud spending with spot instances and auto-scaling

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

What is Coiled?
Coiled is a lightweight cloud platform that runs Python code on powerful cloud infrastructure without requiring Docker or Kubernetes knowledge. It supports four main capabilities:

Batch Jobs: Submit and run Python scripts asynchronously on cloud infrastructure
Serverless Functions: Execute Python functions (Pandas, Polars, PyTorch) on cloud VMs with decorators
Dask Clusters: Provision multi-worker clusters for distributed computing
Jupyter Notebooks: Launch interactive Jupyter servers directly on cluster schedulers

Key features across these capabilities:

Framework-Agnostic: Works with Pandas, Polars, Dask, or any Python library
Automatic Package Sync: Local packages replicate to cloud workers without Docker
Cost Optimization: Spot instances, adaptive scaling, and auto-shutdown reduce spending
Simple APIs: Decorate functions or create clusters with 2-3 lines of code

To install Coiled and Dask, run:
pip install coiled dask[complete]

Setup
First, create a free Coiled account by running this command in your terminal:
coiled login

This creates a free Coiled Hosted account with 200 CPU-hours per month. Your code runs on Coiled’s cloud infrastructure with no AWS/GCP/Azure account needed.
Note: For production workloads with your own cloud account, run coiled setup PROVIDER (setup guide).
Serverless Functions: Process Data with Any Framework
The simplest way to scale Python code to the cloud is with serverless functions. Decorate any function with @coiled.function, and Coiled handles provisioning cloud VMs, installing packages, and executing your code.
Scale Beyond Laptop Memory with Cloud VMs
Imagine you need to process the NYC Taxi dataset — 12GB of compressed files (50GB+ once expanded) — on a laptop with only 16GB of RAM. Your machine simply doesn’t have enough memory for this workload.
With Coiled, you can run the exact same code on a cloud VM with 64GB of RAM by simply adding the @coiled.function decorator.
import coiled
import pandas as pd

@coiled.function(
    memory="64 GiB",
    region="us-east-1"
)
def process_month_with_pandas(month):
    # Read 12GB file directly into pandas
    df = pd.read_parquet(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{month:02d}.parquet"
    )

    # Compute tipping patterns by hour
    df["hour"] = df["tpep_pickup_datetime"].dt.hour
    result = df.groupby("hour")["tip_amount"].mean()

    return result

# Run on cloud VM with 64GB RAM
january_tips = process_month_with_pandas(1)
print(january_tips)

Output:
hour
0 3.326543
1 2.933899
2 2.768246
3 2.816333
4 3.132973

Name: tip_amount, dtype: float64

This function runs on a cloud VM with 64GB RAM, processes the entire month in memory, and returns just the aggregated result to your laptop.
You can view the function’s execution progress and resource usage in the Coiled dashboard.

Parallel Processing with .map()
By default, Coiled Functions run sequentially, just like normal Python functions. They can also run in parallel via the .map() method.
Process all 12 months in parallel using .map():
import coiled
import pandas as pd

@coiled.function(memory="64 GiB", region="us-east-1")
def process_month(month):
    df = pd.read_parquet(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{month:02d}.parquet"
    )
    return df["tip_amount"].mean()

# Process 12 months in parallel on 12 cloud VMs
months = range(1, 13)
monthly_tips = list(process_month.map(months))

print("Average tips by month:", monthly_tips)

Output:
Average tips by month: [2.65, 2.58, 2.72, 2.68, 2.75, 2.81, 2.79, 2.73, 2.69, 2.71, 2.66, 2.62]

When you call .map() with 12 months, Coiled spins up 12 cloud VMs simultaneously, runs process_month() on each VM with a different month, then returns all results.
The execution flow:
VM 1: yellow_tripdata_2024-01.parquet → compute mean → 2.65
VM 2: yellow_tripdata_2024-02.parquet → compute mean → 2.58
VM 3: yellow_tripdata_2024-03.parquet → compute mean → 2.72
… (all running in parallel)
VM 12: yellow_tripdata_2024-12.parquet → compute mean → 2.62

Coiled collects: [2.65, 2.58, 2.72, …, 2.62]

Each VM works in complete isolation with no data sharing or coordination between them.
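Conceptually, .map() behaves like a cloud-side version of the standard library's executor map: each task is independent and results come back in input order. A local stand-in (the per-month function body here is a hypothetical placeholder, not real taxi data):

```python
from concurrent.futures import ThreadPoolExecutor

def process_month(month):
    # Stand-in for the per-VM work: each call is fully independent
    return round(2.5 + 0.01 * month, 2)

# Fan out 12 independent tasks, collect results in input order
with ThreadPoolExecutor(max_workers=12) as pool:
    monthly_tips = list(pool.map(process_month, range(1, 13)))

print(monthly_tips)
```

The difference with Coiled is only where the workers live: cloud VMs with their own memory instead of local threads.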

The dashboard confirms 12 tasks were executed, matching the 12 months we passed to .map().
Framework-Agnostic: Use Any Python Library
Coiled Functions aren’t limited to pandas. You can use any Python library (Polars, DuckDB, PyTorch, scikit-learn) without any additional configuration. The automatic package synchronization works for all dependencies.
Example with Polars:
Polars is a fast DataFrame library optimized for performance. It works seamlessly with Coiled:
import coiled
import polars as pl

@coiled.function(memory="64 GiB", region="us-east-1")
def process_with_polars(month):
    df = pl.read_parquet(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{month:02d}.parquet"
    )
    return (
        df
        .filter(pl.col("tip_amount") > 0)
        .group_by("PULocationID")
        .agg(pl.col("tip_amount").mean())
        .sort("tip_amount", descending=True)
        .head(5)
    )

result = process_with_polars(1)
print(result)

Output:
shape: (5, 2)
┌──────────────┬────────────┐
│ PULocationID ┆ tip_amount │
│ ---          ┆ ---        │
│ i64          ┆ f64        │
╞══════════════╪════════════╡
│ 138          ┆ 4.52       │
│ 230          ┆ 4.23       │
│ 161          ┆ 4.15       │
│ 234          ┆ 3.98       │
│ 162          ┆ 3.87       │
└──────────────┴────────────┘

Example with DuckDB:
DuckDB provides fast SQL analytics directly on Parquet files:
import coiled
import duckdb

@coiled.function(memory="64 GiB", region="us-east-1")
def query_with_duckdb(month):
    con = duckdb.connect()
    result = con.execute(f"""
        SELECT
            DATE_TRUNC('hour', tpep_pickup_datetime) as pickup_hour,
            AVG(tip_amount) as avg_tip,
            COUNT(*) as trip_count
        FROM 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{month:02d}.parquet'
        WHERE tip_amount > 0
        GROUP BY pickup_hour
        ORDER BY avg_tip DESC
        LIMIT 5
    """).fetchdf()
    return result

result = query_with_duckdb(1)
print(result)

Output:
pickup_hour avg_tip trip_count
0 2024-01-15 14:00:00 4.23 15234
1 2024-01-20 18:00:00 4.15 18456
2 2024-01-08 12:00:00 3.98 12789
3 2024-01-25 16:00:00 3.87 14567
4 2024-01-12 20:00:00 3.76 16234

Coiled automatically detects your local Polars and DuckDB installations and replicates them to cloud VMs. No manual configuration needed.
When You Need More: Distributed Clusters with Dask
Serverless functions work great for independent file processing. However, when you need to combine and aggregate data across all your files into a single result, you need a Dask cluster.
For example, suppose you want to calculate total revenue by pickup location across all 12 months of data. With Coiled Functions, each VM processes one month independently:
@coiled.function(memory="64 GiB", region="us-east-1")
def get_monthly_revenue_by_location(month):
    df = pd.read_parquet(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{month:02d}.parquet"
    )
    return df.groupby("PULocationID")["total_amount"].sum()

# This returns 12 separate DataFrames, one per month
results = list(get_monthly_revenue_by_location.map(range(1, 13)))
print(f'Number of DataFrames: {len(results)}')

Output:
Number of DataFrames: 12

The problem is that you get 12 separate DataFrames that you need to manually combine.
Here’s what happens: VM 1 processes January and returns a DataFrame like:
PULocationID  total_amount
138           15000
230           22000

VM 2 processes February and returns:
PULocationID  total_amount
138           18000
230           19000

Each VM works independently and has no knowledge of the other months’ data. To get yearly totals per location, you’d need to write code to merge these 12 DataFrames and sum the revenue for each location.
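For completeness, merging those per-month results by hand is a one-liner in pandas, though at scale Dask performs the equivalent shuffle for you. A sketch with two hypothetical monthly Series (the location IDs and totals are invented):

```python
import pandas as pd

# Two hypothetical monthly results keyed by PULocationID
jan = pd.Series({138: 15000, 230: 22000}, name="total_amount")
feb = pd.Series({138: 18000, 230: 19000}, name="total_amount")

# Concatenate and sum per location to get combined totals
yearly = pd.concat([jan, feb]).groupby(level=0).sum()
print(yearly)
```

This works for small result sets, but it runs on your laptop and requires all partial results to fit in local memory — exactly the constraint a Dask cluster removes.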
With a Dask cluster, workers coordinate to give you one global result:
import coiled
import dask.dataframe as dd

# For production workloads, you can scale to 50+ workers
cluster = coiled.Cluster(n_workers=3, region="us-east-1")
client = cluster.get_client()  # Attach Dask to the cluster's scheduler

# Read all 12 months of 2024 data
files = [
    f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{month:02d}.parquet"
    for month in range(1, 13)
]
df = dd.read_parquet(files)  # Lazy: builds a plan, doesn't load data yet

# This returns ONE DataFrame with total revenue per location across all months
total_revenue = (
    df.groupby("PULocationID")["total_amount"].sum().compute()
)  # Executes the plan

total_revenue.head()

Output:
PULocationID
1 563645.70
2 3585.80
3 67261.41
4 1687265.08
5 602.98
Name: total_amount, dtype: float64

You can see that we got a single DataFrame with the total revenue per location across all months.
Here is what happens under the hood:
When you call .compute(), Dask executes the plan in four steps:
Step 1: Data Distribution
├─ Worker 1: [Jan partitions 1-3, Apr partitions 1-2, Jul partitions 1-3]
├─ Worker 2: [Feb partitions 1-4, May partitions 1-3, Aug partitions 1-2]
└─ Worker 3: [Mar partitions 1-3, Jun partitions 1-2, Sep-Dec partitions]

Step 2: Local Aggregation (each worker groups its data)
├─ Worker 1: {location_138: $45,000, location_230: $63,000}
├─ Worker 2: {location_138: $38,000, location_230: $55,000}
└─ Worker 3: {location_138: $50,000, location_230: $62,000}

Step 3: Shuffle (redistribute so each location lives on one worker)
├─ Worker 1: All location_230 data → $63,000 + $55,000 + $62,000
├─ Worker 2: All location_138 data → $45,000 + $38,000 + $50,000
└─ Worker 3: All other locations…

Step 4: Final Result
location_138: $133,000 (yearly total)
location_230: $180,000 (yearly total)

This shuffle-and-combine process is what makes Dask different from Coiled Functions. Workers actively coordinate and share data to produce one unified result.
Cost Optimization
Cloud costs can spiral quickly. Coiled provides three mechanisms to reduce spending:
1. Spot Instances
You can reduce cloud costs by 60-90% using spot instances. These are discounted servers that cloud providers can reclaim when demand increases. When an interruption occurs, Coiled:

Gracefully shuts down the affected worker
Redistributes its work to healthy workers
Automatically launches a replacement worker

cluster = coiled.Cluster(
    n_workers=50,
    spot_policy="spot_with_fallback",  # Use spot instances with on-demand backup
    region="us-east-1"
)

Cost comparison for m5.xlarge instances:

On-demand: $0.192/hour
Spot: $0.05/hour
Savings: 74%

For a 100-worker cluster:

On-demand: $19.20/hour = $460/day
Spot: $5.00/hour = $120/day

2. Adaptive Scaling
Adaptive scaling automatically adds workers when you have more work and removes them when idle, so you only pay for what you need. Coiled enables this with the adapt() method:
cluster = coiled.Cluster(region="us-east-1")
cluster.adapt(minimum=10, maximum=50) # Scale between 10-50 workers

Serverless functions also support auto-scaling by specifying a worker range:
@coiled.function(n_workers=[10, 300])
def process_data(files):
    return results

This saves money during light workloads while delivering performance during heavy computation. No manual monitoring required.
3. Automatic Shutdown
To prevent paying for unused resources, Coiled automatically shuts down clusters after 20 minutes of inactivity by default. You can customize this with the idle_timeout parameter:
cluster = coiled.Cluster(
    n_workers=20,
    region="us-east-1",
    idle_timeout="1 hour"  # Keep cluster alive for longer workloads
)

This prevents the common mistake of leaving clusters running overnight.
Environment Synchronization
The “Works on My Machine” Problem When Scaling to Cloud
Imagine this scenario: your pandas code works locally but fails on a cloud VM because the environment has different package versions or missing dependencies.
Docker solves this by packaging your environment into a container that runs identically on your laptop and cloud VMs. However, getting it running on cloud infrastructure involves a complex workflow:

Write a Dockerfile listing all dependencies and versions
Build the Docker image (wait 5-10 minutes)
Push to cloud container registry (AWS ECR, Google Container Registry)
Configure cloud VMs (EC2/GCE instances with proper networking and security)
Pull and run the image on cloud machines (3-5 minutes per VM)
Rebuild and redeploy every time you add a package (repeat steps 2-5)

This Docker + cloud workflow slows down development and requires expertise in both containerization and cloud infrastructure management.
Coiled’s Solution: Automatic Package Synchronization
Coiled eliminates Docker entirely through automatic package synchronization. Your local environment replicates to cloud workers with no Dockerfile required.
Instead of managing Docker images and cloud infrastructure, you simply add a decorator to your function:
import coiled
import pandas as pd

@coiled.function(memory="64 GiB", region="us-east-1")
def process_data():
    df = pd.read_parquet("s3://my-bucket/data.parquet")
    # Your analysis code here
    return df.describe()

result = process_data() # Runs on cloud VM with your exact package versions

What Coiled does automatically:

Scans your local environment (pip, conda packages with exact versions)
Creates a dependency manifest (a list of all packages and their versions)
Installs packages on cloud workers with matching versions
Reuses built environments when your dependencies haven’t changed
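As a rough illustration of the first two steps (this is not Coiled's actual implementation), the standard library alone can snapshot an environment and derive a cache key from it, which captures why unchanged dependencies allow a built environment to be reused:

```python
import hashlib
import json
from importlib.metadata import distributions

def build_manifest() -> dict:
    """Snapshot the local environment: package name -> exact installed version."""
    return {dist.metadata["Name"]: dist.version for dist in distributions()}

def environment_key(manifest: dict) -> str:
    """Stable hash of the manifest. Unchanged dependencies produce the same key,
    which is what makes reusing a previously built environment possible."""
    payload = json.dumps(manifest, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()

manifest = build_manifest()
print(environment_key(manifest)[:12])  # short fingerprint of this environment
```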

This is faster than Docker builds in most cases thanks to intelligent caching, and requires zero configuration.
Conclusion
Coiled transforms cloud computing from a multi-day DevOps project into simple Python operations. Whether you’re processing a single large file with Pandas, querying multiple files with Polars, or running distributed aggregations with Dask clusters, Coiled provides the right scaling approach for your needs.
I recommend starting with serverless functions for single-file processing, then scaling to Dask clusters when you need truly distributed computing. Coiled removes the infrastructure burden from data science workflows, letting you focus on analysis instead of operations.
Related Tutorials

Polars vs Pandas: Performance Benchmarks and When to Switch – Choose the right DataFrame library for your workloads
DuckDB: Fast SQL Analytics on Parquet Files – Master SQL techniques for processing data with DuckDB
PySpark Pandas API: Familiar Syntax at Scale – Explore Spark as an alternative to Dask for distributed processing

What’s New in PySpark 4.0: Arrow UDFs, Native Visualization, and Dynamic UDTFs

Table of Contents

Introduction
From Pandas UDFs to Arrow UDFs: Next-Gen Performance
Native Data Visualization (PySpark 4.0+)
Dynamic Schema Generation with UDTF analyze() (PySpark 4.0+)
Conclusion

Introduction
PySpark 4.0 introduces transformative improvements that enhance performance, streamline workflows, and enable flexible data transformations in distributed processing.
This release delivers three key enhancements:
Arrow-optimized UDFs accelerate custom transformations by operating directly on Arrow data structures, eliminating the serialization overhead of Pandas UDFs.
Native Plotly visualization enables direct DataFrame plotting without conversion, streamlining exploratory data analysis and reducing memory overhead.
Dynamic schema UDTFs adapt output columns to match input data at runtime, enabling flexible pivot tables and aggregations where column structure depends on data values.
For comprehensive coverage of core PySpark SQL functionality, see the Complete Guide to PySpark SQL.
From Pandas UDFs to Arrow UDFs: Next-Gen Performance
The pandas_udf function requires converting Arrow data to Pandas format and back again for each operation. This serialization cost becomes significant when processing large datasets.
PySpark 3.5+ introduces Arrow-optimized UDFs via the useArrow=True parameter, which operates directly on Arrow data structures, avoiding the Pandas conversion entirely and improving performance.
Let’s compare the performance with a weighted sum calculation across multiple columns on 100,000 rows:
import pandas as pd
import pyarrow.compute as pc
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("UDFComparison").getOrCreate()

# Create test data with multiple numeric columns
data = [(float(i), float(i*2), float(i*3)) for i in range(100000)]
df = spark.createDataFrame(data, ["val1", "val2", "val3"])

Create a timing decorator to measure the execution time of the functions:
import time
from functools import wraps

# Timing decorator
def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        print(f"{func.__name__}: {elapsed:.2f}s")
        wrapper.elapsed_time = elapsed
        return result

    return wrapper

Use the timing decorator to measure the execution time of the pandas_udf function:
@pandas_udf(DoubleType())
def weighted_sum_pandas(v1: pd.Series, v2: pd.Series, v3: pd.Series) -> pd.Series:
    return v1 * 0.5 + v2 * 0.3 + v3 * 0.2

@timer
def run_pandas_udf():
    result = df.select(
        weighted_sum_pandas(df.val1, df.val2, df.val3).alias("weighted")
    )
    result.count()  # Trigger computation
    return result

result_pandas = run_pandas_udf()
pandas_time = run_pandas_udf.elapsed_time

run_pandas_udf: 1.33s

Use the timing decorator to measure the execution time of the Arrow-optimized UDF using useArrow:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(DoubleType(), useArrow=True)
def weighted_sum_arrow(v1, v2, v3):
    term1 = pc.multiply(v1, 0.5)
    term2 = pc.multiply(v2, 0.3)
    term3 = pc.multiply(v3, 0.2)
    return pc.add(pc.add(term1, term2), term3)

@timer
def run_arrow_udf():
    result = df.select(
        weighted_sum_arrow(df.val1, df.val2, df.val3).alias("weighted")
    )
    result.count()  # Trigger computation
    return result

result_arrow = run_arrow_udf()
arrow_time = run_arrow_udf.elapsed_time

run_arrow_udf: 0.43s

Measure the speedup:
speedup = pandas_time / arrow_time
print(f"Speedup: {speedup:.2f}x faster")

Speedup: 3.06x faster

The output shows that the Arrow-optimized version is 3.06x faster than the pandas_udf version!
The performance gain comes from avoiding serialization. Arrow-optimized UDFs use PyArrow compute functions like pc.multiply() and pc.add() directly on Arrow data, while pandas_udf must convert each column to Pandas and back.
Trade-off: The 3.06x performance improvement comes at the cost of using PyArrow’s less familiar compute API instead of Pandas operations. However, this becomes increasingly valuable as dataset size and column count grow.
Native Data Visualization (PySpark 4.0+)
Visualizing PySpark DataFrames traditionally requires converting to Pandas first, then using external libraries like matplotlib or plotly. This adds memory overhead and extra processing steps.
PySpark 4.0 introduces a native plotting API powered by Plotly, enabling direct visualization from PySpark DataFrames without any conversion.
Let’s visualize sales data across product categories:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Visualization").getOrCreate()

# Create sample sales data
sales_data = [
    ("Electronics", 5000, 1200),
    ("Electronics", 7000, 1800),
    ("Clothing", 3000, 800),
    ("Clothing", 4500, 1100),
    ("Furniture", 6000, 1500),
    ("Furniture", 8000, 2000),
]

sales_df = spark.createDataFrame(sales_data, ["category", "sales", "profit"])
sales_df.show()

+-----------+-----+------+
|   category|sales|profit|
+-----------+-----+------+
|Electronics| 5000|  1200|
|Electronics| 7000|  1800|
|   Clothing| 3000|   800|
|   Clothing| 4500|  1100|
|  Furniture| 6000|  1500|
|  Furniture| 8000|  2000|
+-----------+-----+------+

Create a scatter plot directly from the PySpark DataFrame using the .plot() method:
# Direct plotting without conversion
sales_df.plot(kind="scatter", x="sales", y="profit", color="category")

You can also use shorthand methods such as plot.scatter() and plot.bar() for specific chart types:
# Scatter plot with shorthand
sales_df.plot.scatter(x="sales", y="profit", color="category")

# Bar chart by category
category_totals = (
    sales_df.groupBy("category")
    .agg({"sales": "sum"})
    .withColumnRenamed("sum(sales)", "total_sales")
)
category_totals.plot.bar(x="category", y="total_sales")

The native plotting API supports 8 chart types:
- scatter: Scatter plots with color grouping
- bar: Bar charts for categorical comparisons
- line: Line plots for time series
- area: Area charts for cumulative values
- pie: Pie charts for proportions
- box: Box plots for distributions
- histogram: Histograms for frequency analysis
- kde/density: Density plots for probability distributions
By default, PySpark visualizes up to 1,000 rows. For larger datasets, configure the limit:
# Increase visualization row limit
spark.conf.set("spark.sql.pyspark.plotting.max_rows", 5000)

Dynamic Schema Generation with UDTF analyze() (PySpark 4.0+)
Python UDTFs (User-Defined Table Functions) generate multiple rows from a single input row, but they come with a critical limitation: you must define the output schema upfront. When your output columns depend on the input data itself (like creating pivot tables or dynamic aggregations where column names come from data values), this rigid schema requirement becomes a problem.
For example, a word-counting UDTF requires you to specify all output columns upfront, even though the words themselves are unknown until runtime.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType, StructField, IntegerType

# Schema must be defined upfront with fixed column names
@udtf(returnType=StructType([
    StructField("hello", IntegerType()),
    StructField("world", IntegerType()),
    StructField("spark", IntegerType())
]))
class StaticWordCountUDTF:
    def eval(self, text: str):
        words = text.split(" ")
        yield tuple(words.count(word) for word in ["hello", "world", "spark"])

# Only works for exactly these three words
result = StaticWordCountUDTF(lit("hello world hello spark"))
result.show()

+-----+-----+-----+
|hello|world|spark|
+-----+-----+-----+
|    2|    1|    1|
+-----+-----+-----+

If the input text contains a different set of words, the output won’t contain the count of the new words.
result = StaticWordCountUDTF(lit("hi world hello spark"))
result.show()

+-----+-----+-----+
|hello|world|spark|
+-----+-----+-----+
|    1|    1|    1|
+-----+-----+-----+

PySpark 4.0 introduces the analyze() method for UDTFs, enabling dynamic schema determination based on input data. Instead of hardcoding your output schema, analyze() inspects the input and generates the appropriate columns at runtime.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class DynamicWordCountUDTF:
    @staticmethod
    def analyze(text: AnalyzeArgument) -> AnalyzeResult:
        """Dynamically create schema based on input text"""
        schema = StructType()
        # Create one column per unique word in the input
        for word in sorted(set(text.value.split(" "))):
            schema = schema.add(word, IntegerType())
        return AnalyzeResult(schema=schema)

    def eval(self, text: str):
        """Generate counts for each word"""
        words = text.split(" ")
        # Use same logic as analyze() to determine column order
        unique_words = sorted(set(words))
        yield tuple(words.count(word) for word in unique_words)

# Schema adapts to any input text
result = DynamicWordCountUDTF(lit("hello world hello spark"))
result.show()

+-----+-----+-----+
|hello|spark|world|
+-----+-----+-----+
|    2|    1|    1|
+-----+-----+-----+

Now try with completely different words:
# Different words – schema adapts automatically
result2 = DynamicWordCountUDTF(lit("python data science"))
result2.show()

+----+------+-------+
|data|python|science|
+----+------+-------+
|   1|     1|      1|
+----+------+-------+

The columns change from hello, spark, world to data, python, science without any code modifications.
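The one requirement is that analyze() and eval() derive the column order the same way; otherwise counts land under the wrong headers. That shared logic is plain Python and can be checked outside Spark (the helper names here are just for illustration):

```python
def word_columns(text: str) -> list:
    """Column order shared by analyze() and eval(): sorted unique words."""
    return sorted(set(text.split(" ")))

def word_counts(text: str) -> tuple:
    """One count per column, in the same order as word_columns()."""
    words = text.split(" ")
    return tuple(words.count(word) for word in word_columns(text))

print(word_columns("hello world hello spark"))  # ['hello', 'spark', 'world']
print(word_counts("hello world hello spark"))   # (2, 1, 1)
print(word_columns("python data science"))      # ['data', 'python', 'science']
```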
Conclusion
PySpark 4.0 makes distributed computing faster and easier to use. Arrow-optimized UDFs speed up custom transformations, native visualization removes conversion steps, and dynamic UDTFs handle flexible data structures.
These improvements address real bottlenecks without requiring major code changes, making PySpark more practical for everyday data engineering tasks.
Manim: Create Mathematical Animations Like 3Blue1Brown Using Python

Table of Contents

Motivation
What is Manim?
Create a Blue Square that Grows from the Center
Turn a Square into a Circle
Customize Manim
Write Mathematical Equations with a Moving Frame
Moving and Zooming Camera
Graph
Move Objects Together
Trace Path
Recap

Motivation
Have you ever struggled with math concepts in a machine learning algorithm and turned to 3Blue1Brown as a learning resource? 3Blue1Brown is a popular math YouTube channel created by Grant Sanderson, known for exceptional explanations and stunning animations.
What if you could create similar animations to explain data science concepts to your teammates, managers, or followers?
Grant developed a Python package called Manim that enables you to create mathematical animations or pictures using Python. In this article, you will learn how to create beautiful mathematical animations using Manim.

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

What is Manim?
Manim is an animation engine for creating precise, explanatory math videos. Note that there are two versions of Manim:

The original version created by Grant Sanderson
The community-maintained version

Since the Manim Community version is updated more frequently and better tested, we’ll use it for this tutorial.
To install the dependencies for the package, visit the Manim documentation. After the dependencies are installed, run:
pip install manim

Create a Blue Square that Grows from the Center
We’ll start by creating a blue square that grows from the center:
from manim import *

class GrowingSquare(Scene):
    def construct(self):
        square = Square(color=BLUE, fill_opacity=0.5)
        self.play(GrowFromCenter(square))
        self.wait()

The code leverages four key Manim elements to produce the animation.

Scene: Base class providing animation infrastructure via construct() and play() methods
Square(color, fill_opacity): Mobject for creating squares with specified stroke color and fill transparency
GrowFromCenter: Growth animation starting from the object’s center
wait(): 1-second pause in animation timeline

Save the script above as start.py. Now run the command below to generate a video for the script:
manim -p -ql start.py GrowingSquare

A video called GrowingSquare.mp4 will be saved in your local directory. You should see a blue square growing from the center.

Explanation of the options:

-p: play the video once it finishes generating
-ql: generate a video with low quality

To generate a video with high quality, use -qh instead.
To create a GIF instead of a video, add --format=gif to the command:
manim -p -ql --format=gif start.py GrowingSquare

Turn a Square into a Circle
Creating a square alone is not that exciting. Let’s transform the square into a circle using the Transform animation.
from manim import *

class SquareToCircle(Scene):
    def construct(self):
        circle = Circle()
        circle.set_fill(PINK, opacity=0.5)

        square = Square()
        square.rotate(PI / 4)

        self.play(Create(square))
        self.play(Transform(square, circle))
        self.play(FadeOut(square))

This code demonstrates creating shapes, styling them, and applying different animation effects.

Circle: Creates a circular shape
set_fill(color, opacity): Sets the fill color and transparency (0.5 = 50% transparent)
rotate(angle): Rotates the object by an angle (PI/4 = 45 degrees)
Create: Animation that draws the shape onto the screen
Transform: Smoothly morphs one shape into another
FadeOut: Makes the object gradually disappear

Find a comprehensive list of shapes in the Manim geometry documentation.
Customize Manim
If you don’t want the background to be black, you can change it to white using self.camera.background_color:
from manim import *

class CustomBackground(Scene):
    def construct(self):
        self.camera.background_color = WHITE

        square = Square(color=BLUE, fill_opacity=0.5)
        circle = Circle(color=RED, fill_opacity=0.5)

        self.play(Create(square))
        self.wait()
        self.play(Transform(square, circle))
        self.wait()

Find other ways to customize Manim in the configuration documentation.
Write Mathematical Equations with a Moving Frame
You can create animations that write mathematical equations using the MathTex class:
from manim import *

class WriteEquation(Scene):
    def construct(self):
        equation = MathTex(r"e^{i\pi} + 1 = 0")

        self.play(Write(equation))
        self.wait()

This code demonstrates writing mathematical equations using LaTeX.

MathTex: Creates mathematical equations using LaTeX notation (the r prefix indicates a raw string for LaTeX commands)
Write: Animation that makes text appear as if being written by hand

Or show step-by-step solutions to equations:
from manim import *

class EquationSteps(Scene):
    def construct(self):
        step1 = MathTex(r"2x + 5 = 13")
        step2 = MathTex(r"2x = 8")
        step3 = MathTex(r"x = 4")

        self.play(Write(step1))
        self.wait()
        self.play(Transform(step1, step2))
        self.wait()
        self.play(Transform(step1, step3))
        self.wait()

Let’s break down this code:

Three equation stages: Separate MathTex objects for each step of the solution
Progressive transformation: Transform morphs the first equation through intermediate and final stages
Timed pauses: wait() creates natural breaks between steps for a teaching pace

You can also highlight specific parts of equations using frames:
from manim import *

class MovingFrame(Scene):
    def construct(self):
        # Write equations
        equation = MathTex("2x^2-5x+2", "=", "(x-2)(2x-1)")

        # Create animation
        self.play(Write(equation))

        # Add moving frames
        framebox1 = SurroundingRectangle(equation[0], buff=.1)
        framebox2 = SurroundingRectangle(equation[2], buff=.1)

        # Create animations
        self.play(Create(framebox1))
        self.wait()

        # Replace frame 1 with frame 2
        self.play(ReplacementTransform(framebox1, framebox2))
        self.wait()

In this code, we use the following elements:

MathTex with multiple arguments: Breaks the equation into separate parts that can be individually accessed and highlighted
SurroundingRectangle(mobject, buff): Creates a box around an equation part (buff=0.1 sets the space between the box and the content)
ReplacementTransform: Smoothly moves and morphs one frame into another, replacing the first frame with the second

Moving and Zooming Camera
You can move the camera and zoom in on specific parts of an equation using a class that inherits from MovingCameraScene:
from manim import *

class MovingCamera(MovingCameraScene):
    def construct(self):
        equation = MathTex(
            r"\frac{d}{dx}(x^2) = 2x"
        )

        self.play(Write(equation))
        self.wait()

        # Zoom in on the derivative
        self.play(
            self.camera.frame.animate.scale(0.5).move_to(equation[0])
        )
        self.wait()

This code shows how to zoom in on specific parts of your animation.

MovingCameraScene: A special scene type that allows the camera to move and zoom
self.camera.frame: Represents the camera’s view that you can move and zoom
animate.scale(0.5): Zooms in by making the camera view 50% smaller (objects appear twice as big)
move_to(equation[0]): Centers the camera on the first part of the equation

Graph
You can use Manim to create annotated graphs:
from manim import *

class Graph(Scene):
    def construct(self):
        axes = Axes(
            x_range=[-3, 3, 1],
            y_range=[-5, 5, 1],
            x_length=6,
            y_length=6,
        )

        # Add labels
        axes_labels = axes.get_axis_labels(x_label="x", y_label="f(x)")

        # Create a function graph
        graph = axes.plot(lambda x: x**2, color=BLUE)
        graph_label = axes.get_graph_label(graph, label="x^2")

        self.add(axes, axes_labels)
        self.play(Create(graph))
        self.play(Write(graph_label))
        self.wait()

This code shows how to create mathematical function graphs with labeled axes.

Axes: Creates a coordinate grid with specified ranges (e.g., x from -3 to 3, y from -5 to 5) and sizes
get_axis_labels: Adds “x” and “f(x)” labels to the coordinate axes
plot(lambda x: x**2): Plots a mathematical function (here, x squared) as a curve on the axes
get_graph_label: Adds a label showing the function’s equation next to the plotted curve
self.add: Instantly displays objects on screen without animating them in

If you want to get an image of the last frame of a scene, add -s to the command:
manim -p -qh -s more.py Graph

You can also animate the process of setting up the axes:
manim -p -qh more.py Graph

Move Objects Together
You can use VGroup to group different Manim objects and move them together:
from manim import *

class MoveObjectsTogether(Scene):
    def construct(self):
        square = Square(color=BLUE)
        circle = Circle(color=RED)

        # Group objects
        group = VGroup(square, circle)
        group.arrange(RIGHT, buff=1)

        self.play(Create(group))
        self.wait()

        # Move the entire group
        self.play(group.animate.shift(UP * 2))
        self.wait()

This code shows how to group objects and move them together as one unit.

VGroup: Groups multiple objects together so you can control them all at once (like selecting multiple items)
arrange(RIGHT, buff=1): Lines up the objects horizontally from left to right with 1 unit of space between them
shift(UP * 2): Moves the entire group upward by 2 units (multiplying by 2 makes it move twice as far)

You can also create multiple groups and manipulate them independently or together:
from manim import *

class GroupCircles(Scene):
    def construct(self):
        # Create circles
        circle_green = Circle(color=GREEN)
        circle_blue = Circle(color=BLUE)
        circle_red = Circle(color=RED)

        # Set initial positions
        circle_green.shift(LEFT)
        circle_blue.shift(RIGHT)

        # Create 2 different groups
        gr = VGroup(circle_green, circle_red)
        gr2 = VGroup(circle_blue)
        self.add(gr, gr2)
        self.wait()

        # Shift 2 groups down
        self.play((gr + gr2).animate.shift(DOWN))

        # Move only 1 group
        self.play(gr.animate.shift(RIGHT))
        self.play(gr.animate.shift(UP))

        # Shift 2 groups to the right
        self.play((gr + gr2).animate.shift(RIGHT))
        self.play(circle_red.animate.shift(RIGHT))
        self.wait()

Trace Path
You can use TracedPath to create a trace of a moving object:
from manim import *

class TracePath(Scene):
    def construct(self):
        dot = Dot(color=RED)

        # Create traced path
        path = TracedPath(dot.get_center, stroke_color=BLUE, stroke_width=4)
        self.add(path, dot)

        # Move the dot in a circular pattern
        self.play(
            MoveAlongPath(dot, Circle(radius=2)),
            rate_func=linear,
            run_time=4,
        )
        self.wait()

This code shows how to create a trail that follows a moving object.

Dot: A small circular point that you can move around
TracedPath(dot.get_center): Creates a line that draws itself following the dot’s path (like a pen trail)
get_center: Gets the current position of the dot so TracedPath knows where to draw
MoveAlongPath(dot, Circle(radius=2)): Moves the dot along a circular path with radius 2
rate_func=linear: Makes the dot move at a constant speed (no speeding up or slowing down)
run_time=4: The animation takes 4 seconds to complete

For a more complex example, you can create a rolling circle that traces a cycloid pattern:
from manim import *

class RollingCircleTrace(Scene):
    def construct(self):
        # Create circle and dot
        circ = Circle(color=BLUE).shift(4 * LEFT)
        dot = Dot(color=BLUE).move_to(circ.get_start())

        # Group dot and circle
        rolling_circle = VGroup(circ, dot)
        trace = TracedPath(circ.get_start)

        # Rotate the circle
        rolling_circle.add_updater(lambda m: m.rotate(-0.3))

        # Add trace and rolling circle to the scene
        self.add(trace, rolling_circle)

        # Shift the circle to 8*RIGHT
        self.play(rolling_circle.animate.shift(8 * RIGHT), run_time=4, rate_func=linear)

This code creates a rolling wheel animation that draws a curved pattern as it moves.

Setup: Creates a blue circle at the left side and places a blue dot at the circle’s edge
Grouping: Uses VGroup to combine the circle and dot so they move together
Path tracking: TracedPath watches the circle’s starting point and draws a line following it
Continuous rotation: add_updater makes the circle rotate automatically every frame, creating the rolling effect
Rolling motion: As the group shifts right, the rotation updater makes it look like a wheel rolling across the screen
Cycloid pattern: The traced path creates a wave-like curve showing the mathematical path a point on a rolling wheel makes

Recap
Congratulations! You have just learned how to use Manim and what it can do. To recap, there are three kinds of objects that Manim provides:

Mobjects: Objects that can be displayed on the screen, such as Circle, Square, Matrix, Angle, etc.
Scenes: Canvas for animations such as Scene, MovingCameraScene, etc.
Animations: Animations applied to Mobjects such as Write, Create, GrowFromCenter, Transform, etc.

There is so much more Manim can do that I cannot cover here. The best way to learn is through practice, so I encourage you to try the examples in this article and check out Manim’s tutorial.
Behave: Write Readable ML Tests with Behavior-Driven Development

Table of Contents

Motivation
What is behave?
Invariance Testing
Directional Testing
Minimum Functionality Testing
Behave’s Trade-offs
Conclusion

Motivation
Imagine you create an ML model to predict customer sentiment based on reviews. Upon deploying it, you realize that the model incorrectly labels certain positive reviews as negative when they’re rephrased using negative words.

This is just one example of how an extremely accurate ML model can fail without proper testing. Thus, testing your model for accuracy and reliability is crucial before deployment.
But how do you test your ML model? One straightforward approach is to write a unit test:
from textblob import TextBlob

def test_sentiment_the_same_after_paraphrasing():
    sent = "The hotel room was great! It was spacious, clean and had a nice view of the city."
    sent_paraphrased = "The hotel room wasn't bad. It wasn't cramped, dirty, and had a decent view of the city."

    sentiment_original = TextBlob(sent).sentiment.polarity
    sentiment_paraphrased = TextBlob(sent_paraphrased).sentiment.polarity

    both_positive = (sentiment_original > 0) and (sentiment_paraphrased > 0)
    both_negative = (sentiment_original < 0) and (sentiment_paraphrased < 0)
    assert both_positive or both_negative

This approach works but can be challenging for non-technical or business participants to understand. Wouldn’t it be nice if you could incorporate project objectives and goals into your tests, expressed in natural language?
Feature: Sentiment Analysis
  As a data scientist
  I want to ensure that my model is invariant to paraphrasing
  So that my model can produce consistent results.

  Scenario: Paraphrased text
    Given a text
    When the text is paraphrased
    Then both text should have the same sentiment

That is when behave comes in handy.

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

What is behave?
behave is a Python framework for behavior-driven development (BDD). BDD is a software development methodology that:

Emphasizes collaboration between stakeholders (such as business analysts, developers, and testers)
Enables users to define requirements and specifications for a software application

Since behave provides a common language and format for expressing requirements and specifications, it can be ideal for defining and validating the behavior of machine learning models.
To install behave, type:
pip install behave

Let’s use behave to perform various tests on machine learning models.

📚 For comprehensive unit testing strategies and best practices, check out Production-Ready Data Science.

Invariance Testing
Invariance testing tests whether an ML model produces consistent results under different conditions.
An example of invariance testing involves verifying if a model is invariant to paraphrasing. An ideal model should maintain consistent sentiment scores even when a positive review is rephrased using negative words like “wasn’t bad” instead of “was good.”
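Stripped of the BDD layer, the invariance property is simply an assertion that two polarity scores agree in sign. A minimal, framework-free sketch of that check (the polarity values are illustrative):

```python
def same_sentiment(score_a: float, score_b: float) -> bool:
    """True when two polarity scores agree in sign (both positive or both negative)."""
    both_positive = score_a > 0 and score_b > 0
    both_negative = score_a < 0 and score_b < 0
    return both_positive or both_negative

# Paraphrasing should not flip the sign of the sentiment score
assert same_sentiment(0.66, 0.40)       # invariant: both positive
assert not same_sentiment(0.66, -0.38)  # violated: the sign flipped
```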

Feature File
To use behave for invariance testing, create a directory called features. Under that directory, create a file called invariant_test_sentiment.feature.
└── features/
    └── invariant_test_sentiment.feature

Within the invariant_test_sentiment.feature file, we will specify the project requirements:
Feature: Sentiment Analysis
  As a data scientist
  I want to ensure that my model is invariant to paraphrasing
  So that my model can produce consistent results.

  Scenario: Paraphrased text
    Given a text
    When the text is paraphrased
    Then both text should have the same sentiment

The “Given,” “When,” and “Then” parts of this file present the actual steps that will be executed by behave during the test.
The Feature section serves as living documentation to provide context but does not trigger test execution.
Python Step Implementation
To implement the steps used in the scenarios with Python, start with creating the features/steps directory and a file called invariant_test_sentiment.py within it:
└── features/
    ├── invariant_test_sentiment.feature
    └── steps/
        └── invariant_test_sentiment.py

The invariant_test_sentiment.py file contains the following code, which tests whether the sentiment produced by the TextBlob model is consistent between the original text and its paraphrased version.
from behave import given, then, when
from textblob import TextBlob

@given("a text")
def step_given_positive_sentiment(context):
    context.sent = "The hotel room was great! It was spacious, clean and had a nice view of the city."

@when("the text is paraphrased")
def step_when_paraphrased(context):
    context.sent_paraphrased = "The hotel room wasn't bad. It wasn't cramped, dirty, and had a decent view of the city."

@then("both text should have the same sentiment")
def step_then_sentiment_analysis(context):
    # Get sentiment of each sentence
    sentiment_original = TextBlob(context.sent).sentiment.polarity
    sentiment_paraphrased = TextBlob(context.sent_paraphrased).sentiment.polarity

    # Print sentiment
    print(f"Sentiment of the original text: {sentiment_original:.2f}")
    print(f"Sentiment of the paraphrased sentence: {sentiment_paraphrased:.2f}")

    # Assert that both sentences have the same sentiment
    both_positive = (sentiment_original > 0) and (sentiment_paraphrased > 0)
    both_negative = (sentiment_original < 0) and (sentiment_paraphrased < 0)
    assert both_positive or both_negative

Explanation of the code above:

The steps are identified using decorators matching the feature’s predicate: given, when, and then.
The decorator accepts a string containing the rest of the phrase in the matching scenario step.
The context variable allows you to share values between steps.

Run the Test
To run the invariant_test_sentiment.feature test, type the following command:
behave features/invariant_test_sentiment.feature

Output:
Feature: Sentiment Analysis # features/invariant_test_sentiment.feature:1
  As a data scientist
  I want to ensure that my model is invariant to paraphrasing
  So that my model can produce consistent results.

  Scenario: Paraphrased text
    Given a text
    When the text is paraphrased
    Then both text should have the same sentiment
      Traceback (most recent call last):
        assert both_positive or both_negative
      AssertionError

      Captured stdout:
      Sentiment of the original text: 0.66
      Sentiment of the paraphrased sentence: -0.38

Failing scenarios:
  features/invariant_test_sentiment.feature:6  Paraphrased text

0 features passed, 1 failed, 0 skipped
0 scenarios passed, 1 failed, 0 skipped
2 steps passed, 1 failed, 0 skipped, 0 undefined

The output shows that the first two steps passed and the last step failed, indicating that the model is affected by paraphrasing.
Directional Testing
Directional testing is a statistical method used to assess whether the impact of an independent variable on a dependent variable is in a particular direction, either positive or negative.
An example of directional testing is to check whether the presence of a specific word has a positive or negative effect on the sentiment score of a given text.

To use behave for directional testing, we will create two files directional_test_sentiment.feature and directional_test_sentiment.py.
└── features/
├──── directional_test_sentiment.feature
└──── steps/
└──── directional_test_sentiment.py

Feature File
The code in directional_test_sentiment.feature specifies the requirements of the project as follows:
Feature: Sentiment Analysis with Specific Word
As a data scientist
I want to ensure that the presence of a specific word
has a positive or negative effect on the sentiment score of a text

Scenario: Sentiment analysis with specific word
Given a sentence
And the same sentence with the addition of the word 'awesome'
When I input the new sentence into the model
Then the sentiment score should increase

Notice the “And” keyword in the scenario. Because the preceding step starts with “Given,” behave treats the “And” step as another “Given.”
Python Step Implementation
The code in directional_test_sentiment.py implements a test scenario, which checks whether the presence of the word “awesome” positively affects the sentiment score generated by the TextBlob model.
from behave import given, then, when
from textblob import TextBlob

@given("a sentence")
def step_given_positive_word(context):
    context.sent = "I love this product"

@given("the same sentence with the addition of the word '{word}'")
def step_given_a_positive_word(context, word):
    context.new_sent = f"I love this {word} product"

@when("I input the new sentence into the model")
def step_when_use_model(context):
    context.sentiment_score = TextBlob(context.sent).sentiment.polarity
    context.adjusted_score = TextBlob(context.new_sent).sentiment.polarity

@then("the sentiment score should increase")
def step_then_positive(context):
    assert context.adjusted_score > context.sentiment_score

The second step uses the parameter syntax {word}. When the .feature file is run, the value specified for {word} in the scenario is automatically passed to the corresponding step function.
This means that if the scenario states that the same sentence should include the word “awesome,” behave will automatically replace {word} with “awesome.”

This parameterization is useful when you want to try different values for {word} by editing only the .feature file, leaving the .py file untouched.
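To exercise several words in a single run, behave also supports Gherkin's Scenario Outline, which repeats the scenario once per row of an Examples table while reusing the same step definitions. A sketch (the second word is an illustrative addition, not from the original project):

```gherkin
Feature: Sentiment Analysis with Specific Word

  Scenario Outline: Sentiment analysis with the word '<word>'
    Given a sentence
    And the same sentence with the addition of the word '<word>'
    When I input the new sentence into the model
    Then the sentiment score should increase

    Examples:
      | word      |
      | awesome   |
      | fantastic |
```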

Run the Test
behave features/directional_test_sentiment.feature

Output:
Feature: Sentiment Analysis with Specific Word
As a data scientist
I want to ensure that the presence of a specific word has a positive or negative effect on the sentiment score of a text
Scenario: Sentiment analysis with specific word
Given a sentence
And the same sentence with the addition of the word 'awesome'
When I input the new sentence into the model
Then the sentiment score should increase

1 feature passed, 0 failed, 0 skipped
1 scenario passed, 0 failed, 0 skipped
4 steps passed, 0 failed, 0 skipped, 0 undefined

Since all the steps passed, we can infer that the sentiment score increases due to the new word’s presence.
Minimum Functionality Testing
Minimum functionality testing is a type of testing that verifies if the system or product meets the minimum requirements and is functional for its intended use.
One example of minimum functionality testing is to check whether the model can handle different types of inputs, such as numerical, categorical, or textual data. To test with diverse inputs, generate test data using Faker for more comprehensive validation.

To use minimum functionality testing for input validation, create two files minimum_func_test_input.feature and minimum_func_test_input.py.
└── features/
├──── minimum_func_test_input.feature
└──── steps/
└──── minimum_func_test_input.py

Feature File
The code in minimum_func_test_input.feature specifies the project requirements as follows:
Feature: Test my_ml_model

Scenario: Test integer input
Given I have an integer input of 42
When I run the model
Then the output should be an array of one number

Scenario: Test float input
Given I have a float input of 3.14
When I run the model
Then the output should be an array of one number

Scenario: Test list input
Given I have a list input of [1, 2, 3]
When I run the model
Then the output should be an array of three numbers

Python Step Implementation
The code in minimum_func_test_input.py implements the requirements, checking if the output generated by predict for a specific input type meets the expectations.
from behave import given, then, when

import ast
import numpy as np
from sklearn.linear_model import LinearRegression
from typing import Union

def predict(input_data: Union[int, float, str, list]):
    """Create a model to predict input data"""

    # Reshape the input data
    if isinstance(input_data, (int, float, list)):
        input_array = np.array(input_data).reshape(-1, 1)
    else:
        raise ValueError("Input type not supported")

    # Create a linear regression model
    model = LinearRegression()

    # Train the model on a sample dataset
    X = np.array([[1], [2], [3], [4], [5]])
    y = np.array([2, 4, 6, 8, 10])
    model.fit(X, y)

    # Predict the output using the input array
    return model.predict(input_array)

@given("I have an integer input of {input_value}")
def step_given_integer_input(context, input_value):
    context.input_value = int(input_value)

@given("I have a float input of {input_value}")
def step_given_float_input(context, input_value):
    context.input_value = float(input_value)

@given("I have a list input of {input_value}")
def step_given_list_input(context, input_value):
    # Safer than eval for parsing literals like "[1, 2, 3]"
    context.input_value = ast.literal_eval(input_value)

@when("I run the model")
def step_when_run_model(context):
    context.output = predict(context.input_value)

@then("the output should be an array of one number")
def step_then_check_single_output(context):
    assert isinstance(context.output, np.ndarray)
    assert all(isinstance(x, (int, float)) for x in context.output)
    assert len(context.output) == 1

@then("the output should be an array of three numbers")
def step_then_check_triple_output(context):
    assert isinstance(context.output, np.ndarray)
    assert all(isinstance(x, (int, float)) for x in context.output)
    assert len(context.output) == 3

Run the Test
behave features/minimum_func_test_input.feature

Output:
Feature: Test my_ml_model

Scenario: Test integer input
Given I have an integer input of 42
When I run the model
Then the output should be an array of one number

Scenario: Test float input
Given I have a float input of 3.14
When I run the model
Then the output should be an array of one number

Scenario: Test list input
Given I have a list input of [1, 2, 3]
When I run the model
Then the output should be an array of three numbers

1 feature passed, 0 failed, 0 skipped
3 scenarios passed, 0 failed, 0 skipped
9 steps passed, 0 failed, 0 skipped, 0 undefined

Since all the steps passed, we can conclude that the model outputs match our expectations.
Behave’s Trade-offs
This section will outline some drawbacks of using behave compared to pytest, and explain why it may still be worth considering the tool.
Learning Curve
Using Behavior-Driven Development (BDD) in behave may result in a steeper learning curve than the more traditional testing approach used by pytest.

Counter argument: The focus on collaboration in BDD can lead to better alignment between business requirements and software development, resulting in a more efficient development process overall.

Slower performance
behave tests can be slower than pytest tests because behave must parse the feature files and map them to step definitions before running the tests.

Counter argument: behave’s focus on well-defined steps can lead to tests that are easier to understand and modify, reducing the overall effort required for test maintenance.

Less flexibility
behave is more rigid in its syntax, while pytest allows more flexibility in defining tests and fixtures.

Counter argument: behave’s rigid structure can help ensure consistency and readability across tests, making them easier to understand and maintain over time.

Conclusion
You’ve learned how to use behave to write readable tests for a data science project.
Key takeaways:
How behave works:

Feature files serve as living documentation: They communicate test intent in natural language while driving actual test execution
Step decorators bridge features and code: @given, @when, and @then decorators map feature file steps to Python test implementations

Three essential test types:

Invariance testing: Ensures your model produces consistent results when inputs are paraphrased or slightly modified
Directional testing: Validates that specific changes have the expected positive or negative impact on predictions
Minimum functionality testing: Verifies your model handles different input types correctly

Despite trade-offs like a steeper learning curve and slower performance compared to pytest, behave excels where it matters most for ML testing: making model behavior transparent and testable by both technical and non-technical team members.
Related Tutorials

Configuration Management: Hydra for Python Configuration for managing test specifications with YAML-like syntax

Behave: Write Readable ML Tests with Behavior-Driven Development

Build Production-Ready LLM Agents with LangChain 1.0 Middleware

Table of Contents

Introduction
Introduction to Middleware Pattern
Installation
Message Summarization
PII Detection and Filtering
Human-in-the-Loop
Task Planning
Intelligent Tool Selection
Building a Production Agent with Multiple Middleware
Final Thoughts

Introduction
Have you ever wanted to extend your LLM agent with custom behaviors like:

Summarizing messages to manage context windows
Filtering PII to protect sensitive data
Requesting human approval for critical actions

…but weren’t sure how to build them?
If you’ve tried this in LangChain v0.x, you probably ran into complex pre/post hooks that were hard to scale or test.
LangChain 1.0 introduces a composable middleware architecture that solves these problems by providing reusable, testable components that follow web server middleware patterns.

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

Introduction to Middleware Pattern
Building on the LangChain fundamentals we covered earlier, LangChain 1.0 introduces middleware components that give you fine-grained control over agent execution. Each middleware is a self-contained component that:

Focuses on a single responsibility (monitor, modify, control, or enforce)
Can be tested independently
Composes with other middleware through a standard interface

The four middleware categories are:

Monitor: Track agent behavior with logging, analytics, and debugging
Modify: Transform prompts, tool selection, and output formatting
Control: Add retries, fallbacks, and early termination logic
Enforce: Apply rate limits, guardrails, and PII detection

This article covers five essential middleware components:

Message summarization (modify): Manage context windows by condensing long conversations
PII filtering (enforce): Protect sensitive data by redacting emails and phone numbers
Human-in-the-loop (control): Pause execution for critical actions requiring approval
Task planning (modify): Structure complex requests into manageable subtasks
Intelligent tool selection (modify): Pre-filter tools to reduce costs and improve accuracy

Let’s explore how each middleware component improves production agent workflows.
Installation
Install LangChain 1.0 and the OpenAI integration:
# Option 1: pip
pip install langchain langchain-openai

# Option 2: uv (faster alternative to pip)
uv add langchain langchain-openai

Note: If you’re upgrading from LangChain v0.x, add the -U flag: pip install -U langchain langchain-openai

You’ll also need an OpenAI API key:
export OPENAI_API_KEY="your-api-key-here"

Message Summarization
When building conversational agents, message history grows with each turn. Long conversations quickly exceed model context windows, causing API errors or degraded performance.
SummarizationMiddleware automates this by:

Monitoring token count across the conversation
Condensing older messages when thresholds are exceeded
Preserving recent context for immediate relevance

The benefits:

Reduced API costs from sending fewer tokens per request
Faster responses with smaller context windows
Complete context through summaries plus full recent history

Here’s how to use SummarizationMiddleware as part of an agent:
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware

agent = create_agent(
    model="openai:gpt-4o",
    tools=[],
    middleware=[
        SummarizationMiddleware(
            model="openai:gpt-4o-mini",
            max_tokens_before_summary=400,
            messages_to_keep=5
        )
    ]
)

This configuration sets up automatic conversation management:

model="openai:gpt-4o" – The primary model for agent responses
max_tokens_before_summary=400 – Triggers summarization when conversation exceeds 400 tokens
messages_to_keep=5 – Preserves the 5 most recent messages in full
model="openai:gpt-4o-mini" – Uses a faster, cheaper model for creating summaries

Note: These configuration values are set low for demonstration purposes to quickly show summarization behavior. Production applications typically use max_tokens_before_summary=4000 and messages_to_keep=20 (the recommended defaults).

Let’s use this agent to simulate a customer support conversation and track token usage.
First, let’s set up a realistic customer support conversation with multiple turns:
# Simulate a customer support conversation
conversation_turns = [
"I ordered a laptop last week but haven't received it yet. Order #12345.",
"Can you check the shipping status? I need it for work next Monday.",
"Also, I originally wanted the 16GB RAM model but ordered 8GB by mistake.",
"Is it too late to change the order? Or should I return and reorder?",
"What's your return policy on laptops? Do I need the original packaging?",
"If I return it, how long does the refund take to process?",
"Can I get expedited shipping on the replacement 16GB model?",
"Does the 16GB version come with the same warranty as the 8GB?",
"Are there any promotional codes I can use for the new order?",
"What if the new laptop arrives damaged? What's the process?",
]

Next, define helper functions to track token usage and verify summarization:

estimate_token_count(): Calculates approximate tokens by counting words in all messages
get_actual_tokens(): Extracts the actual token count from the model’s response metadata
print_token_comparison(): Displays estimated vs actual tokens to show when summarization occurs

def estimate_token_count(messages):
    """Estimate total tokens in message history."""
    return sum(len(msg.content.split()) * 1.3 for msg in messages)

def get_actual_tokens(response):
    """Extract actual token count from response metadata."""
    last_ai_message = response["messages"][-1]
    if hasattr(last_ai_message, 'usage_metadata') and last_ai_message.usage_metadata:
        return last_ai_message.usage_metadata.get("input_tokens", 0)
    return None

def print_token_comparison(turn_number, estimated, actual):
    """Print token count comparison for a conversation turn."""
    if actual is not None:
        print(f"Turn {turn_number}: ~{int(estimated)} tokens (estimated) → {actual} tokens (actual)")
    else:
        print(f"Turn {turn_number}: ~{int(estimated)} tokens (estimated)")

Finally, run the conversation and observe token usage across turns:
from langchain_core.messages import HumanMessage

messages = []
for i, question in enumerate(conversation_turns, 1):
    messages.append(HumanMessage(content=question))

    estimated_tokens = estimate_token_count(messages)
    response = agent.invoke({"messages": messages})
    messages.extend(response["messages"][len(messages):])

    actual_tokens = get_actual_tokens(response)
    print_token_comparison(i, estimated_tokens, actual_tokens)

Output:
Turn 1: ~16 tokens (estimated) → 24 tokens (actual)
Turn 2: ~221 tokens (estimated) → 221 tokens (actual)
Turn 3: ~408 tokens (estimated) → 415 tokens (actual)
Turn 4: ~646 tokens (estimated) → 509 tokens (actual)
Turn 5: ~661 tokens (estimated) → 524 tokens (actual)
Turn 6: ~677 tokens (estimated) → 379 tokens (actual)
Turn 7: ~690 tokens (estimated) → 347 tokens (actual)
Turn 8: ~705 tokens (estimated) → 184 tokens (actual)
Turn 9: ~721 tokens (estimated) → 204 tokens (actual)
Turn 10: ~734 tokens (estimated) → 195 tokens (actual)

Notice the pattern in the token counts:

Turns 1-3: Tokens grow steadily (24 → 221 → 415) as the conversation builds
Turn 4: Summarization kicks in with actual tokens dropping to 509 despite 646 estimated
Turn 8: Most dramatic reduction with only 184 actual tokens sent vs 705 estimated (74% reduction!)

Once past the 400-token threshold, the middleware automatically condenses older messages while preserving the 5 most recent turns. This keeps token usage low even as the conversation continues.
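The underlying pattern is simple: once the estimated size crosses the threshold, fold everything except the most recent messages into a single summary entry. A minimal, framework-free sketch of that idea (the summary string here is a placeholder; the real middleware generates it with gpt-4o-mini):

```python
def estimate_tokens(messages: list[str]) -> int:
    """Rough token estimate: ~1.3 tokens per whitespace-separated word."""
    return int(sum(len(m.split()) * 1.3 for m in messages))

def condense(messages: list[str], max_tokens: int = 400, keep: int = 5) -> list[str]:
    """Replace older messages with one summary entry once the budget is exceeded."""
    if estimate_tokens(messages) <= max_tokens or len(messages) <= keep:
        return messages
    older, recent = messages[:-keep], messages[-keep:]
    # Placeholder summary; SummarizationMiddleware would call the summary model here.
    summary = f"[summary of {len(older)} earlier message(s)]"
    return [summary] + recent

history = ["long opening message " * 100] + [f"turn {i}" for i in range(5)]
print(condense(history)[0])  # [summary of 1 earlier message(s)]
```

The same loop runs on every turn, so the history stays bounded: older content collapses into the summary while the `keep` most recent messages survive verbatim.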
PII Detection and Filtering
Customer support conversations often contain sensitive information like email addresses, phone numbers, and account IDs. Logging or storing this data without redaction creates compliance and security risks.
PIIMiddleware automatically protects personally identifiable information (PII) by:

Built-in detectors for common PII types (email, credit cards, IP addresses)
Custom regex patterns for domain-specific sensitive data
Multiple protection strategies: redact, mask, hash, or block
Automatic application to all messages before model processing

First, configure the agent with multiple PII detectors:
Each detector in this example demonstrates a different protection strategy:

Email detector: Uses built-in pattern with redact strategy (complete replacement)
Phone detector: Uses custom regex \b\d{3}-\d{3}-\d{4}\b with mask strategy (partial visibility)
Account ID detector: Uses custom pattern \b[A-Z]{2}\d{8}\b with redact strategy (complete removal)

from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware
from langchain_core.messages import HumanMessage

agent = create_agent(
    model="openai:gpt-4o",
    tools=[],
    middleware=[
        # Built-in email detector - replaces emails with [REDACTED_EMAIL]
        PIIMiddleware("email", strategy="redact", apply_to_input=True),
        # Custom phone number pattern - shows only last 4 digits
        PIIMiddleware(
            "phone",
            detector=r"\b\d{3}-\d{3}-\d{4}\b",
            strategy="mask",
            apply_to_input=True,
        ),
        # Custom regex pattern for account IDs (e.g., AB12345678)
        PIIMiddleware(
            "account_id",
            detector=r"\b[A-Z]{2}\d{8}\b",
            strategy="redact",
            apply_to_input=True,
        ),
    ],
)

Next, create a message containing sensitive information and invoke the agent:
# Create a message with PII
original_message = HumanMessage(content="My email is john@example.com, phone is 555-123-4567, and account is AB12345678")
print(f"Original message: {original_message.content}")

# Invoke the agent
response = agent.invoke({"messages": [original_message]})

Output:
Original message: My email is john@example.com, phone is 555-123-4567, and account is AB12345678

Finally, inspect the message that was actually sent to the model to verify redaction:
# Check what was actually sent to the model (after PII redaction)
input_message = response["messages"][0]
print(f"Message sent to model: {input_message.content}")

Output:
Message sent to model: My email is [REDACTED_EMAIL], phone is ****4567, and account is [REDACTED_ACCOUNT_ID]

The middleware successfully processed all three types of sensitive information:

Email: Completely redacted to [REDACTED_EMAIL]
Phone: Masked to show only last 4 digits (****4567)
Account ID: Completely redacted to [REDACTED_ACCOUNT_ID]
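At heart, the redact and mask strategies are regex substitutions. A standalone sketch with Python's re module, using the same phone and account-ID patterns as above (the email pattern is a simplified stand-in for the built-in detector, which is more thorough):

```python
import re

EMAIL = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.]+\b")  # simplified; built-in detector is stricter
PHONE = re.compile(r"\b\d{3}-\d{3}-\d{4}\b")
ACCOUNT_ID = re.compile(r"\b[A-Z]{2}\d{8}\b")

def sanitize(text: str) -> str:
    """Redact emails and account IDs; mask phone numbers to the last 4 digits."""
    text = EMAIL.sub("[REDACTED_EMAIL]", text)
    text = PHONE.sub(lambda m: "****" + m.group()[-4:], text)
    text = ACCOUNT_ID.sub("[REDACTED_ACCOUNT_ID]", text)
    return text

print(sanitize("My email is john@example.com, phone is 555-123-4567, and account is AB12345678"))
# My email is [REDACTED_EMAIL], phone is ****4567, and account is [REDACTED_ACCOUNT_ID]
```

This is only the substitution step; the middleware's value is applying it automatically to every message before it reaches the model.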

Human-in-the-Loop
Autonomous agents can perform sensitive actions like processing refunds or modifying account settings. Executing these without human oversight creates risk of errors or abuse.
HumanInTheLoopMiddleware automates approval workflows by pausing execution and waiting for approval before proceeding:
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver

@tool
def process_refund(amount: float, reason: str) -> str:
    """Process a customer refund. Use this when a customer requests a refund."""
    return f"Refund of ${amount} processed for reason: {reason}"

# Create memory checkpointer for state persistence
memory = MemorySaver()

agent = create_agent(
    model="openai:gpt-4o",
    tools=[process_refund],
    middleware=[HumanInTheLoopMiddleware(interrupt_on={"process_refund": True})],
    checkpointer=memory,  # Required for state persistence
    system_prompt="You are a customer support agent. Use the available tools to help customers. When a customer asks for a refund, use the process_refund tool.",
)

This configuration sets up an agent that:

Uses HumanInTheLoopMiddleware to pause execution before calling process_refund
Uses a checkpointer (MemorySaver) to save agent state during interruptions, allowing execution to resume after approval

Now let’s invoke the agent with a refund request:
# Agent pauses before executing sensitive tools
response = agent.invoke(
    {"messages": [("user", "I need a refund of $100 for my damaged laptop")]},
    config={"configurable": {"thread_id": "user-123"}},
)

The agent will pause when it tries to process the refund. To verify this happened, let’s define helper functions for interrupt detection.
def has_interrupt(response):
    """Check if response contains an interrupt."""
    return "__interrupt__" in response

def display_action(action):
    """Display pending action details."""
    print(f"Pending action: {action['name']}")
    print(f"Arguments: {action['args']}")
    print()

def get_user_approval():
    """Prompt user for approval and return decision."""
    approval = input("Approve this action? (yes/no): ")
    if approval.lower() == "yes":
        print("✓ Action approved")
        return True
    else:
        print("✗ Action rejected")
        return False

Now use these helpers to check for interrupts and process approval:
if has_interrupt(response):
    print("Execution interrupted – waiting for approval\n")

    interrupts = response["__interrupt__"]
    for interrupt in interrupts:
        for action in interrupt.value["action_requests"]:
            display_action(action)
            approved = get_user_approval()

Output:
Execution interrupted – waiting for approval

Pending action: process_refund
Arguments: {'amount': 100, 'reason': 'Damaged Laptop'}

Approve this action? (yes/no): yes
✓ Action approved

The middleware successfully intercepted the process_refund tool call before execution, displaying all necessary details (action name and arguments) for human review. Only after explicit approval does the agent proceed with the sensitive operation.
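The interception pattern itself is a small wrapper: consult an approver before running the sensitive callable. A framework-free sketch of the idea (the auto-approval policy here is purely illustrative; in the middleware a human decides, as in the flow above):

```python
from typing import Callable

def guarded(fn: Callable, approve: Callable[[str, dict], bool]) -> Callable:
    """Wrap a sensitive function so it only runs after explicit approval."""
    def wrapper(**kwargs):
        if not approve(fn.__name__, kwargs):
            return f"{fn.__name__} rejected"
        return fn(**kwargs)
    return wrapper

def process_refund(amount: float, reason: str) -> str:
    return f"Refund of ${amount} processed for reason: {reason}"

# Illustrative policy: auto-approve small refunds, reject the rest.
safe_refund = guarded(process_refund, lambda name, args: args["amount"] <= 500)
print(safe_refund(amount=100, reason="damaged laptop"))   # Refund of $100 processed for reason: damaged laptop
print(safe_refund(amount=1200, reason="damaged laptop"))  # process_refund rejected
```

What the middleware adds on top of this wrapper is state persistence: the checkpointer lets the agent pause mid-run and resume after the human answers, even in a separate process.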
Task Planning
Complex tasks like “refactor my codebase” or “analyze this dataset” need to be broken down into smaller, manageable steps. Without explicit planning, agents may jump between subtasks at random or skip critical steps entirely.
TodoListMiddleware enables structured task management by:

Automatically providing a write_todos tool for task planning
Tracking completion status across multi-step workflows
Returning structured todo items in agent results

The benefits:

Better task decomposition through explicit step-by-step planning
Progress tracking to monitor complex workflow completion
Reduced errors from skipped or forgotten subtasks

Here’s how to enable planning for an agent:
from langchain.agents import create_agent
from langchain.agents.middleware import TodoListMiddleware
from langchain_core.tools import tool

@tool
def analyze_code(file_path: str) -> str:
    """Analyze code quality and find issues."""
    return f"Analyzed {file_path}: Found 3 code smells, 2 security issues"

@tool
def refactor_code(file_path: str, changes: str) -> str:
    """Refactor code with specified changes."""
    return f"Refactored {file_path}: {changes}"

agent = create_agent(
    model="openai:gpt-4o",
    tools=[analyze_code, refactor_code],
    middleware=[TodoListMiddleware()]
)

This configuration automatically injects planning capabilities into the agent.
Now let’s ask the agent to perform a multi-step refactoring task:
from langchain_core.messages import HumanMessage

response = agent.invoke({
    "messages": [HumanMessage("I need to refactor my authentication module. First analyze it, then suggest improvements, and finally implement the changes.")]
})

Check the agent’s todo list to see how it planned the work:
# Access the structured todo list from the response
if "todos" in response:
    print("Agent's Task Plan:")
    for i, todo in enumerate(response["todos"], 1):
        status = todo.get("status", "pending")
        print(f"{i}. [{status}] {todo['content']}")

Output:
Agent's Task Plan:
1. [in_progress] Analyze the authentication module code to identify quality issues and areas for improvement.
2. [pending] Suggest improvements based on the analysis of the authentication module.
3. [pending] Implement the suggested improvements in the authentication module code.

Nice! The agent automatically decomposed the multi-step refactoring request into 3 distinct tasks, with 1 in progress and 2 pending. This structured approach ensures systematic execution without skipping critical steps.
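The todos list behaves like a simple status pipeline. A minimal sketch of advancing it, using the same dict shape as response["todos"] above (the "completed" status name is an assumption for illustration; the article's output only shows in_progress and pending):

```python
def advance(todos: list[dict]) -> list[dict]:
    """Mark the current in_progress item completed and start the next pending one."""
    todos = [dict(t) for t in todos]  # copy so the caller's list is untouched
    for t in todos:
        if t["status"] == "in_progress":
            t["status"] = "completed"
            break
    for t in todos:
        if t["status"] == "pending":
            t["status"] = "in_progress"
            break
    return todos

plan = [
    {"content": "Analyze the authentication module", "status": "in_progress"},
    {"content": "Suggest improvements", "status": "pending"},
    {"content": "Implement the changes", "status": "pending"},
]
print([t["status"] for t in advance(plan)])  # ['completed', 'in_progress', 'pending']
```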
Intelligent Tool Selection
Agents with many tools (10+) face a scaling problem: sending all tool descriptions with every request wastes tokens and degrades performance. The model must process irrelevant options, increasing latency and cost.
LLMToolSelectorMiddleware solves this by using a smaller model to pre-filter relevant tools:

Uses a secondary LLM (separate from the main agent model) to pre-filter and limit the tools sent to the main model
Allows critical tools to always be included in selection
Analyzes queries to select only relevant tools

The benefits:

Lower costs from sending fewer tool descriptions per request
Faster responses with smaller tool context
Better accuracy when model isn’t distracted by irrelevant options

Let’s create an agent with many tools for a customer support scenario:
from langchain.agents import create_agent
from langchain.agents.middleware import LLMToolSelectorMiddleware
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool

# Define multiple tools for different support scenarios
@tool
def lookup_order(order_id: str) -> str:
    """Look up order details and shipping status."""
    return f"Order {order_id}: Shipped on 2025-01-15"

@tool
def process_refund(order_id: str, amount: float) -> str:
    """Process a customer refund."""
    return f"Refund of ${amount} processed for order {order_id}"

@tool
def check_inventory(product_id: str) -> str:
    """Check product inventory levels."""
    return f"Product {product_id}: 42 units in stock"

@tool
def update_address(order_id: str, new_address: str) -> str:
    """Update shipping address for an order."""
    return f"Address updated for order {order_id}"

@tool
def cancel_order(order_id: str) -> str:
    """Cancel an existing order."""
    return f"Order {order_id} cancelled"

@tool
def track_shipment(tracking_number: str) -> str:
    """Track package location."""
    return f"Package {tracking_number}: Out for delivery"

@tool
def apply_discount(order_id: str, code: str) -> str:
    """Apply discount code to order."""
    return f"Discount {code} applied to order {order_id}"

@tool
def schedule_delivery(order_id: str, date: str) -> str:
    """Schedule delivery for specific date."""
    return f"Delivery scheduled for {date}"

Configure the agent with intelligent tool selection:
agent = create_agent(
    model="openai:gpt-4o",
    tools=[
        lookup_order, process_refund, check_inventory,
        update_address, cancel_order, track_shipment,
        apply_discount, schedule_delivery
    ],
    middleware=[
        LLMToolSelectorMiddleware(
            model="openai:gpt-4o-mini",  # Use cheaper model for selection
            max_tools=3,  # Limit to 3 most relevant tools
            always_include=["lookup_order"],  # Always include order lookup
        )
    ]
)

This configuration creates an efficient filtering system:

model="openai:gpt-4o-mini" – Uses a smaller, faster model for tool selection
max_tools=3 – Limits to 3 most relevant tools per query
always_include=["lookup_order"] – Ensures order lookup is always available

Before testing the agent with different customer requests, define a helper function to display tool usage:
def show_tools_used(response):
    """Display which tools were called during agent execution."""
    tools_used = []
    for msg in response["messages"]:
        if hasattr(msg, "tool_calls") and msg.tool_calls:
            for tool_call in msg.tool_calls:
                tools_used.append(tool_call["name"])

    if tools_used:
        print(f"Tools used: {', '.join(tools_used)}")
    print(f"Response: {response['messages'][-1].content}\n")

Test with a package tracking query:
# Example 1: Package tracking query
response = agent.invoke({
    "messages": [HumanMessage("Where is my package? Tracking number is 1Z999AA10123456784")]
})
show_tools_used(response)

Output:
Tools used: track_shipment
Response: Your package with tracking number 1Z999AA10123456784 is currently out for delivery.

Test with a refund request:
# Example 2: Refund request
response = agent.invoke({
    "messages": [HumanMessage("I need a refund of $50 for order ORD-12345")]
})
show_tools_used(response)

Output:
Tools used: lookup_order, process_refund
Response: The refund of $50 for order ORD-12345 has been successfully processed.

Test with an inventory check:
# Example 3: Inventory check
response = agent.invoke({
    "messages": [HumanMessage("Do you have product SKU-789 in stock?")]
})
show_tools_used(response)

Output:
Tools used: check_inventory
Response: Yes, we currently have 42 units of product SKU-789 in stock.

The middleware demonstrated precise tool selection across different query types:

track_shipment for tracking numbers
lookup_order + process_refund for refund requests
check_inventory for stock queries

Each request filtered out 5+ irrelevant tools, sending only what was needed to the main model.
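The pre-filtering idea can be approximated without an LLM at all. A toy keyword-overlap selector over tool descriptions (illustrative only: LLMToolSelectorMiddleware asks a smaller model to choose, not string matching; the tool names and defaults mirror the example above):

```python
def select_tools(query: str, tools: dict[str, str], max_tools: int = 3,
                 always_include: tuple[str, ...] = ("lookup_order",)) -> list[str]:
    """Rank tools by word overlap between the query and each tool's description."""
    words = set(query.lower().split())
    ranked = sorted(
        tools,
        key=lambda name: len(words & set(tools[name].lower().split())),
        reverse=True,
    )
    selected = [t for t in always_include if t in tools]
    for name in ranked:
        if len(selected) >= max_tools:
            break
        if name not in selected:
            selected.append(name)
    return selected

tools = {
    "lookup_order": "look up order details and shipping status",
    "process_refund": "process a customer refund",
    "track_shipment": "track package location",
    "check_inventory": "check product inventory levels",
}
print(select_tools("where is my package tracking number", tools))
# ['lookup_order', 'track_shipment', 'process_refund']
```

Only the top-ranked tools (plus the always-included ones) would be forwarded to the main model, which is exactly the cost-saving behavior the middleware provides.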
Building a Production Agent with Multiple Middleware
Let’s combine three middleware components to build a production-ready customer support agent that handles a realistic scenario: a customer with a long conversation history requesting a refund and sharing their email address.
from langchain.agents import create_agent
from langchain.agents.middleware import (
    SummarizationMiddleware,
    PIIMiddleware,
    HumanInTheLoopMiddleware
)
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver

@tool
def process_refund(amount: float, reason: str) -> str:
    """Process a customer refund."""
    return f"Refund of ${amount} processed for reason: {reason}"

# Create agent with three middleware components
agent = create_agent(
    model="openai:gpt-4o",
    tools=[process_refund],
    middleware=[
        SummarizationMiddleware(
            model="openai:gpt-4o-mini",
            max_tokens_before_summary=400,
            messages_to_keep=5
        ),
        PIIMiddleware("email", strategy="redact", apply_to_input=True),
        HumanInTheLoopMiddleware(interrupt_on={"process_refund": True})
    ],
    checkpointer=MemorySaver()
)

Now test with a realistic customer interaction, processing each message to show how middleware handles them.
First, define a helper function to track middleware behavior using the helper functions defined earlier:
def process_message_with_tracking(agent, messages, thread_id, turn_num):
    """Process messages and show middleware behavior."""
    print(f"\n— Turn {turn_num} —")
    print(f"User: {messages[-1][1]}")

    response = agent.invoke(
        {"messages": messages},
        config={"configurable": {"thread_id": thread_id}}
    )

    # Check for interrupts (human-in-the-loop)
    if has_interrupt(response):
        print("⏸ Execution paused for approval")
    else:
        # Show agent response
        agent_message = response["messages"][-1].content
        print(f"Agent: {agent_message}")

        # Check for PII redaction
        full_response = str(response["messages"])
        if "[REDACTED_EMAIL]" in full_response:
            print("🔒 PII detected and redacted")

    return response

Now simulate a customer conversation that demonstrates all three middleware components:

Turns 1-3: Normal conversation flow about a damaged laptop
Turn 4: Customer shares email and asks for confirmation (tests PIIMiddleware redaction)
Turn 5: Customer requests $1200 refund (triggers HumanInTheLoopMiddleware approval)

messages = []

# Turn 1: Initial complaint
messages.append(("user", "I ordered a laptop but it arrived damaged."))
process_message_with_tracking(agent, messages, "customer-456", 1)

# Turn 2: Additional details
messages.append(("user", "I already tried troubleshooting but it won't turn on."))
process_message_with_tracking(agent, messages, "customer-456", 2)

# Turn 3: More context
messages.append(("user", "The screen is cracked and there's a dent on the corner."))
process_message_with_tracking(agent, messages, "customer-456", 3)

# Turn 4: PII exposure – test if middleware hides email from model
messages.append(("user", "My email is customer@example.com. Can you confirm my email address?"))
response = process_message_with_tracking(agent, messages, "customer-456", 4)

# Turn 5: Sensitive action request – triggers human approval
messages.append(("user", "Can I get a full refund of $1200?"))
response = process_message_with_tracking(agent, messages, "customer-456", 5)

# If interrupted, show approval flow
if has_interrupt(response):
interrupts = response["__interrupt__"]
for interrupt in interrupts:
for action in interrupt.value["action_requests"]:
display_action(action)
approved = get_user_approval()

Output:
— Turn 1 —
User: I ordered a laptop but it arrived damaged.
Agent: I'm sorry to hear that your laptop arrived damaged. To resolve this issue, I can assist you in processing a refund. Could you please provide the amount of the purchase and any specific reason you would like to include for the refund?

— Turn 2 —
User: I already tried troubleshooting but it won't turn on.
Agent: Thank you for sharing that information. I'll proceed with processing a refund for the damaged laptop. Could you please provide the purchase amount, so I can include it in the refund request?

— Turn 3 —
User: The screen is cracked and there's a dent on the corner.
Agent: I'm sorry to hear about the condition of your laptop. I will help you process a refund. Could you please let me know the purchase amount, so I can proceed with the refund request?

— Turn 4 —
User: My email is customer@example.com. Can you confirm my email address?
Agent: I'm sorry, but I'm unable to confirm or access email addresses for privacy and security reasons. However, I can assist you with processing a refund. Could you please provide the amount you paid for the laptop so that I can proceed with the refund request?
🔒 PII detected and redacted

— Turn 5 —
User: Can I get a full refund of $1200?
⏸ Execution paused for approval

Pending action: process_refund
Arguments: {'amount': 1200, 'reason': 'Laptop arrived damaged with a cracked screen and dent on the corner, and it will not turn on after troubleshooting.'}

Approve this action? (yes/no): yes
✓ Action approved

The output demonstrates proper security controls:

Turn 4: The agent states it is “unable to confirm or access email addresses,” confirming PIIMiddleware successfully redacted customer@example.com to [REDACTED_EMAIL]
Email protection: Model never saw the actual address, preventing data leaks or logging
Refund approval: $1200 transaction didn’t execute until human approval was granted

For coordinating multiple agents with shared state and workflows, explore our LangGraph tutorial.

Final Thoughts
Building production LLM agents with LangChain 1.0 middleware requires minimal infrastructure code. Each component handles one concern: managing context windows, protecting sensitive data, controlling execution flow, or structuring complex tasks.
The best approach is incremental. Add one middleware at a time, test its behavior, then combine it with others. This modular design lets you start simple and expand as your agent’s requirements evolve.
Related Tutorials

Structured Outputs: Enforce Structured Outputs from LLMs with PydanticAI for type-safe agent responses
RAG Implementation: Build a Complete RAG System with 5 Open-Source Tools for question-answering agents
Vector Storage: Implement Semantic Search in Postgres Using pgvector and Ollama for production-grade embedding storage


Build Production-Ready LLM Agents with LangChain 1.0 Middleware Read More »

Choose the Right Text Pattern Tool: Regex, Pregex, or Pyparsing

Table of Contents

Introduction
Dataset Generation
Simple Regex: Basic Pattern Extraction
pregex: Build Readable Patterns
pyparsing: Parse Structured Ticket Headers
Conclusion

Introduction
Imagine you’re analyzing customer support tickets to extract contact information and error details. Tickets contain customer messages with email addresses in various formats and phone numbers with inconsistent formatting (some (555) 123-4567, others 555-123-4567).

ticket_id
message

0
Contact me at nichole70@kemp.com or (798)034-325 to resolve this issue.

1
You can reach me by phone (970-295-1452) or email (russellbrandon@simon-rogers.org) anytime.

2
My contact details: ehamilton@silva.io and 242 844 7293.

3
Feel free to call 901.794.1337 or email ogarcia@howell-chavez.net for assistance.

How do you extract the email addresses and phone numbers from the tickets?
This article shows three approaches to text pattern matching: regex, pregex, and pyparsing.
Key Takeaways
Here’s what you’ll learn:

Understand when regex patterns are sufficient and when they fall short
Write maintainable text extraction code using pregex’s readable components
Parse structured text with inconsistent formatting using pyparsing

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

Dataset Generation
Let’s create sample datasets that will be used throughout the article. We’ll generate customer support ticket data using the Faker library:
Install Faker:
pip install faker

First, let’s generate customer support tickets with simple contact information:
from faker import Faker
import csv
import pandas as pd
import random

fake = Faker()
Faker.seed(40)

# Define phone patterns
phone_patterns = ["(###)###-####", "###-###-####", "### ### ####", "###.###.####"]

# Define email TLDs
email_tlds = [".com", ".org", ".io", ".net"]

# Generate phone numbers and emails
phones = []
emails = []

for i in range(4):
# Generate phone with specific pattern
phone = fake.numerify(text=phone_patterns[i])
phones.append(phone)

# Generate email with specific TLD
email = fake.user_name() + "@" + fake.domain_word() + email_tlds[i]
emails.append(email)

# Define sentence structures
sentence_structures = [
lambda p, e: f"Contact me at {e} or {p} to resolve this issue.",
lambda p, e: f"You can reach me by phone ({p}) or email ({e}) anytime.",
lambda p, e: f"My contact details: {e} and {p}.",
lambda p, e: f"Feel free to call {p} or email {e} for assistance."
]

# Create CSV with 4 rows
with open("data/tickets.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["ticket_id", "message"])

for i in range(4):
message = sentence_structures[i](phones[i], emails[i])
writer.writerow([i, message])

Set the display option to show the full width of the columns:
pd.set_option("display.max_colwidth", None)

Load and preview the tickets dataset:
df_tickets = pd.read_csv("data/tickets.csv")
df_tickets.head()

ticket_id
message

0
Contact me at nichole70@kemp.com or (798)034-325 to resolve this issue.

1
You can reach me by phone (970-295-1452) or email (russellbrandon@simon-rogers.org) anytime.

2
My contact details: ehamilton@silva.io and 242 844 7293.

3
Feel free to call 901.794.1337 or email ogarcia@howell-chavez.net for assistance.

Simple Regex: Basic Pattern Extraction
Regular expressions (regex) are patterns that match text based on rules. They excel at finding structured data like emails, phone numbers, and dates in unstructured text.
Extract Email Addresses
Start with a simple pattern that matches basic email formats, composed of:

Username: [a-z]+ – One or more lowercase letters (e.g. maria)
Separator: @ – Literal @ symbol
Domain: [a-z]+ – One or more lowercase letters (e.g. gmail or outlook)
Dot: \. – Literal dot (escaped)
Extension: (?:org|net|com|io) – One of the listed extensions (.com, .org, .io, .net)

import re

# Match basic email format: letters@domain.extension
email_pattern = r'[a-z]+@[a-z]+\.(?:org|net|com|io)'

df_tickets['emails'] = df_tickets['message'].apply(
lambda x: re.findall(email_pattern, x)
)

df_tickets[['message', 'emails']].head()

message
emails

0
Contact me at nichole70@kemp.com or (798)034-325 to resolve this issue.

1
You can reach me by phone (970-295-1452) or email (russellbrandon@simon-rogers.org) anytime.

2
My contact details: ehamilton@silva.io and 242 844 7293.

3
Feel free to call 901.794.1337 or email ogarcia@howell-chavez.net for assistance.

This pattern works for simple emails but misses variations with:

Other characters in the username such as numbers, dots, underscores, plus signs, or hyphens
Other characters in the domain such as numbers, dots, or hyphens
Other extensions that are not .com, .org, .io, or .net

Let’s expand the pattern to handle more formats:
# Handle emails with numbers, dots, underscores, hyphens, plus signs
improved_email = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

df_tickets['emails_improved'] = df_tickets['message'].apply(
lambda x: re.findall(improved_email, x)
)

df_tickets[['message', 'emails_improved']].head()

message
emails_improved

0
Contact me at nichole70@kemp.com or (798)034-325 to resolve this issue.

1
You can reach me by phone (970-295-1452) or email (russellbrandon@simon-rogers.org) anytime.

2
My contact details: ehamilton@silva.io and 242 844 7293.

3
Feel free to call 901.794.1337 or email ogarcia@howell-chavez.net for assistance.

The improved pattern successfully extracts all emails from the tickets! Let’s move on to extracting phone numbers.
Extract Phone Numbers
Common phone number formats are:

(XXX)XXX-XXXX – With parentheses
XXX-XXX-XXXX – Without parentheses
XXX XXX XXXX – With spaces
XXX.XXX.XXXX – With dots

To handle all four phone formats, we can use the following pattern:

\(? – Optional opening parenthesis
\d{3} – Exactly 3 digits (area code)
[-.\s]? – Optional hyphen, dot, or space
\)? – Optional closing parenthesis
\d{3} – Exactly 3 digits (prefix)
[-.\s] – Hyphen, dot, or space (required)
\d{4} – Exactly 4 digits (line number)

# Define phone pattern
phone_pattern = r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]\d{4}'

df_tickets['phones'] = df_tickets['message'].apply(
lambda x: re.findall(phone_pattern, x)
)

df_tickets[['message', 'phones']].head()

message
phones

0
Contact me at hfuentes@anderson.com or (798)034-3254 to resolve this issue.

1
You can reach me by phone (702-951-4528) or email (russellbrandon@simon-rogers.org) anytime.

2
My contact details: ehamilton@silva.io and 242 844 7293.

3
Feel free to call 901.794.1337 or email ogarcia@howell-chavez.net for assistance.

Awesome! We are able to extract all phone numbers from the tickets!
While these patterns work, they are difficult to understand and modify for anyone unfamiliar with regex.
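Before reaching for a new library, note that the standard library's re.VERBOSE flag also lets you document a pattern inline with whitespace and comments. A minimal sketch of the phone pattern above, rewritten in verbose mode:

```python
import re

# re.VERBOSE ignores insignificant whitespace in the pattern
# and allows inline comments after '#'
phone_pattern = re.compile(
    r"""
    \(?          # optional opening parenthesis
    \d{3}        # area code
    \)?          # optional closing parenthesis
    [-.\s]?      # optional separator
    \d{3}        # prefix
    [-.\s]       # separator
    \d{4}        # line number
    """,
    re.VERBOSE,
)

print(phone_pattern.findall("Call 901.794.1337 or (798)034-3254 today."))
```

Verbose mode keeps the pattern stdlib-only while annotating each component, though modifying it still requires regex fluency.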

📖 Readable code reduces maintenance burden and improves team productivity. Check out Production-Ready Data Science for detailed guidance on writing production-quality code.

In the next section, we will use pregex to build more readable patterns.
pregex: Build Readable Patterns
pregex is a Python library that lets you build regex patterns using readable Python syntax instead of regex symbols. It breaks complex patterns into self-documenting components that clearly express validation logic.
Install pregex:
pip install pregex

Extract Email Addresses
Let’s extract emails using pregex’s readable components.
In the code, we will use the following components:

Username: OneOrMore(AnyButWhitespace()) – One or more non-whitespace characters (e.g. maria95)
Separator: @ – Literal @ symbol
Domain name: OneOrMore(AnyButWhitespace()) – One or more non-whitespace characters (e.g. gmail or outlook)
Extension: Either(".com", ".org", ".io", ".net") – Match specific extensions (.com, .org, .io, .net)

from pregex.core.classes import AnyButWhitespace
from pregex.core.quantifiers import OneOrMore
from pregex.core.operators import Either

username = OneOrMore(AnyButWhitespace())
at_symbol = "@"
domain_name = OneOrMore(AnyButWhitespace())
extension = Either(".com", ".org", ".io", ".net")

email_pattern = username + at_symbol + domain_name + extension

# Extract emails
df_tickets["emails_pregex"] = df_tickets["message"].apply(
lambda x: email_pattern.get_matches(x)
)

df_tickets[["message", "emails_pregex"]].head()

message
emails_pregex

0
Contact me at hfuentes@anderson.com or (798)034-3254 to resolve this issue.
[hfuentes@anderson.com]

1
You can reach me by phone (702-951-4528) or email (russellbrandon@simon-rogers.org) anytime.
[(russellbrandon@simon-rogers.org]

2
My contact details: ehamilton@silva.io and 242 844 7293.
[ehamilton@silva.io]

3
Feel free to call 901.794.1337 or email ogarcia@howell-chavez.net for assistance.
[ogarcia@howell-chavez.net]

The output shows the extracted emails. Note that row 1 captures a leading parenthesis, because AnyButWhitespace() matches ( as well — a reminder that broad character classes trade precision for simplicity.
pregex transforms pattern matching from symbol decoding into readable code. OneOrMore(username_chars) communicates intent more clearly than [a-zA-Z0-9._%+-]+, reducing the time teammates spend understanding and modifying validation logic.
Extract Phone Numbers
Now extract phone numbers with multiple components:

First three digits: Optional("(") + Exactly(AnyDigit(), 3) + Optional(")")
Separator: Either(" ", "-", ".")
Second three digits: Exactly(AnyDigit(), 3)
Last four digits: Exactly(AnyDigit(), 4)

from pregex.core.classes import AnyDigit
from pregex.core.quantifiers import Optional, Exactly
from pregex.core.operators import Either

# Build phone pattern using pregex
first_three = Optional("(") + Exactly(AnyDigit(), 3) + Optional(")")
separator = Either(" ", "-", ".")
second_three = Exactly(AnyDigit(), 3)
last_four = Exactly(AnyDigit(), 4)

phone_pattern = first_three + Optional(separator) + second_three + separator + last_four

# Extract phone numbers
df_tickets['phones_pregex'] = df_tickets['message'].apply(
lambda x: phone_pattern.get_matches(x)
)

df_tickets[['message', 'phones_pregex']].head()

message
phones_pregex

0
Contact me at hfuentes@anderson.com or (798)034-3254 to resolve this issue.
[(798)034-3254]

1
You can reach me by phone (702-951-4528) or email (russellbrandon@simon-rogers.org) anytime.
[(702-951-4528]

2
My contact details: ehamilton@silva.io and 242 844 7293.
[242 844 7293]

3
Feel free to call 901.794.1337 or email ogarcia@howell-chavez.net for assistance.
[901.794.1337]

If your system requires the raw regex pattern, you can get it with get_compiled_pattern():
print("Compiled email pattern:", email_pattern.get_compiled_pattern().pattern)
print("Compiled phone pattern:", phone_pattern.get_compiled_pattern().pattern)

Compiled email pattern: \S+@\S+(?:\.com|\.org|\.io|\.net)
Compiled phone pattern: \(?\d{3}\)?(?: |-|\.)?\d{3}(?: |-|\.)\d{4}
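Because get_compiled_pattern() returns a standard compiled pattern object, the result can be dropped into existing re-based code. A quick sketch using the email pattern string printed above:

```python
import re

# The regex string that pregex produced for the email components above
compiled = re.compile(r"\S+@\S+(?:\.com|\.org|\.io|\.net)")

text = "Feel free to email ogarcia@howell-chavez.net for assistance."
print(compiled.findall(text))  # ['ogarcia@howell-chavez.net']
```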

For more pregex examples including URLs and time patterns, see PRegEx: Write Human-Readable Regular Expressions in Python.

Parse Structured Ticket Headers
Now let’s tackle a more complex task: parsing structured ticket headers that contain multiple fields:
Ticket: 1000 | Priority: High | Assigned: John Doe # escalated

We will use Capture to extract just the values we need from each ticket:
from pregex.core.quantifiers import OneOrMore
from pregex.core.classes import AnyDigit, AnyLetter, AnyWhitespace
from pregex.core.groups import Capture

sample_ticket = "Ticket: 1000 | Priority: High | Assigned: John Doe # escalated"

# Define patterns with Capture to extract just the values
whitespace = AnyWhitespace()
ticket_id_pattern = "Ticket:" + whitespace + Capture(OneOrMore(AnyDigit()))
priority_pattern = "Priority:" + whitespace + Capture(OneOrMore(AnyLetter()))
name_pattern = (
"Assigned:"
+ whitespace
+ Capture(OneOrMore(AnyLetter()) + " " + OneOrMore(AnyLetter()))
)

# Define separator pattern (whitespace around pipe)
separator = whitespace + "|" + whitespace

# Combine all patterns with separators
ticket_pattern = (
ticket_id_pattern
+ separator
+ priority_pattern
+ separator
+ name_pattern
)
Next, define a function to extract the ticket components from the captured groups:
def get_ticket_components(ticket_string, ticket_pattern):
"""Extract ticket components from a ticket string."""
try:
captures = ticket_pattern.get_captures(ticket_string)[0]
return pd.Series(
{
"ticket_id": captures[0],
"priority": captures[1],
"assigned": captures[2],
}
)
except IndexError:
return pd.Series(
{"ticket_id": None, "priority": None, "assigned": None}
)

Apply the function with the pattern defined above to the sample ticket.

components = get_ticket_components(sample_ticket, ticket_pattern)
print(components.to_dict())

{'ticket_id': '1000', 'priority': 'High', 'assigned': 'John Doe'}

This looks good! Let’s apply it to ticket headers with inconsistent whitespace around the separators. Start by creating the dataset:
import pandas as pd

# Create tickets with embedded comments and variable whitespace
tickets = [
"Ticket: 1000 | Priority: High | Assigned: John Doe # escalated",
"Ticket: 1001 | Priority: Medium | Assigned: Maria Garcia # team lead",
"Ticket:1002| Priority:Low |Assigned:Alice Smith # non-urgent",
"Ticket: 1003 | Priority: High | Assigned: Bob Johnson # on-call"
]

df_tickets = pd.DataFrame({'ticket': tickets})
df_tickets.head()

ticket

0
Ticket: 1000 | Priority: High | Assigned: John Doe # escalated

1
Ticket: 1001 | Priority: Medium | Assigned: Maria Garcia # team lead

2
Ticket:1002| Priority:Low |Assigned:Alice Smith # non-urgent

3
Ticket: 1003 | Priority: High | Assigned: Bob Johnson # on-call

# Extract individual components using the function
df_pregex = df_tickets.copy()
components_df = df_pregex["ticket"].apply(get_ticket_components, ticket_pattern=ticket_pattern)

df_pregex = df_pregex.assign(**components_df)

df_pregex[["ticket_id", "priority", "assigned"]].head()

ticket_id
priority
assigned

0
1000
High
John Doe

1
None
None
None

2
None
None
None

3
1003
High
Bob Johnson

We can see that pregex misses Tickets 1 and 2 because AnyWhitespace() matches exactly one whitespace character, while those rows use zero or multiple spaces around the separators.
Making pregex patterns flexible enough for variable formatting requires adding optional quantifiers to the whitespace pattern so that it can match zero or more spaces around the separators.
As these fixes accumulate, pregex’s readability advantage diminishes, and you end up with code that’s as hard to understand as raw regex but more verbose.
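For comparison, the equivalent fix in raw regex is to put \s* (zero or more whitespace characters) around every label and separator — a sketch using the ticket fields from above:

```python
import re

# \s* tolerates zero, one, or many spaces around each label and pipe
flexible = re.compile(
    r"Ticket:\s*(\d+)\s*\|\s*Priority:\s*([A-Za-z]+)\s*\|\s*Assigned:\s*([A-Za-z]+ [A-Za-z]+)"
)

match = flexible.search("Ticket:1002| Priority:Low |Assigned:Alice Smith # non-urgent")
print(match.groups())  # ('1002', 'Low', 'Alice Smith')
```

This handles the rows pregex missed, but the pattern is exactly the kind of symbol soup the article set out to avoid.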
When parsing structured data with consistent patterns but varying details, pyparsing provides more robust handling than regex.
pyparsing: Parse Structured Ticket Headers
Unlike regex’s pattern matching approach, pyparsing lets you define grammar rules using Python classes, making parsing logic explicit and maintainable.
Install pyparsing:
pip install pyparsing

Let’s parse the complete structure with pyparsing, including:

Ticket ID: Word(nums) – One or more digits (e.g. 1000)
Priority: Word(alphas) – One or more letters (e.g. High)
Name: Word(alphas) + Word(alphas) – First and last name (e.g. John Doe)

We will also use pythonStyleComment so that Python-style comments are ignored during parsing.
from pyparsing import Word, alphas, nums, Literal, pythonStyleComment

# Define grammar components
ticket_num = Word(nums)
priority = Word(alphas)
name = Word(alphas) + Word(alphas)

# Define complete structure
ticket_grammar = (
"Ticket:"
+ ticket_num
+ "|"
+ "Priority:"
+ priority
+ "|"
+ "Assigned:"
+ name
)

# Automatically ignore Python-style comments throughout parsing
ticket_grammar.ignore(pythonStyleComment)

sample_ticket = "Ticket: 1000 | Priority: High | Assigned: John Doe # escalated"
sample_result = ticket_grammar.parse_string(sample_ticket)
print(sample_result)

['Ticket:', '1000', '|', 'Priority:', 'High', '|', 'Assigned:', 'John', 'Doe']

Awesome! We are able to extract the ticket components from the ticket with a much simpler pattern!
Compare this to the pregex implementation:
ticket_pattern = (
"Ticket:" + whitespace + Capture(OneOrMore(AnyDigit()))
+ whitespace + "|" + whitespace
+ "Priority:" + whitespace + Capture(OneOrMore(AnyLetter()))
+ whitespace + "|" + whitespace
+ "Assigned:"
+ whitespace
+ Capture(OneOrMore(AnyLetter()) + " " + OneOrMore(AnyLetter()))
)

We can see that pyparsing handles structured data better than pregex for the following reasons:

No whitespace boilerplate: pyparsing handles spacing automatically while pregex requires + whitespace + between every component
Self-documenting: Word(alphas) clearly means “letters” while pregex’s nested Capture(OneOrMore(AnyLetter())) is less readable

To extract ticket components, assign names using () syntax and access them via dot notation:
# Define complete structure
ticket_grammar = (
"Ticket:"
+ ticket_num("ticket_id")
+ "|"
+ "Priority:"
+ priority("priority")
+ "|"
+ "Assigned:"
+ name("assigned")
)

# Automatically ignore Python-style comments throughout parsing
ticket_grammar.ignore(pythonStyleComment)

sample_ticket = "Ticket: 1000 | Priority: High | Assigned: John Doe # escalated"
sample_result = ticket_grammar.parse_string(sample_ticket)

# Access the components by name
print(
f"Ticket ID: {sample_result.ticket_id}",
f"Priority: {sample_result.priority}",
f"Assigned: {' '.join(sample_result.assigned)}",
)

Ticket ID: 1000 Priority: High Assigned: John Doe

Let’s apply this to the entire dataset.
# Parse all tickets and create columns
def parse_ticket(ticket, ticket_grammar):
result = ticket_grammar.parse_string(ticket)
return pd.Series(
{
"ticket_id": result.ticket_id,
"priority": result.priority,
"assigned": " ".join(result.assigned),
}
)

df_pyparsing = df_tickets.copy()
components_df_pyparsing = df_pyparsing["ticket"].apply(parse_ticket, ticket_grammar=ticket_grammar)
df_pyparsing = df_pyparsing.assign(**components_df_pyparsing)

df_pyparsing[["ticket_id", "priority", "assigned"]].head()

ticket_id
priority
assigned

0
1000
High
John Doe

1
1001
Medium
Maria Garcia

2
1002
Low
Alice Smith

3
1003
High
Bob Johnson

The output looks good!
Let’s try to parse some more structured data with pyparsing.
Extract Code Blocks from Markdown
Use SkipTo to extract Python code between code block markers without complex regex patterns like r'```python(.*?)```':
from pyparsing import Literal, SkipTo

code_start = Literal("```python")
code_end = Literal("```")

code_block = code_start + SkipTo(code_end)("code") + code_end

markdown = """```python
def hello():
print("world")
```"""

result = code_block.parse_string(markdown)
print(result.code)

def hello():
print("world")

Parse Nested Structures
nested_expr handles arbitrary nesting depth, which regex fundamentally cannot parse:
from pyparsing import nested_expr

# Default: parentheses
nested_list = nested_expr()
result = nested_list.parse_string("((2 + 3) * (4 - 1))")
print(result.as_list())

[[['2', '+', '3'], '*', ['4', '-', '1']]]
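nested_expr also accepts custom opening and closing delimiters, so the same approach works for braces or brackets — a small sketch:

```python
from pyparsing import nested_expr

# Parse brace-delimited nesting instead of the default parentheses
braces = nested_expr("{", "}")
result = braces.parse_string("{a {b c} d}")
print(result.as_list())
```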

Conclusion
So how do you know when to use each tool? Choose your tool based on your needs:
Use simple regex when:

Extracting simple, well-defined patterns (emails, phone numbers with consistent format)
Pattern won’t need frequent modifications

Use pregex when:

Pattern has multiple variations (different phone number formats)
Need to document pattern logic through readable code

Use pyparsing when:

Need to extract multiple fields from structured text (ticket headers, configuration files)
Must handle variable formatting (inconsistent whitespace, embedded comments)

In summary, start with simple regex, adopt pregex when readability matters, and switch to pyparsing when structure becomes complex.
Related Tutorials
Here are some related text processing tools:

Text similarity matching: 4 Text Similarity Tools: When Regex Isn’t Enough compares regex preprocessing, difflib, RapidFuzz, and Sentence Transformers for matching product names and handling data variations
Business entity extraction: langextract vs spaCy: AI-Powered vs Rule-Based Entity Extraction evaluates regex, spaCy, GLiNER, and langextract for extracting structured information from financial documents


Choose the Right Text Pattern Tool: Regex, Pregex, or Pyparsing Read More »

Faker: Generate Realistic Test Data in Python with One Line of Code

Table of Contents

Motivation
Basics of Faker
Location-Specific Data Generation
Create Text
Create Profile Data
Create Random Python Datatypes
Conclusion

Motivation
Let’s say you want to create data with certain data types (bool, float, text, integers) and special characteristics (names, addresses, colors, emails, phone numbers, locations) to test a Python library or a specific implementation. But finding that specific kind of data takes time. You wonder: is there a quick way to create your own data?
What if there is a package that enables you to create fake data in one line of code such as this:
fake.profile()

{
'address': '076 Steven Trace\nJillville, ND 12393',
'birthdate': datetime.date(1981, 11, 19),
'blood_group': 'O-',
'company': 'Johnson-Rodriguez',
'current_location': (Decimal('61.969848'), Decimal('121.407164')),
'job': 'Patent examiner',
'mail': 'ohicks@hotmail.com',
'name': 'Katie Romero',
'residence': '271 Smith Wells\nMichaelport, MN 40933',
'sex': 'F',
'ssn': '281-84-3963',
'username': 'eparker',
'website': ['https://www.gonzalez.com/', 'https://rogers-scott.com/']
}

This can be done with Faker, a Python package that generates fake data for you, ranging from a specific data type to specific characteristics of that data, and the origin or language of the data. Let’s discover how we can use Faker to create fake data.

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

Basics of Faker
Start with installing the package:
pip install Faker

Import Faker:
from faker import Faker

fake = Faker()

Some basic methods of Faker:
print(fake.color_name())
print(fake.name())
print(fake.address())
print(fake.job())
print(fake.date_of_birth(minimum_age=30))
print(fake.city())

Tan
Kristin Buck
715 Peter Views
Abigailport, ME 57602
Systems analyst
1946-03-07
Evanmouth

Let’s say you are the author of a fiction book who wants to create a character but finds it difficult and time-consuming to come up with a realistic name and information. You can write:
name = fake.name()
color = fake.color_name()
city = fake.city()
job = fake.job()

print(f'Her name is {name}. She lives in {city}. Her favorite color is {color}. She works as a {job}')

Her name is Debra Armstrong. She lives in Beanview. Her favorite color is GreenYellow. She works as a Lawyer

With Faker, you can generate a persuasive example instantly!
Location-Specific Data Generation
Luckily, we can also specify the locale of the data we want to fake. Maybe the character you want to create is from Italy, and you also want to create instances of her friends. Since you are from the US, it is difficult to come up with information relevant to that location. This can easily be handled by passing a locale to the Faker constructor:
fake = Faker('it_IT')

for _ in range(10):
print(fake.name())

Angelica Donarelli-Marangoni
Rosaria Castiglione
Federica Iacovelli
Puccio Armellini
Dina Donini-Alboni
Dott. Carolina Marrone
Olga Nosiglia
Graziella Russo
Paulina Galiazzo
Dott. Riccardo Padovano

Or create information from multiple locations:
fake = Faker(['ja_JP','zh_CN','es_ES','en_US','fr_FR'])

for _ in range(10):
print(fake.city())

齐齐哈尔市
Blakefort
North Joeborough
玉兰市
Saint Suzanne-les-Bains
Melilla
調布市
富津市
Maillot-sur-Mer
East Jamesshire

If you are from one of these countries, I hope you recognize the locations. If you are curious about other locales you can specify, check out the doc here.
Create Text
Create Random Text
We can create random text with:
fake = Faker('en_US')
print(fake.text())

Gas threat perhaps minute energy thus. Relate group science car discussion budget art.
Let visit reach senior. Story once list almost. Enough major everyone.

Try with the Vietnamese language:
fake = Faker('vi_VN')
print(fake.text())

Như không cho số vậy tại đến. Hơn các thay. Khi từ cũng không rất là.
Gần được cho có nơi như vẫn cho. Nơi đi về giống.
Mà cũng từ nhưng lớn. Từng của nếu khi như nhưng.

None of this random text makes sense, but it is a quick way to create text for testing.
Create Text from Selected Words
Or we can also create text from a list of words:
fake = Faker()
my_information = ['dog','swimming', '21', 'slow', 'girl', 'coffee', 'flower','pink']

print(fake.sentence(ext_word_list=my_information))
print(fake.sentence(ext_word_list=my_information))

Coffee pink coffee.
Dog pink 21 pink.
Create Profile Data
We can quickly create a profile with:
fake = Faker()
fake.profile()

{'job': 'Nurse, adult',
'company': 'Johnson, Moore and Glover',
'ssn': '762-56-8929',
'residence': '742 Shane Groves\nLake Jasminefort, GU 12583',
'current_location': (Decimal('-77.3842165'), Decimal('7.407430')),
'blood_group': 'B-',
'website': ['https://brooks.com/'],
'username': 'brownamanda',
'name': 'Carolyn Navarro',
'sex': 'F',
'address': '505 Lewis Grove Apt. 588\nHowardville, ID 68181',
'mail': 'larry00@hotmail.com',
'birthdate': datetime.date(1946, 6, 13)}

As we can see, most relevant information about a person is created with ease, including mail, SSN, username, and website.
What is even more useful is that we can create a dataframe of 100 users from different countries:
import pandas as pd

fake = Faker(['it_IT','ja_JP', 'zh_CN', 'de_DE','en_US'])
profiles = [fake.profile() for i in range(100)]

pd.DataFrame(profiles).head()

job
company
ssn
residence
current_location
blood_group
website
username
name
sex
address
mail
birthdate

0
Physiological scientist
Sobrero-Mazzanti Group
CLGTNO59H42A473Z
Incrocio Cabrini, 14 Appartamento 59\n74100, L…
(-88.2637715, 149.968584)
AB+
[http://federici-endrizzi.it/, http://www.paru…]
giuliagreco
Dott. Liliana Serraglio
F
Vicolo Milo, 0\n64020, Ripattoni (TE)
giolittiflavio@gmail.com
1998-10-10

1
花火師
阿部運輸株式会社
701-41-9799
和歌山県印旛郡本埜村鳥越20丁目23番18号
(79.245074, 109.117174)
O+
[https://suzuki.com/, http://ishikawa.jp/]
lyamamoto
斉藤 明美
F
東京都江戸川区神明内40丁目12番20号
akemiyamada@yahoo.com
1916-12-09

2
小説家
小林食品株式会社
103-28-5057
島根県富津市細野7丁目16番1号
(-84.3304275, 38.093874)
A+
[https://tanaka.jp/, http://www.fujita.net/, h…]
minoru62
渡辺 英樹
M
青森県川崎市川崎区長畑22丁目27番12号
minoru35@yahoo.com
2008-02-17

3
ゲームクリエイター
佐藤水産有限会社
123-85-7967
宮城県調布市隼町3丁目22番12号 アーバン台東327
(-49.3689775, -134.762867)
AB-
[http://www.sato.org/, http://kato.net/, http:…]
ayamamoto
鈴木 洋介
M
栃木県川崎市中原区虎ノ門30丁目27番20号
yuta56@hotmail.com
1917-01-25

4
薬剤師
合同会社高橋建設
891-98-2169
山梨県山武郡横芝光町轟4丁目22番10号 コート天神島159
(-62.1493985, -105.171377)
B+
[http://yamashita.jp/, http://www.shimizu.com/]
yosukekimura
田中 真綾
F
山口県府中市下吉羽6丁目20番2号
hayashiyuki@yahoo.com
2001-08-09

Create Random Python Datatypes
If we just care about the type of the data, without caring much about its content, we can easily generate random datatypes such as:
Boolean:
print(fake.pybool())

False

A list of 5 elements with different data_type:
print(fake.pylist(nb_elements=5, variable_nb_elements=True))

['juan28@example.org', 8515, 6618, 'UexWQJkGrJFGBAVfHgUt']

A decimal with 5 left digits and 6 right digits (after the .):
print(fake.pydecimal(left_digits=5, right_digits=6, positive=False, min_value=None, max_value=None))

-26114.564612

You can find more about other Python datatypes that you can create here.
Conclusion
I hope you find Faker a helpful tool for creating data efficiently. Even if you have no immediate use for it, it is good to know that a tool exists that lets you generate realistic data with ease for specific needs such as testing.
Feel free to check out more information about Faker here.
Faker: Generate Realistic Test Data in Python with One Line of Code

3 Tools That Automatically Convert Python Code to LaTeX Math

Table of Contents

Introduction
Key Takeaways
Setting Up the Environment
IPython.display.Latex: Built-in LaTeX Rendering
handcalcs: Step-by-Step Calculations
latexify-py: Automated Function Conversion
SymPy: Symbolic Mathematics
Summary
Related Tutorials

Introduction
Imagine you are a financial analyst building financial models in Python who needs to present them to non-technical executives. Since they are not familiar with Python, you need to show them the mathematical foundations behind your algorithms, not just code blocks. How can you do that?
The best way to present mathematical models is LaTeX, a powerful system for typesetting mathematical notation and equations that is widely used in academic papers and technical reports.
However, writing LaTeX by hand is tedious, especially for complex equations. In this article, you will learn how to convert Python code to LaTeX in Jupyter notebooks using four tools: IPython.display.Latex, handcalcs, latexify-py, and SymPy.

💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!

Key Takeaways
Here’s what you’ll learn:

Transform Python calculations into professional LaTeX equations using four specialized tools
Generate step-by-step mathematical documentation automatically with handcalcs magic commands
Convert Python functions to clean LaTeX notation instantly using latexify-py decorators
Perform symbolic mathematics including equation solving and algebraic manipulation with SymPy

Setting Up the Environment
Install the required packages using pip or uv.
# Using pip
pip install handcalcs latexify-py sympy

# Using uv (recommended)
uv add handcalcs latexify-py sympy

📚 For production-ready notebook workflows and development best practices, check out Production-Ready Data Science.

IPython.display.Latex: Built-in LaTeX Rendering
The simplest approach uses Jupyter’s built-in IPython.display.Latex rendering. It is ideal when you want precise control over mathematical notation.
Let’s create a professional-looking compound interest calculation.
Start with defining the variables, where P is the principal amount, r is the annual interest rate, and t is the number of years.
P = 10000 # principal amount
r = 0.08 # annual interest rate
t = 5 # number of years

Now we are ready to display the calculation in LaTeX. To make the calculation easier to follow, we will break it down into three parts:

Display the formula
Substitute the variables into the formula
Display the result

from IPython.display import Latex, display

# Calculate the result
A = P * (1 + r) ** t

# Display the calculation with LaTeX
display(Latex(r"$A = P(1 + r)^t$"))
display(Latex(f"$A = {P:,}(1 + {r})^{{{t}}}$"))
display(Latex(f"$A = {A:,.2f}$"))

 
\displaystyle A = P(1 + r)^t
\displaystyle A = 10{,}000\,(1 + 0.08)^{5}
\displaystyle A = 14{,}693.28
 
This shows step-by-step substitutions, but writing LaTeX for each step is manual and slow.
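One subtlety in the manual approach is the f-string brace escaping: doubled braces emit literal braces for LaTeX, while the innermost pair interpolates the value. A quick plain-Python check of what the middle string expands to:

```python
P, r, t = 10000, 0.08, 5

# {{ and }} emit literal braces for LaTeX; {t} interpolates, so {{{t}}} -> {5}
step = f"$A = {P:,}(1 + {r})^{{{t}}}$"
print(step)  # $A = 10,000(1 + 0.08)^{5}$
```
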
Wouldn’t it be nice if the steps, substitutions, and LaTeX code were generated automatically from plain Python code? That is where handcalcs comes in.
handcalcs: Step-by-Step Calculations
handcalcs automatically converts Python calculations into step-by-step mathematical documentation. It’s perfect for technical reports and educational content.
Jupyter Magic Command
To use handcalcs in Jupyter, we need to load the extension first.
import handcalcs.render

# Enable handcalcs in Jupyter
%load_ext handcalcs.render

Now we can use the %%render magic command to render the calculation.
%%render
# Step-by-step substitutions for compound interest
A = P * (1 + r)**t

 
\displaystyle A = P\,\left(1+r\right)^{t} = 10000\,\left(1+0.080\right)^{5} = 14693.281
 
This renders as a complete step-by-step calculation showing all substitutions and intermediate results. All without writing a single line of LaTeX code!
Function Decorator
Use the @handcalc decorator to render calculations from regular Python functions. Set jupyter_display=True to display the rendered LaTeX in Jupyter.
from handcalcs import handcalc

@handcalc(jupyter_display=True)
def calculate_compound_interest(P, r, t):
    A = P * (1 + r)**t
    return A

# Calling the function renders the calculation with substitutions
result = calculate_compound_interest(10000, 0.08, 5)

The return value is a plain number that can be used in further calculations.
print(f"Result: {result:,.2f}")

Output:
Result: 14,693.28

latexify-py: Automated Function Conversion
Unlike handcalcs, which renders step-by-step numeric substitutions, latexify-py focuses on function-level documentation without the intermediate arithmetic. It’s ideal when you want a clean, reusable formula and don’t need to show the intermediate steps.
import latexify

# Simple function conversion
@latexify.function
def A(P, r, t):
    return P * (1 + r) ** t

A

 
\displaystyle A(P, r, t) = P \cdot \mathopen{}\left( 1 + r \mathclose{}\right)^{t}
 
A function decorated with latexify.function can still be called like a normal Python function to compute the result.
result = A(10000, 0.08, 5)
print(f"Result: {result:,.2f}")

Output:
Result: 14,693.28

SymPy: Symbolic Mathematics
handcalcs and latexify-py excel at rendering clear results from concrete values, but they cannot handle symbolic tasks such as solving for a variable or computing derivatives and integrals. For these tasks, use SymPy.
To create a symbolic equation, start by defining the symbols and the equation.
from sympy import symbols, Eq, solve

# Define the variables
A, P, r, t = symbols("A P r t", positive=True)

# Define the equation
eq = Eq(A, P * (1 + r) ** t)
eq

 
\displaystyle A = P \left(r + 1\right)^{t}
 
After setting up the equation, we are ready to perform symbolic calculations.
Substitute Variables
Let’s compute A (amount) from given P (principal), r (interest rate), and t (time) by solving for A and substituting values.
# Solve for A
A_expr = solve(eq, A)[0]

# Substitute the values
A_result = A_expr.subs({P: 10000, r: 0.08, t: 5})
A_result

 
\displaystyle 14693.280768
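As a quick sanity check (plain floating-point arithmetic, independent of SymPy), direct evaluation gives the same number:

```python
P, r, t = 10000, 0.08, 5

# Direct float evaluation of A = P * (1 + r)**t
A = P * (1 + r) ** t
print(round(A, 6))  # 14693.280768
```
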
 
Solve for a Variable
Let’s solve the equation for t to answer the question: “How many years will it take for the investment to reach A (amount) given P (principal) and r (interest rate)?”
t_sol = solve(eq, t)[0]
t_sol

The result is the formula to solve for t.
 
\displaystyle \frac{\log{\left(A \right)} - \log{\left(P \right)}}{\log{\left(r + 1 \right)}}
 
Now we can substitute the variables into the equation to answer a more specific question: “How many years will it take for the investment to reach $5,000 given a principal of $1,000 and an annual interest rate of 8%?”
t_result = t_sol.subs({P: 1000, r: 0.08, A: 5000}).evalf(2)
t_result

 
\displaystyle 21.0
 
The result shows that it will take approximately 21 years for the investment to reach $5,000.
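The closed-form expression for t can be cross-checked with plain math.log (a small sketch, independent of SymPy):

```python
import math

A_target, P, r = 5000, 1000, 0.08

# t = (log(A) - log(P)) / log(1 + r)
t_years = (math.log(A_target) - math.log(P)) / math.log(1 + r)
print(round(t_years, 1))  # 20.9
```

Rounded up, this matches the roughly 21 years SymPy reported.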
Expand and Factor an Expression
We can also use SymPy to expand and factor an expression.
Assume t = 2. With an annual rate r and two compounding periods, the expression becomes:
compound_expr = P * (1 + r) ** 2
compound_expr

 
\displaystyle P(1 + r)^2
 
Let’s expand the expression using the expand function.
from sympy import expand

expanded_expr = expand(compound_expr)
expanded_expr

 
\displaystyle P r^{2} + 2 P r + P
 
We can then turn the expression back into a product of factors using the factor function.
from sympy import factor

factored_expr = factor(expanded_expr)
factored_expr

 
\displaystyle P \left(r + 1\right)^{2}
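SymPy also handles the calculus side; for example (a minimal sketch, not part of the original walkthrough), differentiating the compound-interest expression with respect to t:

```python
from sympy import symbols, diff, log

P, r, t = symbols("P r t", positive=True)

# d/dt [P * (1 + r)**t] = P * (1 + r)**t * log(1 + r)
dA_dt = diff(P * (1 + r) ** t, t)
print(dA_dt)
```

This is the kind of symbolic manipulation that the rendering-focused tools above cannot do at all.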
 
Summary
Converting Python code to LaTeX in Jupyter notebooks transforms your technical documentation from code-heavy to mathematically elegant. Here’s when to use each tool:

Use IPython.display.Latex when: You need precise control over mathematical notation
Use handcalcs when: You want step-by-step calculation documentation
Use latexify-py when: You want automatic function-to-LaTeX conversion
Use SymPy when: You need symbolic math, such as solving equations or computing derivatives and integrals

Related Tutorials

Broader Visualization: Top 6 Python Libraries for Visualization: Which One to Use? for comprehensive visualization options beyond mathematical notation
Advanced Mathematical Operations: 5 Essential Itertools for Data Science for complex mathematical workflows and performance optimization
