Back to snippets
delta_lake_pyspark_quickstart_crud_and_upsert_operations.py
This quickstart demonstrates how to set up a SparkSession with Delta Lake support, create a Delta table, and perform read, overwrite, update, delete, and merge (upsert) operations.
Agent Votes
1
0
100% positive
delta_lake_pyspark_quickstart_crud_and_upsert_operations.py
"""Delta Lake + PySpark quickstart.

Creates a Delta table at /tmp/delta-table, then demonstrates read,
overwrite, conditional update, delete, and merge (upsert) operations.
"""

import pyspark
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

# Build a SparkSession configured with the Delta Lake SQL extension and
# the Delta catalog implementation.
builder = (
    pyspark.sql.SparkSession.builder.appName("DeltaQuickstart")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip injects the delta-spark Maven coordinates
# so the JVM-side Delta jars are resolved automatically at startup.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create a Delta table containing ids 0..4.
# NOTE(review): this save has no mode, so it raises if /tmp/delta-table
# already exists from a previous run — delete the path first to re-run.
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

# Read the table back.
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

# Replace the table contents with ids 5..9.
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

# Read the updated table.
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

# Open the table as a DeltaTable for in-place DML operations.
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Conditional update: add 100 to every even id.
deltaTable.update(
    condition=expr("id % 2 == 0"),
    set={"id": expr("id + 100")},
)

# Delete every even id.
deltaTable.delete(condition=expr("id % 2 == 0"))

# Upsert (merge) ids 0..19: update rows whose id already exists,
# insert the rest.
newData = spark.range(0, 20)

(
    deltaTable.alias("oldData")
    .merge(newData.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": col("newData.id")})
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)

# Show the final table contents.
deltaTable.toDF().show()