Define a Dagster asset that invokes subprocess
note
This is part one of the Using Dagster Pipes tutorial. If you want to modify existing code that Dagster already orchestrates, you can jump to part two, Modify external code.
note
In this part of the tutorial, you'll create a Dagster asset whose execution function opens a Dagster Pipes session and invokes a subprocess that executes some external code.
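For orientation, the "invoke a subprocess that executes external code" half of this pattern can be sketched with the standard library alone. This is an illustration only, not how `PipesSubprocessClient` is implemented: the helper name `run_external` is hypothetical, and the snippet does none of the context passing or result reporting that a Pipes session adds.

```python
import subprocess
import sys


def run_external(code: str) -> str:
    """Run a snippet of Python in a child interpreter and return its stdout."""
    completed = subprocess.run(
        [sys.executable, "-c", code],  # same interpreter that runs this script
        capture_output=True,           # collect stdout/stderr instead of inheriting them
        text=True,                     # decode the output as str rather than bytes
        check=True,                    # raise CalledProcessError on a non-zero exit code
    )
    return completed.stdout.strip()


if __name__ == "__main__":
    print(run_external("print('hello from external code')"))
```

The asset defined later in this part delegates the launch to a Pipes client instead of calling `subprocess` directly.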
Step 1: Create a Dagster project
First we will create a new Dagster project:
uvx create-dagster@latest project external_pipeline
Step 2: Scaffold and define the asset
Next, we can scaffold the asset file:
dg scaffold defs dagster.asset dagster_code.py
Then define the asset. Copy and paste the following into the file src/external_pipeline/defs/dagster_code.py:
src/external_pipeline/defs/dagster_code.py
import shutil

import dagster as dg


@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    # Run external_code.py (located next to this file) with the Python interpreter found on PATH
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    # Launch the subprocess through Pipes and return the materialization it reports
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()
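One detail worth knowing about the command above: `shutil.which("python")` returns `None` when no executable named `python` is on `PATH`, which would leave a `None` entry in the command list. A minimal sketch of a more defensive way to build the command follows. The helper `build_command` is hypothetical, and falling back to `sys.executable` is an assumption made for this example rather than something the tutorial prescribes; it is only appropriate if the external code can run under the same interpreter as Dagster.

```python
import shutil
import sys
from pathlib import Path


def build_command(script_name: str, base_dir: Path) -> list[str]:
    """Build the argv list for running a script that lives in base_dir."""
    # shutil.which returns None if "python" is not on PATH, so fall back to
    # the interpreter running this process (an assumption for this sketch).
    python = shutil.which("python") or sys.executable
    return [python, str(base_dir / script_name)]


if __name__ == "__main__":
    # Inside the asset, base_dir would typically be Path(__file__).parent.
    print(build_command("external_code.py", Path(__file__).parent))
```

Inside `subprocess_asset`, the result of such a helper could be passed as the `command` argument in place of the inline list.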