Delta Lake: Transform pandas Prototypes into Production
Table of Contents
Introduction
Introduction to Delta-rs
Setup and Data Preparation
Creating Your First Delta Table
Incremental Updates and CRUD Operations
Time Travel and Data Versioning
Schema Evolution in Action
Selective Updates with Merge Operations
Multi-Engine Integration
Automatic File Cleanup
Conclusion
Introduction
Data scientists face a familiar challenge: pandas works perfectly for prototyping, but production requires enterprise features that traditional file formats can’t provide.
Delta-rs solves this by bringing Delta Lake’s ACID transactions, time travel, and schema evolution to Python without Spark dependencies. It transforms your pandas workflow into production-ready pipelines with minimal code changes.
This tutorial shows you how to build scalable data systems using Delta-rs while maintaining the simplicity that makes pandas so effective.
💻 Get the Code: The complete source code and Jupyter notebook for this tutorial are available on GitHub. Clone it to follow along!
Introduction to Delta-rs
Delta-rs is a native Rust implementation of Delta Lake for Python. It provides enterprise-grade data lake capabilities without requiring Spark clusters or JVM setup.
Key advantages over traditional file formats:
ACID transactions ensure data consistency during concurrent operations
Time travel enables access to historical data versions
Schema evolution handles data structure changes automatically
Multi-engine support works with pandas, DuckDB, Polars, and more
Efficient updates support upserts and incremental changes without full rewrites
Setup and Data Preparation
Install Delta-rs and supporting libraries:
pip install deltalake pandas duckdb polars
We’ll use actual NYC Yellow Taxi data to demonstrate real-world scenarios. The NYC Taxi & Limousine Commission provides monthly trip records in Parquet format:
import pandas as pd
from deltalake import DeltaTable, write_deltalake
import duckdb
import polars as pl
# Download NYC Yellow Taxi data (June 2024 as example)
# Full dataset available at: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
taxi_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-06.parquet"
# Read a sample of the data for demonstration
sample_data = pd.read_parquet(taxi_url).head(10000)
print(f"Loaded {len(sample_data)} taxi trips from NYC TLC")
print(f"Data shape: {sample_data.shape}")
print(f"Date range: {sample_data['tpep_pickup_datetime'].min()} to {sample_data['tpep_pickup_datetime'].max()}")
sample_data.head()
Output:
Loaded 10000 taxi trips from NYC TLC
Data shape: (10000, 19)
Date range: 2024-05-31 15:33:34 to 2024-06-01 02:59:54
VendorID tpep_pickup_datetime … congestion_surcharge Airport_fee
0 1 2024-06-01 00:03:46 … 0.0 1.75
1 2 2024-06-01 00:55:22 … 0.0 1.75
2 1 2024-06-01 00:23:53 … 0.0 0.00
3 1 2024-06-01 00:32:24 … 2.5 0.00
4 1 2024-06-01 00:51:38 … 2.5 0.00
[5 rows x 19 columns]
Creating Your First Delta Table
Create your first Delta table in the data directory:
write_deltalake("data/taxi_delta_table", sample_data, mode="overwrite")
print("Created Delta table")
# Read back from Delta table
dt = DeltaTable("data/taxi_delta_table")
df_from_delta = dt.to_pandas()
print(f"Delta table contains {len(df_from_delta)} records")
Output:
Created Delta table
Delta table contains 10000 records
View the Delta table structure:
# Inspect Delta table metadata
print("Delta table schema:")
print(dt.schema().to_arrow())
Output:
Delta table schema:
arro3.core.Schema
————
VendorID: Int32
tpep_pickup_datetime: Timestamp(Microsecond, None)
tpep_dropoff_datetime: Timestamp(Microsecond, None)
passenger_count: Float64
trip_distance: Float64
…
total_amount: Float64
congestion_surcharge: Float64
Airport_fee: Float64
View the current version of the Delta table:
print(f"Current version: {dt.version()}")
Output:
Current version: 0
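Under the hood, a Delta table is just a directory of Parquet data files plus a _delta_log transaction log. If you want a quick look at what was actually written, DeltaTable exposes the file list and table metadata (method names as of recent deltalake releases); this optional check is a small sketch and isn't required for the rest of the tutorial:
# Optional: peek at the files and metadata behind the table
print(f"Data files: {dt.files()}")  # Parquet files referenced by the current version
print(f"Partition columns: {dt.metadata().partition_columns}")  # empty here - we didn't partition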
Incremental Updates and CRUD Operations
Instead of rewriting entire datasets when adding new records, incremental updates append only what changed. Delta-rs handles these efficient operations natively.
To demonstrate this, we'll simulate late-arriving data:
# Simulate late-arriving data
late_data = pd.read_parquet(taxi_url).iloc[10000:10050]
print(f"New data to add: {len(late_data)} records")
Output:
New data to add: 50 records
Traditional Approach: Process Everything
The pandas workflow requires loading both existing and new data, combining them, and rewriting the entire output file:
# Pandas approach – reload existing data and merge
existing_df = pd.read_parquet(taxi_url).head(10000)
complete_df = pd.concat([existing_df, late_data])
complete_df.to_parquet("data/taxi_complete.parquet")
print(f"Processed {len(complete_df)} total records")
Output:
Processed 10050 total records
Pandas processed all 10,050 records to add just 50 new ones, demonstrating the inefficiency of full-dataset operations.
Delta-rs Approach: Process Only New Data
Delta-rs appends only the new records without touching existing data:
# Delta-rs – append only what's new
write_deltalake("data/taxi_delta_table", late_data, mode="append")
dt = DeltaTable("data/taxi_delta_table")
print(f"Added {len(late_data)} new records")
print(f"Table version: {dt.version()}")
Output:
Added 50 new records
Table version: 1
Delta-rs processed only the 50 new records while automatically incrementing to version 1, enabling efficient operations and data lineage.
Time Travel and Data Versioning
Time travel and data versioning let you access any previous state of your data. This is essential for auditing changes, recovering from errors, and understanding how data evolved over time without maintaining separate backup files.
Traditional Approach: Manual Backup Strategy
Traditional file-based workflows rely on timestamped copies and manual versioning:
# Traditional approach - manual timestamped backups (illustrative pattern)
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
df.to_parquet(f"data/taxi_backup_{timestamp}.parquet")  # df: the current DataFrame, backed up manually
df_modified.to_parquet("data/taxi_data.parquet")        # df_modified: the updated DataFrame, overwriting the original
# To recover: manually identify and reload the right backup file
Delta-rs Approach: Built-in Time Travel
Delta-rs automatically tracks every change with instant access to any version:
# Access any historical version instantly
dt_v0 = DeltaTable("data/taxi_delta_table", version=0)
current_dt = DeltaTable("data/taxi_delta_table")
print(f"Version 0: {len(dt_v0.to_pandas())} records")
print(f"Current version: {len(current_dt.to_pandas())} records")
print(f"Available versions: {current_dt.version() + 1}")
Output:
Version 0: 10000 records
Current version: 10050 records
Available versions: 2
Delta-rs maintains 2 complete versions while traditional backups would require separate 57MB files for each timestamp.
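Time travel also works by timestamp, not just by version number. Recent deltalake releases expose load_as_version, which accepts either a version integer or a datetime; treat this as a sketch against a current install, since older releases used a differently named method (load_with_datetime):
from datetime import datetime, timezone
# Pin the table to how it looked at a point in time; "now" resolves to the latest commit
dt_at_time = DeltaTable("data/taxi_delta_table")
dt_at_time.load_as_version(datetime.now(timezone.utc))
print(f"Version at this timestamp: {dt_at_time.version()}")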
📚 For comprehensive production data workflows and version control best practices, check out Production-Ready Data Science.
Schema Evolution in Action
As requirements evolve, you often need to add new columns or change data types. Schema evolution handles these changes automatically, letting you update your data structure without breaking existing queries or reprocessing historical records.
To demonstrate this, imagine NYC’s taxi authority introduces weather tracking and surge pricing features, requiring your pipeline to handle new weather_condition and surge_multiplier columns alongside existing fare data.
# Copy the existing data
enhanced_data = pd.read_parquet(taxi_url).iloc[20000:20100].copy()
# Simulate new data with additional business columns
weather_options = ['clear', 'rain', 'snow', 'cloudy']
surge_options = [1.0, 1.2, 1.5, 2.0]
enhanced_data['weather_condition'] = [weather_options[i % 4] for i in range(len(enhanced_data))]
enhanced_data['surge_multiplier'] = [surge_options[i % 4] for i in range(len(enhanced_data))]
print(f"Enhanced data: {len(enhanced_data)} records with {len(enhanced_data.columns)} columns")
print(f"New columns: {[col for col in enhanced_data.columns if col not in sample_data.columns]}")
Output:
Enhanced data: 100 records with 21 columns
New columns: ['weather_condition', 'surge_multiplier']
Traditional Approach: No Schema History
Traditional formats provide no tracking of schema changes or evolution history:
# Traditional approach – no schema versioning or history
df_v1 = pd.read_parquet("taxi_v1.parquet") # Original schema
df_v2 = pd.read_parquet("taxi_v2.parquet") # Enhanced schema
Delta-rs Approach: Schema Versioning and History
Delta-rs automatically merges schemas while tracking every change:
# Schema evolution with automatic versioning
write_deltalake(
"data/taxi_delta_table",
enhanced_data,
mode="append",
schema_mode="merge"
)
dt = DeltaTable("data/taxi_delta_table")
print(f"Schema evolved: {len(dt.to_pandas().columns)} columns | Version: {dt.version()}")
Output:
Schema evolved: 21 columns | Version: 2
Explore the complete schema evolution history and access any previous version:
# View schema change history
history = dt.history()
for entry in history[:2]:
print(f"Version {entry['version']}: {entry['operation']} at {entry['timestamp']}")
# Access different schema versions
original_schema = DeltaTable("data/taxi_delta_table", version=0)
print(f"\nOriginal schema (v0): {len(original_schema.to_pandas().columns)} columns")
print(f"Current schema (v{dt.version()}): {len(dt.to_pandas().columns)} columns")
Output:
Version 2: WRITE at 1755180763083
Version 1: WRITE at 1755180762968
Original schema (v0): 19 columns
Current schema (v2): 21 columns
Delta-rs expanded from 19 to 21 columns across 10,150 records without schema migration scripts or pipeline failures.
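A practical detail worth verifying: the older records were not rewritten during the schema merge, so when you read the evolved table, the new columns simply come back as nulls for rows written before the change. A quick sketch to confirm this, using the column names from the example above:
# Rows written before the schema change carry nulls in the new columns
dt = DeltaTable("data/taxi_delta_table")
new_cols = dt.to_pandas(columns=["weather_condition", "surge_multiplier"])
print(f"Rows missing weather_condition: {new_cols['weather_condition'].isna().sum()}")
print(f"Rows with weather_condition: {new_cols['weather_condition'].notna().sum()}")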
Selective Updates with Merge Operations
Merge operations combine updates and inserts in a single transaction based on matching conditions. This eliminates the need to process entire datasets when you only need to modify specific records, dramatically improving efficiency at scale.
To demonstrate this, let’s create a simple taxi trips table:
# Create initial Delta table with 5 trips
trips = pd.DataFrame({
'trip_id': [1, 2, 3, 4, 5],
'fare_amount': [15.5, 20.0, 18.3, 12.5, 25.0],
'payment_type': [1, 1, 2, 1, 2]
})
write_deltalake("data/trips_merge_demo", trips, mode="overwrite")
print("Initial trips:")
print(trips)
Output:
Initial trips:
trip_id fare_amount payment_type
0 1 15.5 1
1 2 20.0 1
2 3 18.3 2
3 4 12.5 1
4 5 25.0 2
Here are the updates we want to make:
Update trip 2: change fare from $20.00 to $22.00
Update trip 4: change fare from $12.50 to $13.80
Insert trip 6: new trip with fare $30.00
Insert trip 7: new trip with fare $16.50
Traditional Approach: Full Dataset Processing
Traditional workflows require loading complete datasets, identifying matches, and rewriting all records. This process becomes increasingly expensive as data grows:
# Traditional approach – load, modify, and rewrite everything
existing_df = trips.copy()
# Updates: manually locate and modify rows
existing_df.loc[existing_df['trip_id'] == 2, 'fare_amount'] = 22.0
existing_df.loc[existing_df['trip_id'] == 4, 'fare_amount'] = 13.8
# Inserts: create new rows and append
new_trips = pd.DataFrame({
'trip_id': [6, 7],
'fare_amount': [30.0, 16.5],
'payment_type': [1, 1]
})
updated_df = pd.concat([existing_df, new_trips], ignore_index=True)
# Rewrite entire dataset
updated_df.to_parquet("data/trips_traditional.parquet")
print(updated_df)
Output:
trip_id fare_amount payment_type
0 1 15.5 1
1 2 22.0 1 # Updated
2 3 18.3 2
3 4 13.8 1 # Updated
4 5 25.0 2
5 6 30.0 1 # Inserted
6 7 16.5 1 # Inserted
Delta-rs Approach: Upsert with Merge Operations
Delta-rs merge operations handle both updates and inserts in a single atomic operation, processing only affected records:
# Prepare changes: 2 updates + 2 inserts
changes = pd.DataFrame({
'trip_id': [2, 4, 6, 7],
'fare_amount': [22.0, 13.8, 30.0, 16.5],
'payment_type': [2, 2, 1, 1]
})
# Load Delta table
dt = DeltaTable("data/trips_merge_demo")
# Upsert operation: update existing, insert new
(
dt.merge(
source=changes,
predicate="target.trip_id = source.trip_id",
source_alias="source",
target_alias="target",
)
.when_matched_update(
updates={
"fare_amount": "source.fare_amount",
"payment_type": "source.payment_type",
}
)
.when_not_matched_insert(
updates={
"trip_id": "source.trip_id",
"fare_amount": "source.fare_amount",
"payment_type": "source.payment_type",
}
)
.execute()
)
# Verify results
result = dt.to_pandas().sort_values('trip_id').reset_index(drop=True)
print(result)
Output:
trip_id fare_amount payment_type
0 1 15.5 1
1 2 22.0 2 # Updated
2 3 18.3 2
3 4 13.8 2 # Updated
4 5 25.0 2
5 6 30.0 1 # Inserted
6 7 16.5 1 # Inserted
Delta-rs processed exactly 4 records (2 updates + 2 inserts) while pandas processed all 7 records. This efficiency compounds dramatically with larger datasets.
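The merge builder supports more clauses than the two used here (for example, when_matched_delete), and execute() returns a dictionary of metrics you can log to verify how many rows were actually touched. A minimal sketch, assuming a reasonably recent deltalake release (metric key names vary between versions):
# Capture the metrics reported by a merge (illustrative re-run against the same table)
dt = DeltaTable("data/trips_merge_demo")
metrics = (
    dt.merge(
        source=changes,
        predicate="target.trip_id = source.trip_id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update(updates={"fare_amount": "source.fare_amount"})
    .when_not_matched_insert(
        updates={
            "trip_id": "source.trip_id",
            "fare_amount": "source.fare_amount",
            "payment_type": "source.payment_type",
        }
    )
    .execute()
)
print(metrics)  # e.g. counts of rows updated, inserted, and copied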
Multi-Engine Integration
Different teams often use different tools: pandas for exploration, DuckDB for SQL queries, Polars for performance. Multi-engine support lets all these tools access the same data directly without creating duplicates or writing conversion scripts.
Traditional Approach: Engine-Specific Optimization Requirements
Each engine needs different file optimizations that don’t transfer between tools:
Start with the original dataset:
# Traditional approach – Each engine needs different optimizations
data = {"payment_type": [1, 1, 2, 1, 2], "fare_amount": [15.5, 20.0, 18.3, 12.5, 25.0]}
df = pd.DataFrame(data)
The Pandas team optimizes for indexed lookups:
# Pandas team needs indexed Parquet for fast lookups
df.to_parquet("data/pandas_optimized.parquet", index=True)
pandas_result = pd.read_parquet("data/pandas_optimized.parquet")
print(f"Pandas: {len(pandas_result)} trips, avg ${pandas_result['fare_amount'].mean():.2f}")
Output:
Pandas: 5 trips, avg $18.26
The Polars team needs sorted data for predicate pushdown optimization:
# Polars team needs sorted columns for predicate pushdown
df.sort_values('payment_type').to_parquet("data/polars_optimized.parquet")
polars_result = pl.read_parquet("data/polars_optimized.parquet").select([
pl.len().alias("trips"), pl.col("fare_amount").mean().alias("avg_fare")
])
print(f"Polars: {polars_result}")
Output:
Polars: shape: (1, 2)
┌───────┬──────────┐
│ trips ┆ avg_fare │
│ ---   ┆ ---      │
│ u32   ┆ f64      │
╞═══════╪══════════╡
│ 5     ┆ 18.26    │
└───────┴──────────┘
The DuckDB team requires specific compression for query performance:
# DuckDB needs specific compression/statistics for query planning
df.to_parquet("data/duckdb_optimized.parquet", compression='zstd')
duckdb_result = duckdb.execute("""
SELECT COUNT(*) as trips, ROUND(AVG(fare_amount), 2) as avg_fare
FROM 'data/duckdb_optimized.parquet'
""").fetchone()
print(f"DuckDB: {duckdb_result[0]} trips, ${duckdb_result[1]} avg")
Output:
DuckDB: 5 trips, $18.26 avg
Delta-rs Approach: Universal Optimizations
Delta-rs provides built-in optimizations that benefit all engines simultaneously:
Create one optimized Delta table that serves all engines:
# Delta-rs approach – Universal optimizations for all engines
from deltalake import write_deltalake, DeltaTable
import polars as pl
import duckdb
# Create Delta table with built-in optimizations:
data = {"payment_type": [1, 1, 2, 1, 2], "fare_amount": [15.5, 20.0, 18.3, 12.5, 25.0]}
write_deltalake("data/universal_demo", pd.DataFrame(data))
Pandas benefits from Delta’s statistics for efficient filtering:
# Pandas gets automatic optimization benefits
dt = DeltaTable("data/universal_demo")
pandas_result = dt.to_pandas()
print(f"Pandas: {len(pandas_result)} trips, avg ${pandas_result['fare_amount'].mean():.2f}")
Output:
Pandas: 5 trips, avg $18.26
Polars leverages Delta’s column statistics for predicate pushdown:
# Polars gets predicate pushdown optimization automatically
polars_result = pl.read_delta("data/universal_demo").select([
pl.len().alias("trips"),
pl.col("fare_amount").mean().alias("avg_fare")
])
print(f"Polars: {polars_result}")
Output:
Polars: shape: (1, 2)
┌───────┬──────────┐
│ trips ┆ avg_fare │
│ ---   ┆ ---      │
│ u32   ┆ f64      │
╞═══════╪══════════╡
│ 5     ┆ 18.26    │
└───────┴──────────┘
DuckDB uses Delta’s statistics for query planning optimization:
# DuckDB gets optimized query plans from Delta statistics
duckdb_result = duckdb.execute("""
SELECT COUNT(*) as trips, ROUND(AVG(fare_amount), 2) as avg_fare
FROM delta_scan('data/universal_demo')
""").fetchone()
print(f"DuckDB: {duckdb_result[0]} trips, ${duckdb_result[1]} avg")
Output:
DuckDB: 5 trips, $18.26 avg
One Delta table with universal optimizations benefiting all engines.
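The integration also runs in the other direction: engines that understand the Delta protocol can write back to the same table. For example, Polars ships a write_delta method (it relies on the deltalake package under the hood); here is a sketch appending a couple of rows that every other engine immediately sees:
# Append new rows to the shared table directly from Polars
new_rows = pl.DataFrame({"payment_type": [2, 1], "fare_amount": [9.5, 40.0]})
new_rows.write_delta("data/universal_demo", mode="append")
# pandas (or DuckDB) reads the same table and picks up the new rows
print(f"Rows after Polars append: {len(DeltaTable('data/universal_demo').to_pandas())}")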
Automatic File Cleanup
Every data update creates new files while keeping old versions for time travel. Vacuum identifies files older than your retention period and safely deletes them, freeing storage space without affecting active data or recent history.
Traditional Approach: Manual Cleanup Scripts
Traditional workflows require custom scripts to manage file cleanup:
# Traditional approach – manual file management
import os
import glob
from datetime import datetime, timedelta
# Find old backup files manually
old_files = []
cutoff_date = datetime.now() - timedelta(days=7)
for file in glob.glob("data/taxi_backup_*.parquet"):
file_time = datetime.fromtimestamp(os.path.getmtime(file))
if file_time < cutoff_date:
old_files.append(file)
os.remove(file) # Manual cleanup with risk
Delta-rs Approach: Built-in Vacuum Operation
Delta-rs provides safe, automated cleanup through its vacuum() operation, which removes unused transaction files while preserving data integrity. Files become unused when:
• UPDATE operations create new versions, leaving old data files unreferenced
• DELETE operations remove data, making those files obsolete
• Failed transactions leave temporary files that were never committed
• Table optimization consolidates small files, making originals unnecessary
# Delta-rs vacuum removes unused files safely with ACID protection
from deltalake import DeltaTable
import os
def get_size(path):
"""Calculate total directory size in MB"""
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
total_size += os.path.getsize(os.path.join(dirpath, filename))
return total_size / (1024 * 1024)
With our size calculation helper in place, let’s measure storage before and after vacuum:
dt = DeltaTable("data/taxi_delta_table")
# Measure storage before cleanup
before_size = get_size("data/taxi_delta_table")
# Safe cleanup - only removes files that fall outside the retention window
# and are no longer referenced by the current table version
dt.vacuum(retention_hours=168, dry_run=False)  # dry_run defaults to True, so disable it to actually delete
# Measure storage after cleanup
after_size = get_size("data/taxi_delta_table")
print(f"Delta vacuum completed safely")
print(f"Storage before: {before_size:.1f} MB")
print(f"Storage after: {after_size:.1f} MB")
print(f"Space reclaimed: {before_size - after_size:.1f} MB")
Output:
Delta vacuum completed safely
Storage before: 8.2 MB
Storage after: 5.7 MB
Space reclaimed: 2.5 MB
Delta vacuum removed 2.5 MB of obsolete file versions, reducing storage footprint by 30% while maintaining ACID transaction guarantees and time travel capabilities.
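If you want to preview a cleanup before committing to it, vacuum supports a dry run that only lists candidate files; in deltalake, dry_run is actually the default (which is why the cleanup above passes dry_run=False explicitly):
# Preview which files vacuum would remove, without deleting anything
candidates = dt.vacuum(retention_hours=168, dry_run=True)
print(f"{len(candidates)} files eligible for cleanup")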
Conclusion
Delta-rs transforms the traditional pandas workflow by providing:
Incremental updates append only changed records without full rewrites
Time travel and versioning enable recovery and auditing without manual backups
Schema evolution handles column changes without breaking queries
Merge operations combine updates and inserts in single transactions
Multi-engine support lets pandas, DuckDB, and Polars access the same data
Automatic vacuum reclaims storage by removing obsolete file versions
The bridge from pandas prototyping to production data pipelines no longer requires complex infrastructure. Delta-rs provides the reliability and performance you need while maintaining the simplicity you want.
Related Tutorials
Alternative Scaling: Scaling Pandas Workflows with PySpark’s Pandas API for Spark-based approaches
Data Versioning: Version Control for Data and Models Using DVC for broader versioning strategies
DataFrame Performance: Polars vs. Pandas: A Fast, Multi-Core Alternative for DataFrame optimization techniques