Airlift Federation Tutorial: Federating Execution Across Airflow Instances
At this point, we're observing our DAGs in Dagster and have cross-instance lineage between them. Next, we'll federate the execution of our DAGs across both Airflow instances using Dagster's Declarative Automation system.
The load_airflow_dag_asset_specs function creates asset representations of Airflow DAGs (AssetSpec objects), but these specs are not executable on their own. To make one executable, we need to give it an execution function in Dagster.
To federate execution of customer_metrics, we first need to make it executable in Dagster. We do this with the @multi_asset decorator, which defines how the customer_metrics asset is executed. The decorated function uses the AirflowInstance defined earlier to trigger a run of the customer_metrics DAG and then waits for that run to complete. If the run succeeds, the function reports a successful materialization of the asset; if the run fails, it raises an exception so that the materialization fails too.
Ultimately, we want to kick off a run of customer_metrics whenever load_customers completes successfully. Dagster already records a materialization of the load_customers asset when that DAG completes, so Declarative Automation can use that event to trigger customer_metrics. First, we'll attach AutomationCondition.eager() to customer_metrics_dag_asset. This tells Dagster to execute the run_customer_metrics function whenever its upstream load_customers asset is materialized.
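from dagster import AutomationCondition

# Return a copy of the customer_metrics DAG spec with an eager automation
# condition attached, so Dagster requests a run of it whenever an upstream
# asset (here, load_customers) is updated.
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    automation_condition=AutomationCondition.eager(),
)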
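Next, we set up Declarative Automation by adding an AutomationConditionSensorDefinition: a sensor that periodically evaluates the automation conditions of the assets it targets and launches runs when those conditions are met.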
With the sensor in place, the run_customer_metrics function will be executed whenever the load_customers asset is materialized. Let's test this by triggering a run of the load_customers DAG in Airflow. When that run completes, a materialization of the customer_metrics asset should kick off in the Dagster UI, followed by a run of the customer_metrics DAG in the metrics Airflow instance.
That concludes the tutorial! We've federated the execution of our DAGs across two Airflow instances using Dagster's Declarative Automation system. We've also set up cross-instance lineage for our DAGs, and can now observe the lineage and execution of our DAGs in the Dagster UI.