At this point, we should have finished the setup step: the example code is set up in a fresh virtual environment, and Airflow is running locally. Now we can start writing Dagster code.
We call the first stage of migration from Airflow to Dagster the "Peering" stage, in which we "peer" the Airflow instance with a Dagster code location. Peering creates an asset representation of each Airflow DAG that you can view in Dagster, and it does not require any changes to your Airflow instance.
First, open a new shell and navigate to the same directory. You will need to install the dagster-airlift package in your Dagster environment:
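A minimal install might look like the following (this exact command is an assumption; use your preferred installer, and adjust the extras to your setup):

# Assumed install command: Dagster, the Dagster webserver, and the core dagster-airlift APIs
pip install 'dagster-airlift[core]' dagster-webserver dagster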
Next, create a Definitions object using build_defs_from_airflow_instance. You can use the empty tutorial_example/dagster_defs/definitions.py file as a starting point:
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        # other backends available (e.g. MwaaSessionAuthBackend)
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    )
)
This function creates:
An external asset representing each DAG. This asset is marked as materialized whenever a DAG run completes.
A sensor that polls the Airflow instance for operational information. This sensor is responsible for creating materializations when a DAG executes. The sensor must remain on in order to properly update execution status.
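For example, after peering, the rebuild_customers_list DAG from the example project is represented by an asset whose key combines the instance name, "dag", and the DAG ID; this is the same key we reference later in this guide when attaching an asset check:

from dagster import AssetKey

# Asset key of the peered DAG representation: instance name, "dag", DAG ID
dag_asset_key = AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"])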
Let's set up some environment variables, and then launch Dagster so we can see the asset created from our Airflow DAG:
# Set up environment variables to point to the airlift-migration-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export TUTORIAL_DBT_PROJECT_DIR="$TUTORIAL_EXAMPLE_DIR/tutorial_example/shared/dbt"
export AIRFLOW_HOME="$TUTORIAL_EXAMPLE_DIR/.airflow_home"
dagster dev -f tutorial_example/dagster_defs/definitions.py
Let's kick off a run of the rebuild_customers_list DAG in Airflow.
airflow dags backfill rebuild_customers_list --start-date $(date +"%Y-%m-%d")
When this run has completed in Airflow, we should be able to navigate to the Dagster UI and see that Dagster has registered a materialization corresponding to that successful run.
Run the following command to clean the Airflow and Dagster run history (we do this so we can run the same example backfill again later). Under the hood, it deletes runs from Airflow and asset materializations from Dagster.
make clean
Note: When the code location loads, Dagster will query the Airflow REST API in order to build a representation of your DAGs. In order for Dagster to reflect changes to your DAGs, you will need to reload your code location.
Once you have peered your Airflow DAGs in Dagster, you can begin to add asset checks to your Dagster code, regardless of migration progress. In Dagster, asset checks can be used to validate the quality of your data assets, and they can provide additional observability and value on top of your Airflow DAG even before migration starts.
Asset checks act as useful acceptance tests to ensure that any migration steps are successful, and they continue to provide value after the migration itself is complete.
For example, we're going to add an asset check to ensure that the final customers CSV output exists and has a nonzero number of rows.
import os
from pathlib import Path

from dagster import AssetCheckResult, AssetCheckSeverity, AssetKey, Definitions, asset_check
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)


# Attach a check to the DAG representation asset, which will be executed by Dagster
# any time the DAG is run in Airflow
@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        # other backends available (e.g. MwaaSessionAuthBackend)
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(asset_checks=[validate_exported_csv]),
)
Once we reload the code location, we'll see a Checks tab indicating the presence of an asset check on our rebuild_customers_list asset.
Let's run the backfill again:
airflow dags backfill rebuild_customers_list --start-date $(date +"%Y-%m-%d")
And we'll see that the asset check executed successfully in Dagster (indicated by the green check mark).
Let's again wipe materializations and runs for tutorial purposes by running make clean.