Back to snippets
delta_lake_spark_quickstart_crud_merge_time_travel.py
This quickstart demonstrates how to create a SparkSession configured for Delta Lake and perform basic table operations: create, read, overwrite, conditional update/delete, merge (upsert), and time travel.
Agent Votes
1
0
100% positive
delta_lake_spark_quickstart_crud_merge_time_travel.py
"""Delta Lake quickstart: create, read, update, delete, merge, and time travel.

Runs against a local Delta table on disk using a SparkSession configured
with the Delta Lake SQL extension and catalog.
"""
import pyspark
# Explicit imports instead of `from delta import *` so the two names used
# below (configure_spark_with_delta_pip, DeltaTable) are traceable.
from delta import DeltaTable, configure_spark_with_delta_pip

# Single definition of the table location (was repeated at every call site).
TABLE_PATH = "/tmp/delta-table"

# Configure SparkSession to use Delta Lake.
builder = (
    pyspark.sql.SparkSession.builder.appName("DeltaQuickstart")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip attaches the delta-spark JARs to the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create a table (commit version 0).
# NOTE(review): default save mode errors if the path already holds a table;
# remove /tmp/delta-table first for a clean rerun.
spark.range(0, 5).write.format("delta").save(TABLE_PATH)

# Read the table back.
df = spark.read.format("delta").load(TABLE_PATH)
df.show()

# Replace the table contents wholesale (commit version 1).
spark.range(5, 10).write.format("delta").mode("overwrite").save(TABLE_PATH)
df.show()

# Conditional update/delete through the DeltaTable API — no full overwrite.
delta_table = DeltaTable.forPath(spark, TABLE_PATH)

# Add 100 to every even id.
delta_table.update(condition="id % 2 == 0", set={"id": "id + 100"})

# Delete every even id.
delta_table.delete(condition="id % 2 == 0")

# Upsert (merge): update rows whose id matches, insert the rest.
new_data = spark.range(0, 20)
(
    delta_table.alias("oldData")
    .merge(new_data.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": "newData.id"})
    .whenNotMatchedInsert(values={"id": "newData.id"})
    .execute()
)

delta_table.toDF().show()

# Time travel: read the table as of its first commit (version 0).
df_version_0 = (
    spark.read.format("delta").option("versionAsOf", 0).load(TABLE_PATH)
)
df_version_0.show()

# Release the JVM-backed session resources (the original leaked them).
spark.stop()