From Complex SQL to Simple Merges: Delta Lake’s Upsert Solution

Traditionally, implementing upsert (update or insert) logic requires separate UPDATE and INSERT statements or complex SQL. This approach can be error-prone and inefficient, especially for large datasets. Delta Lake’s merge operation solves this problem by allowing you to specify different actions for matching and non-matching records in a single, declarative statement.
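
For comparison, here is a rough sketch of the manual pattern that merge replaces when the data lives in plain Parquet files: split the existing table into rows that do and do not have an incoming update, rebuild the full dataset, and overwrite the output. The output path is a hypothetical name, and the sketch reuses the customers and updates DataFrames created below.

from pyspark.sql import functions as F

# Rows in the existing table that have no incoming update stay as they are
unchanged = customers.join(updates, on="customer_id", how="left_anti")

# Incoming rows (updated and brand-new customers) get a fresh timestamp,
# cast to string to match the existing column type
refreshed = updates.withColumn(
    "last_updated", F.current_timestamp().cast("string")
)

# Rebuild the whole table and overwrite it: two passes over the data, and a
# failure halfway through can leave the table in a partial state
unchanged.unionByName(refreshed).write.mode("overwrite").parquet("customers_parquet")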

Here’s an example that demonstrates the power and simplicity of Delta Lake’s merge operation:

First, let’s set up our initial data:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Build a SparkSession with Delta Lake support enabled
builder = (
    SparkSession.builder.appName("delta-merge-upsert")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create sample data for 'customers' DataFrame
customers_data = [
    (1, "John Doe", "john@example.com", "2023-01-01 10:00:00"),
    (2, "Jane Smith", "jane@example.com", "2023-01-02 11:00:00"),
    (3, "Bob Johnson", "bob@example.com", "2023-01-03 12:00:00"),
]
customers = spark.createDataFrame(
    customers_data, ["customer_id", "name", "email", "last_updated"]
)

# Create sample data for 'updates' DataFrame
updates_data = [
    (2, "Jane Doe", "jane.doe@example.com"),  # Existing customer with updates
    (3, "Bob Johnson", "bob@example.com"),  # Existing customer without changes
    (4, "Alice Brown", "alice@example.com"),  # New customer
]
updates = spark.createDataFrame(updates_data, ["customer_id", "name", "email"])

# Show the initial data
print("Initial Customers:")
customers.show()
print("Updates:")
updates.show()

Output:

Initial Customers:
+-----------+-----------+----------------+-------------------+
|customer_id|       name|           email|       last_updated|
+-----------+-----------+----------------+-------------------+
|          1|   John Doe|john@example.com|2023-01-01 10:00:00|
|          2| Jane Smith|jane@example.com|2023-01-02 11:00:00|
|          3|Bob Johnson| bob@example.com|2023-01-03 12:00:00|
+-----------+-----------+----------------+-------------------+

Updates:
+-----------+-----------+--------------------+
|customer_id|       name|               email|
+-----------+-----------+--------------------+
|          2|   Jane Doe|jane.doe@example.com|
|          3|Bob Johnson|     bob@example.com|
|          4|Alice Brown|   alice@example.com|
+-----------+-----------+--------------------+

Next, we create a Delta table from our initial customer data:

from delta.tables import DeltaTable

# Define the path where you want to save the Delta table
delta_table_path = "customers_delta"

# Write the DataFrame as a Delta table
customers.write.format("delta").mode("overwrite").save(delta_table_path)

# Create a DeltaTable object
customers_delta = DeltaTable.forPath(spark, delta_table_path)

Now, here’s the key part – the merge operation that handles both updates and inserts in a single statement:

customers_delta.alias("target").merge(
    updates.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "email": "source.email",
    "last_updated": "current_timestamp()"
}).whenNotMatchedInsert(values={
    "customer_id": "source.customer_id",
    "name": "source.name",
    "email": "source.email",
    "last_updated": "current_timestamp()"
}).execute()

This single operation:

  1. Matches records based on customer_id
  2. Updates existing records with new name and email
  3. Inserts new records for non-matching customer_ids
  4. Updates the ‘last_updated’ timestamp for all modified records

Finally, we can verify the results:

print("Updated Customers Delta Table:")
customers_delta.toDF().show()

Output:

Updated Customers Delta Table:
+-----------+-----------+--------------------+--------------------+
|customer_id|       name|               email|        last_updated|
+-----------+-----------+--------------------+--------------------+
|          2|   Jane Doe|jane.doe@example.com|2024-08-20 16:05:...|
|          3|Bob Johnson|     bob@example.com|2024-08-20 16:05:...|
|          4|Alice Brown|   alice@example.com|2024-08-20 16:05:...|
|          1|   John Doe|    john@example.com| 2023-01-01 10:00:00|
+-----------+-----------+--------------------+--------------------+

The output shows that:

  • Jane Smith (ID 2) was updated to Jane Doe with a new email
  • Bob Johnson (ID 3) had his timestamp updated
  • Alice Brown (ID 4) was inserted as a new record
  • John Doe (ID 1) remained unchanged
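
Note that Bob Johnson's row was rewritten (and his timestamp refreshed) even though his name and email did not change. If you want matched rows to be touched only when something actually differs, whenMatchedUpdate also accepts an optional condition. A small sketch of that variant; the condition shown is an illustrative choice, not part of the example above:

customers_delta.alias("target").merge(
    updates.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    # Only rewrite rows whose name or email actually changed
    condition="target.name <> source.name OR target.email <> source.email",
    set={
        "name": "source.name",
        "email": "source.email",
        "last_updated": "current_timestamp()",
    },
).whenNotMatchedInsert(values={
    "customer_id": "source.customer_id",
    "name": "source.name",
    "email": "source.email",
    "last_updated": "current_timestamp()",
}).execute()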

This merge operation eliminates the need for separate UPDATE and INSERT statements, reduces the risk of partial updates, and improves performance by processing the data in a single pass.
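
If you prefer to stay in SQL, Delta Lake exposes the same operation as a MERGE INTO statement in Spark SQL. A minimal sketch of the equivalent statement, assuming the updates DataFrame is registered as a temporary view named "updates" and that delta.`customers_delta` resolves to the same path used above:

# Register the source DataFrame so it can be referenced from SQL
updates.createOrReplaceTempView("updates")

spark.sql("""
    MERGE INTO delta.`customers_delta` AS target
    USING updates AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET
        target.name = source.name,
        target.email = source.email,
        target.last_updated = current_timestamp()
    WHEN NOT MATCHED THEN INSERT (customer_id, name, email, last_updated)
        VALUES (source.customer_id, source.name, source.email, current_timestamp())
""")

Both the Python builder and the SQL statement run the same merge under the hood, so you can use whichever fits your pipeline.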
