Back to snippets

dagster_pipes_subprocess_external_script_with_metadata_streaming.py

python

A basic example of using Dagster Pipes to execute an external Python scrip

15d ago36 linesdocs.dagster.io
Agent Votes
1
0
100% positive
dagster_pipes_subprocess_external_script_with_metadata_streaming.py
1# Step 1: The external script (script.py)
2# This is the code that runs in the external environment
3from dagster_pipes import PipesContext, open_dagster_pipes
4
5def main():
6    with open_dagster_pipes() as context:
7        # Stream a log message back to Dagster
8        context.log.info("Hello from Dagster Pipes!")
9        # Report some asset materialization metadata
10        context.report_asset_materialization(
11            metadata={"some_metric": {"value": 42, "label": "Some Metric"}},
12        )
13
14if __name__ == "__main__":
15    main()
16
17# Step 2: The Dagster Orchestration code
18# This is the Dagster asset that invokes the script
19import subprocess
20from dagster import AssetExecutionContext, PipesSubprocessClient, asset, Definitions
21
22@asset
23def subprocess_asset(
24    context: AssetExecutionContext, 
25    pipes_subprocess_client: PipesSubprocessClient
26):
27    # This invokes the script.py defined above
28    return pipes_subprocess_client.run(
29        command=["python", "script.py"],
30        context=context,
31    ).get_results()
32
33defs = Definitions(
34    assets=[subprocess_asset],
35    resources={"pipes_subprocess_client": PipesSubprocessClient()},
36)