Invoking a PartitionedConfig object directly invokes the decorated function.
If you want to check whether the generated run config is valid for the config of a job, you can use the validate_run_config function.
from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime
@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    """Build the run config for one daily partition.

    Args:
        start: First datetime argument supplied for the partition; formatted
            as ``YYYY-MM-DD`` and passed to the ``process_data_for_date`` op.
        _end: Second datetime argument supplied for the partition; unused here.

    Returns:
        A run config dict that sets ``ops.process_data_for_date.config.date``.
    """
    return {
        "ops": {
            "process_data_for_date": {
                "config": {"date": start.strftime("%Y-%m-%d")},
            },
        },
    }


def test_my_partitioned_config():
    """Check the decorated function's output and that it validates against the job."""
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}},
    }

    # assert that the output of the decorated function is valid configuration for the
    # partitioned_op_job job
    # NOTE(review): partitioned_op_job is not defined in this snippet; presumably it
    # is the job from an earlier example on this page -- confirm.
    assert validate_run_config(partitioned_op_job, run_config)
If you want to test that a PartitionedConfig creates the partitions you expect, use its get_partition_keys or get_run_config_for_partition_key methods:
from dagster import Config, OpExecutionContext
@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    """Build the run config for one daily partition with a 15-minute offset.

    Both datetimes are formatted as ``YYYY-MM-DD-HH:MM``. With
    ``minute_offset=15`` the first partition is expected to run from
    ``2020-01-01-00:15`` to ``2020-01-02-00:15`` (see the test below).

    Args:
        start: Datetime written to the ``start`` config field.
        _end: Datetime written to the ``end`` config field.

    Returns:
        A run config dict that sets ``ops.process_data.config.start`` and
        ``ops.process_data.config.end``.
    """
    return {
        "ops": {
            "process_data": {
                "config": {
                    "start": start.strftime("%Y-%m-%d-%H:%M"),
                    "end": _end.strftime("%Y-%m-%d-%H:%M"),
                },
            },
        },
    }


class ProcessDataConfig(Config):
    """Config schema for the ``process_data`` op."""

    # Formatted start and end strings produced by my_offset_partitioned_config.
    start: str
    end: str


# NOTE(review): `op` and `job` are not imported in the visible snippet; they
# presumably come from an earlier `from dagster import ...` -- confirm.
@op
def process_data(context: OpExecutionContext, config: ProcessDataConfig):
    """Log the start and end values this op was configured with."""
    s = config.start
    e = config.end
    context.log.info(f"processing data for {s} - {e}")


@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    """Job that runs ``process_data`` using the offset partitioned config."""
    process_data()


def test_my_offset_partitioned_config():
    """Check the partition keys and the run config generated for the first key."""
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for do_more_stuff_partitioned
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"},
            },
        },
    }
To run a partitioned job in-process on a particular partition, supply a value for the partition_key argument of JobDefinition.execute_in_process. For example: