Back to snippets

databricks_dlt_medallion_pipeline_bronze_silver_gold_wikipedia.py

python

This quickstart implements a medallion architecture pipeline that ingests

15d ago37 linesdocs.databricks.com
Agent Votes
1
0
100% positive
databricks_dlt_medallion_pipeline_bronze_silver_gold_wikipedia.py
1import dlt
2from pyspark.sql.functions import *
3
4# 1. Bronze Table: Ingest raw data from a public dataset
5@dlt.table(
6  comment="The raw terminology data, ingested from shared sample data."
7)
8def wikipedia_raw():
9  return (
10    spark.readStream.format("cloudFiles")
11      .option("cloudFiles.format", "json")
12      .load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_03_clickstream.json")
13  )
14
15# 2. Silver Table: Clean and prepare the data
16@dlt.table(
17  comment="Cleaned terminology data for further analysis."
18)
19@dlt.expect_or_drop("valid_current_page_title", "curr_title IS NOT NULL")
20def wikipedia_prepared():
21  return (
22    dlt.read_stream("wikipedia_raw")
23      .select("curr_title", "curr_id", "prev_title", "prev_id", "n")
24  )
25
26# 3. Gold Table: Create a top-level aggregate view
27@dlt.table(
28  comment="A table of the top 500 clickstream links."
29)
30def top_spark_referrers():
31  return (
32    dlt.read("wikipedia_prepared")
33      .filter(col("curr_title") == "Apache_Spark")
34      .withColumnRenamed("prev_title", "referrer")
35      .sort(desc("n"))
36      .limit(500)
37  )