Observe the Airflow DAG
To migrate an entire DAG at once, create assets that map to the DAG as a whole rather than to individual tasks. You can do this with assets_with_dag_mappings, which ensures that each mapped asset receives a materialization when a run of the DAG completes.
For our rebuild_customers_list DAG, the new observation code looks like this:
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_dag_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Instead of mapping assets to individual tasks, we map them to the entire DAG.
mapped_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            AssetSpec(key=["raw_data", "raw_customers"]),
            dbt_project_assets,
            AssetSpec(key="customers_csv", deps=["customers"]),
        ],
    },
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
Now, instead of getting a materialization when a particular task completes, each mapped asset will receive a materialization when the entire DAG completes.
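To make the DAG-level behavior concrete, here is a minimal, library-free sketch of the idea. It is a conceptual model only and not how dagster_airlift is implemented: the DagRun class, the DAG_MAPPINGS dictionary, the materializations_for helper, and the string asset keys are all hypothetical names introduced for illustration, and treating only "success" runs as producing materializations is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DagRun:
    """Hypothetical stand-in for a finished Airflow DAG run."""

    dag_id: str
    run_id: str
    state: str  # e.g. "success" or "failed"


# Hypothetical mirror of the dag_mappings argument: one DAG id maps to every
# asset that the DAG as a whole is responsible for producing.
DAG_MAPPINGS: dict[str, list[str]] = {
    "rebuild_customers_list": [
        "raw_data/raw_customers",
        "customers",  # stands in for the dbt-generated assets
        "customers_csv",
    ],
}


def materializations_for(
    run: DagRun, dag_mappings: dict[str, list[str]]
) -> list[tuple[str, str]]:
    """Return one (asset_key, run_id) pair per asset mapped to the run's DAG.

    Assumption: only successful runs yield materializations, and no
    individual task state is consulted.
    """
    if run.state != "success":
        return []
    return [(asset_key, run.run_id) for asset_key in dag_mappings.get(run.dag_id, [])]


if __name__ == "__main__":
    finished = DagRun(dag_id="rebuild_customers_list", run_id="run_1", state="success")
    for asset_key, run_id in materializations_for(finished, DAG_MAPPINGS):
        print(f"{asset_key} materialized by {run_id}")
```

The point of the sketch is that a single completed DAG run fans out to every mapped asset at once, whereas task-level mappings would record each asset as its own task finishes.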
Next steps
In the next step, "Migrate DAG-mapped assets", we will proxy execution for the entire Airflow DAG in Dagster.