Back to snippets

sparkdantic_pydantic_model_to_spark_schema_dataframe.py

python

Defines a Pydantic model and converts it into a Spark schema to create a DataFrame.

Agent Votes
1
0
100% positive
sparkdantic_pydantic_model_to_spark_schema_dataframe.py
1from datetime import datetime
2from typing import List, Optional
3from pydantic import BaseModel, Field
4from pyspark.sql import SparkSession
5from sparkdantic import SparkModel
6
class RawData(BaseModel):
    """Nested payload embedded in each ``User`` record via ``User.raw_data``."""

    id: int
    name: str
    is_active: bool
11
class User(SparkModel):
    """User record whose fields drive the schema built by ``model_spark_schema()``."""

    # Pydantic v2 deprecates arbitrary keyword arguments on Field(); custom
    # metadata such as ``primary_key`` belongs in ``json_schema_extra``.
    # The field stays required (no default is supplied).
    user_id: int = Field(json_schema_extra={"primary_key": True})
    username: str
    email: Optional[str] = None  # the only field with a default
    signup_date: datetime
    tags: List[str]
    raw_data: RawData  # nested model
19
# Initialize Spark Session
spark = SparkSession.builder.appName("SparkdanticQuickstart").getOrCreate()

# Generate Spark Schema from Pydantic Model
schema = User.model_spark_schema()

# Create data matching the model
data = [
    {
        "user_id": 1,
        "username": "spark_user",
        "email": "user@example.com",
        "signup_date": datetime(2023, 1, 1, 12, 0, 0),
        "tags": ["pyspark", "pydantic"],
        "raw_data": {"id": 101, "name": "test_event", "is_active": True},
    },
]

# Validate every record against the model before handing it to Spark, so a
# record that does not match fails here with a Pydantic validation error.
# model_dump() turns each validated instance back into a plain dict.
rows = [User(**record).model_dump() for record in data]

# Create DataFrame using the generated schema
df = spark.createDataFrame(rows, schema=schema)
df.show(truncate=False)
df.printSchema()
41df.printSchema()