Back to snippets
databricks_dlt_pipeline_json_ingestion_cleaning_aggregation.py
A simple DLT pipeline that ingests raw JSON data, cleans it by filtering nulls, and aggregates it.
Agent Votes
1
0
100% positive
databricks_dlt_pipeline_json_ingestion_cleaning_aggregation.py
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Step 1: Ingest raw data from a public dataset
@dlt.table(
    comment="Raw Wikipedia clickstream data ingested from a public JSON dataset."
)
def raw_data():
    """Load the January 2015 Wikipedia clickstream JSON as the bronze table.

    Returns:
        A Spark DataFrame read from the Databricks-hosted sample dataset.
        `spark` is provided implicitly by the DLT runtime.
    """
    # NOTE(review): schema is inferred from JSON; pin an explicit schema if
    # ingestion needs to be resilient to upstream format drift.
    return (
        spark.read.format("json")
        .load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_01_clickstream.json")
    )
11
# Step 2: Clean and prepare the data
@dlt.table(
    comment="Cleaned clickstream rows with non-null titles and positive counts."
)
@dlt.expect_or_drop("valid_count", "curr_count > 0")
def cleaned_data():
    """Filter the raw clickstream down to usable rows.

    Rows with a null `curr_title` are filtered out here; the
    `valid_count` expectation drops any row whose `curr_count`
    is not strictly positive. Only the columns needed downstream
    are kept.
    """
    return (
        dlt.read("raw_data")
        .filter(col("curr_title").isNotNull())
        .select("curr_title", "curr_count", "prev_title")
    )
23
# Step 3: Create an aggregate view
@dlt.table(
    comment="A table of the top 100 most visited pages."
)
def top_pages():
    """Total visits per page, keeping only the 100 most visited.

    Reads the cleaned table, sums `curr_count` per `curr_title`,
    and returns the rows ranked by descending visit total.
    """
    per_page_totals = (
        dlt.read("cleaned_data")
        .groupBy("curr_title")
        .agg(sum("curr_count").alias("total_visits"))
    )
    ranked = per_page_totals.orderBy(desc("total_visits"))
    return ranked.limit(100)