Jobs

A Job binds a Graph and the resources it needs to be executable.

Jobs are created by calling GraphDefinition.to_job() on a graph instance, or using the job decorator.

@dagster.job(compose_fn=None, *, name=None, description=None, resource_defs=None, config=None, tags=None, run_tags=None, metadata=None, logger_defs=None, executor_def=None, hooks=None, op_retry_policy=None, partitions_def=None, input_values=None)

Creates a job with the specified parameters from the decorated graph/op invocation function.

Using this decorator allows you to build an executable job by writing a function that invokes ops (or graphs).

Parameters:
  • compose_fn (Callable[..., Any]) – The decorated function. The body should contain op or graph invocations. Unlike op functions, does not accept a context argument.

  • name (Optional[str]) – The name for the Job. Defaults to the name of this graph.

  • resource_defs (Optional[Mapping[str, object]]) – Resources that are required by this graph for execution. If not defined, io_manager will default to filesystem.

  • config

    Describes how the job is parameterized at runtime.

    If no value is provided, then the schema for the job’s run config is a standard format based on its ops and resources.

    If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config for the job whenever the job is executed. The values provided will be viewable and editable in the Dagster UI, so be careful with secrets.

    If a RunConfig object is provided, then it will be used directly as the run config for the job whenever the job is executed, similar to providing a dictionary.

    If a ConfigMapping object is provided, then the schema for the job’s run config is determined by the config mapping, which should return configuration in the standard format to configure the job.

    If a PartitionedConfig object is provided, then it defines a discrete set of config values that can parameterize the job, as well as a function for mapping those values to the base config. The values provided will be viewable and editable in the Dagster UI, so be careful with secrets.

  • tags (Optional[Mapping[str, object]]) – A set of key-value tags that annotate the job and can be used for searching and filtering in the UI. Values that are not already strings will be serialized as JSON. If run_tags is not set, then the content of tags will also be automatically appended to the tags of any runs of this job.

  • run_tags (Optional[Mapping[str, object]]) – A set of key-value tags that will be automatically attached to runs launched by this job. Values that are not already strings will be serialized as JSON. These tag values may be overwritten by tag values provided at invocation time. If run_tags is set, then tags are not automatically appended to the tags of any runs of this job.

  • metadata (Optional[Dict[str, RawMetadataValue]]) – Arbitrary information that will be attached to the JobDefinition and be viewable in the Dagster UI. Keys must be strings, and values must be Python primitive types or one of the provided MetadataValue types.

  • logger_defs (Optional[Dict[str, LoggerDefinition]]) – A dictionary of string logger identifiers to their implementations.

  • executor_def (Optional[ExecutorDefinition]) – How this Job will be executed. Defaults to multiprocess_executor.

  • op_retry_policy (Optional[RetryPolicy]) – The default retry policy for all ops in this job. Only used if retry policy is not defined on the op definition or op invocation.

  • partitions_def (Optional[PartitionsDefinition]) – Defines a discrete set of partition keys that can parameterize the job. If this argument is supplied, the config argument can’t also be supplied.

  • input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of a job.

Examples

@op
def return_one():
    return 1

@op
def add_one(in1):
    return in1 + 1

@job
def job1():
    add_one(return_one())

class dagster.JobDefinition(*, graph_def, resource_defs=None, executor_def=None, logger_defs=None, name=None, config=None, description=None, partitions_def=None, tags=None, run_tags=None, metadata=None, hook_defs=None, op_retry_policy=None, _subset_selection_data=None, asset_layer=None, input_values=None, _was_explicitly_provided_resources=None)

Defines a Dagster job.

execute_in_process(run_config=None, instance=None, partition_key=None, raise_on_error=True, op_selection=None, asset_selection=None, run_id=None, input_values=None, tags=None, resources=None)

Execute the Job in-process, gathering results in-memory.

The executor_def on the Job will be ignored, and replaced with the in-process executor. If using the default io_manager, it will switch from filesystem to in-memory.

Parameters:
  • run_config (Optional[Mapping[str, Any]]) – The configuration for the run.

  • instance (Optional[DagsterInstance]) – The instance to execute against. An ephemeral one will be used if none is provided.

  • partition_key (Optional[str]) – The string partition key that specifies the run config to execute. Can only be used to select run config for jobs with partitioned config.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True.

  • op_selection (Optional[Sequence[str]]) – A list of op selection queries (including single op names) to execute. For example:

    • ['some_op']: selects some_op itself.

    • ['*some_op']: selects some_op and all its ancestors (upstream dependencies).

    • ['*some_op+++']: selects some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_op', 'other_op_a', 'other_op_b+']: selects some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.

  • input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of the job. Input values provided here will override input values that have been provided to the job directly.

  • resources (Optional[Mapping[str, Any]]) – The resources needed if any are required. Can provide resource instances directly, or resource definitions.

Returns:

ExecuteInProcessResult

run_request_for_partition(partition_key, run_key=None, tags=None, asset_selection=None, run_config=None, current_time=None, dynamic_partitions_store=None)

Deprecated: This API will be removed in version 2.0.0. Directly instantiate RunRequest(partition_key=...) instead.

Creates a RunRequest object for a run that processes the given partition.

Parameters:
  • partition_key – The key of the partition to request a run for.

  • run_key (Optional[str]) – A string key to identify this launched run. For sensors, ensures that only one run is created per run key across all sensor evaluations. For schedules, ensures that one run is created per tick, across failure recoveries. Passing in a None value means that a run will always be launched per evaluation.

  • tags (Optional[Dict[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the launched run.

  • run_config (Optional[Mapping[str, Any]]) – Configuration for the run. If the job has a PartitionedConfig, this value will replace the config provided by it.

  • current_time (Optional[datetime]) – Used to determine which time-partitions exist. Defaults to now.

  • dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore object that is responsible for fetching dynamic partitions. Required when the partitions definition is a DynamicPartitionsDefinition with a name defined. Users can pass the DagsterInstance fetched via context.instance to this argument.

Returns:

An object that requests a run to process the given partition.

Return type:

RunRequest

with_hooks(hook_defs)

Apply a set of hooks to all op instances within the job.

with_top_level_resources(resource_defs)

Apply a set of resources to all op instances within the job.

property config_mapping

The config mapping for the job, if it has one.

A config mapping defines a way to map a top-level config schema to run config for the job.

property executor_def

Returns the default ExecutorDefinition for the job.

If the user has not specified an executor definition, then this will default to the multi_or_in_process_executor(). If a default is specified on the Definitions object the job was provided to, then that will be used instead.

property has_specified_executor

Returns True if this job has explicitly specified an executor, and False if the executor was inherited through defaults or the Definitions object the job was provided to.

property has_specified_loggers

Returns True if the job explicitly set loggers, and False if loggers were inherited through defaults or the Definitions object the job was provided to.

property loggers

Returns the set of LoggerDefinition objects specified on the job.

If the user has not specified a mapping of LoggerDefinition objects, then this will default to the colored_console_logger() under the key console. If a default is specified on the Definitions object the job was provided to, then that will be used instead.

property partitioned_config

The partitioned config for the job, if it has one.

A partitioned config defines a way to map partition keys to run config for the job.

property partitions_def

Returns the PartitionsDefinition for the job, if it has one.

A partitions definition defines the set of partition keys the job operates on.

property resource_defs

Returns the set of ResourceDefinition objects specified on the job.

This may not be the complete set of resources required by the job, since those can also be provided on the Definitions object the job may be provided to.

Reconstructable jobs

class dagster.reconstructable(target)

Create a ReconstructableJob from a function that returns a JobDefinition, or a function decorated with @job.

When your job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the job on the other side of the process boundary.

Passing a job created with GraphDefinition.to_job() to reconstructable() requires you to wrap that job’s definition in a module-scoped function, and pass that function instead:

from dagster import graph, reconstructable

@graph
def my_graph():
    ...

def define_my_job():
    return my_graph.to_job()

reconstructable(define_my_job)

This function implements a very conservative strategy for reconstruction, so that its behavior is easy to predict. As a consequence, it cannot reconstruct certain kinds of jobs, such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.

If you need to reconstruct objects constructed in these ways, you should use build_reconstructable_job() instead, which allows you to specify your own reconstruction strategy.

Examples

from dagster import graph, job, reconstructable

@job
def foo_job():
    ...

reconstructable_foo_job = reconstructable(foo_job)


@graph
def foo():
    ...

def make_bar_job():
    return foo.to_job()

reconstructable_bar_job = reconstructable(make_bar_job)

dagster.build_reconstructable_job(reconstructor_module_name, reconstructor_function_name, reconstructable_args=None, reconstructable_kwargs=None, reconstructor_working_directory=None)

Experimental: This API may break in future versions, even between dot releases.

Create a dagster._core.definitions.reconstructable.ReconstructableJob.

When your job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the job on the other side of the process boundary.

This function allows you to use the strategy of your choice for reconstructing jobs, so that you can reconstruct certain kinds of jobs that are not supported by reconstructable(), such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.

If you need to reconstruct jobs constructed in these ways, use this function instead of reconstructable().

Parameters:
  • reconstructor_module_name (str) – The name of the module containing the function to use to reconstruct the job.

  • reconstructor_function_name (str) – The name of the function to use to reconstruct the job.

  • reconstructable_args (Tuple) – Args to the function to use to reconstruct the job. Values of the tuple must be JSON serializable.

  • reconstructable_kwargs (Dict[str, Any]) – Kwargs to the function to use to reconstruct the job. Values of the dict must be JSON serializable.

Examples

# module: mymodule

from dagster import JobDefinition, job, build_reconstructable_job

class JobFactory:
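    def make_job(self, *args, **kwargs):

        @job
        def _job():
            ...

        return _job

# Module-scoped function that rebuilds the job from the serialized args and kwargs.
def reconstruct_job(*args, **kwargs):
    factory = JobFactory()
    return factory.make_job(*args, **kwargs)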

factory = JobFactory()

foo_job_args = (...,...)

foo_job_kwargs = {...:...}

foo_job = factory.make_job(*foo_job_args, **foo_job_kwargs)

reconstructable_foo_job = build_reconstructable_job(
    'mymodule',
    'reconstruct_job',
    foo_job_args,
    foo_job_kwargs,
)