Traditionally, implementing upsert (update or insert) logic requires separate UPDATE and INSERT statements or complex SQL. This approach can be error-prone and inefficient, especially for large datasets. Delta Lake’s merge operation solves this problem by allowing you to specify different actions for matching and non-matching records in a single, declarative statement.
Here’s an example that demonstrates the power and simplicity of Delta Lake’s merge operation:
First, let’s set up our initial data:
# Create sample data for 'customers' DataFrame
customers_data = [
(1, "John Doe", "john@example.com", "2023-01-01 10:00:00"),
(2, "Jane Smith", "jane@example.com", "2023-01-02 11:00:00"),
(3, "Bob Johnson", "bob@example.com", "2023-01-03 12:00:00"),
]
customers = spark.createDataFrame(
customers_data, ["customer_id", "name", "email", "last_updated"]
)
# Create sample data for 'updates' DataFrame
updates_data = [
(2, "Jane Doe", "jane.doe@example.com"), # Existing customer with updates
(3, "Bob Johnson", "bob@example.com"), # Existing customer without changes
(4, "Alice Brown", "alice@example.com"), # New customer
]
updates = spark.createDataFrame(updates_data, ["customer_id", "name", "email"])
# Show the initial data
print("Initial Customers:")
customers.show()
print("Updates:")
updates.show()
Output:
Initial Customers:
+-----------+-----------+----------------+-------------------+
|customer_id| name| email| last_updated|
+-----------+-----------+----------------+-------------------+
| 1| John Doe|john@example.com|2023-01-01 10:00:00|
| 2| Jane Smith|jane@example.com|2023-01-02 11:00:00|
| 3|Bob Johnson| bob@example.com|2023-01-03 12:00:00|
+-----------+-----------+----------------+-------------------+
Updates:
+-----------+-----------+--------------------+
|customer_id| name| email|
+-----------+-----------+--------------------+
| 2| Jane Doe|jane.doe@example.com|
| 3|Bob Johnson| bob@example.com|
| 4|Alice Brown| alice@example.com|
+-----------+-----------+--------------------+
Next, we create a Delta table from our initial customer data:
# Define the path where you want to save the Delta table
delta_table_path = "customers_delta"
# Write the DataFrame as a Delta table
customers.write.format("delta").mode("overwrite").save(delta_table_path)
# Create a DeltaTable object
customers_delta = DeltaTable.forPath(spark, delta_table_path)
Now, here’s the key part – the merge operation that handles both updates and inserts in a single statement:
customers_delta.alias("target").merge(
updates.alias("source"),
"target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
"name": "source.name",
"email": "source.email",
"last_updated": "current_timestamp()"
}).whenNotMatchedInsert(values={
"customer_id": "source.customer_id",
"name": "source.name",
"email": "source.email",
"last_updated": "current_timestamp()"
}).execute()
This single operation:
- Matches records based on customer_id
- Updates existing records with new name and email
- Inserts new records for non-matching customer_ids
- Updates the ‘last_updated’ timestamp for all modified records
Finally, we can verify the results:
print("Updated Customers Delta Table:")
customers_delta.toDF().show()
Output:
Updated Customers Delta Table:
+-----------+-----------+--------------------+--------------------+
|customer_id| name| email| last_updated|
+-----------+-----------+--------------------+--------------------+
| 2| Jane Doe|jane.doe@example.com|2024-08-20 16:05:...|
| 3|Bob Johnson| bob@example.com|2024-08-20 16:05:...|
| 4|Alice Brown| alice@example.com|2024-08-20 16:05:...|
| 1| John Doe| john@example.com| 2023-01-01 10:00:00|
+-----------+-----------+--------------------+--------------------+
The output shows that:
- Jane Smith (ID 2) was updated to Jane Doe with a new email
- Bob Johnson (ID 3) had his timestamp updated
- Alice Brown (ID 4) was inserted as a new record
- John Doe (ID 1) remained unchanged
This merge operation eliminates the need for separate UPDATE and INSERT statements, reduces the risk of partial updates, and improves performance by processing the data in a single pass.