This guide covers using dagster-airlift to migrate an Airflow DAG to Dagster on Apache Airflow versions below 2.0.
Many APIs within the dagster-airlift package make use of Airflow's stable REST API, which was added in Airflow 2.0. However, we still support a migration process for Airflow 1.x users.
This guide uses the same base example as the tutorial. We recommend first following the tutorial to understand the concepts and steps involved in the migration process, then using this guide to apply those steps to an Airflow 1.x environment.
To begin proxying tasks in a DAG, first you will need a file to track proxying state. In your Airflow DAG directory, create a proxied_state folder, and in it create a yaml file with the same name as your DAG. The included example at airflow_dags/proxied_state is used by make airflow_run, and can be used as a template for your own proxied state files.
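Assuming the tutorial's layout, where the DAG is defined in airflow_dags/dags.py, the result looks like this:

```
airflow_dags/
├── dags.py
└── proxied_state/
    └── rebuild_customers_list.yaml
```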
Given our example DAG rebuild_customers_list with three tasks, load_raw_customers, build_dbt_models, and export_customers, proxied_state/rebuild_customers_list.yaml should look like the following:
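```yaml
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: False
  - id: export_customers
    proxied: False
```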
Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG:
```python
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)
...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    )
```
Set PROXYING to True or eliminate the if statement.
The DAG will now display its proxied state in the Airflow UI. (There is some latency as Airflow evaluates the Python file periodically.)
We'll now create Dagster assets that correspond to each Airflow task. First, since Dagster provides out-of-the-box integration with dbt, we'll use dagster-dbt to create assets for the build_dbt_models task in our tutorial_example/dagster_defs/definitions.py file:
```python
import os
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```
Now, we'll mark our dbt_project_assets as being mapped from Airflow:
```python
from dagster_airlift.core import assets_with_task_mappings

mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        # load rich set of assets from dbt project
        "build_dbt_models": [dbt_project_assets],
    },
)
```
The assets_with_task_mappings function adds metadata to each passed-in asset; when the proxied task runs in Airflow, that metadata is used to determine which assets to execute in Dagster.
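This guide only migrates build_dbt_models, but the same call accepts one entry per task. As a sketch of what mapping the remaining tasks could look like later in the migration, assuming hypothetical placeholder assets (raw_customers and customers_csv are illustrative names, not the tutorial's real implementations):

```python
from dagster import asset
from dagster_airlift.core import assets_with_task_mappings


# Hypothetical placeholder assets for the other two tasks; the real
# CSV-loading and exporting logic would live in these bodies.
@asset
def raw_customers() -> None:
    ...


@asset
def customers_csv() -> None:
    ...


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [raw_customers],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [customers_csv],
    },
)
```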
We'll provide the mapped assets to a Definitions object in our tutorial_example/dagster_defs/definitions.py file:
```python
from dagster import Definitions

defs = Definitions(
    assets=mapped_assets,
    resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
)
```
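Before flipping the proxied flag, you can optionally smoke-test the Dagster side by materializing the assets directly, outside of any Airflow run. A minimal sketch, assuming TUTORIAL_DBT_PROJECT_DIR is set and the dbt manifest has been built:

```python
from dagster import materialize

# Executes the dbt assets in-process; note this actually runs `dbt build`.
result = materialize(
    [dbt_project_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
)
assert result.success
```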
Note how this differs from the original migration tutorial; we're not using build_defs_from_airflow_instance, which relies on the REST API.
Finally, we'll mark the build_dbt_models task as proxied in the proxied state YAML file:
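```yaml
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: False
```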
Important: It may take up to 30 seconds for the proxied state in the Airflow UI to reflect this change. You must subsequently reload the definitions in Dagster via the UI or by restarting dagster dev.
You can now run the rebuild_customers_list DAG in Airflow, and the build_dbt_models task will be executed in a Dagster run.
To recap, we've covered the process of migrating an Airflow 1.x DAG to Dagster using dagster-airlift. We've made clearer what functionality works with Airflow < 2.0, and what does not. We've shown how to create Dagster assets that correspond to Airflow tasks, and how to mark those tasks as proxied in the proxied state YAML file.