The @job decorator creates an executable job from a function that invokes ops or graphs. Jobs represent the full execution plan for a set of operations.

Signature

@job(
    name: Optional[str] = None,
    description: Optional[str] = None,
    resource_defs: Optional[Mapping[str, object]] = None,
    config: Optional[Union[ConfigMapping, Mapping[str, Any], RunConfig, PartitionedConfig]] = None,
    tags: Optional[Mapping[str, str]] = None,
    run_tags: Optional[Mapping[str, str]] = None,
    metadata: Optional[Mapping[str, RawMetadataValue]] = None,
    logger_defs: Optional[Mapping[str, LoggerDefinition]] = None,
    executor_def: Optional[ExecutorDefinition] = None,
    hooks: Optional[AbstractSet[HookDefinition]] = None,
    op_retry_policy: Optional[RetryPolicy] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    input_values: Optional[Mapping[str, object]] = None,
    owners: Optional[Sequence[str]] = None,
) -> JobDefinition

Parameters

name
Optional[str]
The name for the job. Defaults to the name of the decorated function.
description
Optional[str]
A human-readable description of the job.
resource_defs
Optional[Mapping[str, object]]
Resources that are required by this job for execution. If not defined, io_manager will default to the filesystem-based IO manager.
config
Optional[Union[ConfigMapping, Mapping[str, Any], RunConfig, PartitionedConfig]]
Describes how the job is parameterized at runtime.
  • If no value is provided, the schema is based on ops and resources.
  • If a dictionary is provided, it will be used as the job’s default run config.
  • If a RunConfig object is provided, it will be used directly as the run config.
  • If a ConfigMapping object is provided, it defines the config schema and mapping.
  • If a PartitionedConfig object is provided, it defines a discrete set of config values that can parameterize the job.
tags
Optional[Mapping[str, str]]
A set of key-value tags that annotate the job and can be used for searching and filtering in the UI. If run_tags is not set, then the content of tags will also be automatically appended to the tags of any runs of this job.
run_tags
Optional[Mapping[str, str]]
A set of key-value tags that will be automatically attached to runs launched by this job. These tag values may be overwritten by tag values provided at invocation time. If run_tags is set, then tags are not automatically appended to runs.
metadata
Optional[Mapping[str, RawMetadataValue]]
Arbitrary information that will be attached to the JobDefinition and be viewable in the Dagster UI. Keys must be strings, and values must be Python primitive types or provided MetadataValue types.
logger_defs
Optional[Mapping[str, LoggerDefinition]]
A dictionary of string logger identifiers to their implementations.
executor_def
Optional[ExecutorDefinition]
How this job will be executed. Defaults to multiprocess_executor.
hooks
Optional[AbstractSet[HookDefinition]]
A set of hooks to attach to the job.
op_retry_policy
Optional[RetryPolicy]
The default retry policy for all ops in this job. Only used if retry policy is not defined on the op definition or op invocation.
partitions_def
Optional[PartitionsDefinition]
Defines a discrete set of partition keys that can parameterize the job. If this argument is supplied, the config argument can’t also be supplied.
input_values
Optional[Mapping[str, object]]
A dictionary that maps the names of the job’s top-level inputs to the Python objects that will be passed for them.
owners
Optional[Sequence[str]]
A sequence of strings identifying the owners of the job.

Returns

Type: JobDefinition

The job definition created from the decorated function.

Examples

Basic Job

from dagster import job, op

@op
def return_one():
    return 1

@op
def add_one(x: int) -> int:
    return x + 1

@job
def my_job():
    add_one(return_one())

Job with Resources

from dagster import job, op, OpExecutionContext, resource

@resource
def database_resource():
    # DatabaseConnection is a placeholder for your own connection class.
    return DatabaseConnection()

@op(required_resource_keys={"database"})
def query_database(context: OpExecutionContext):
    return context.resources.database.query("SELECT * FROM users")

@job(resource_defs={"database": database_resource})
def database_job():
    query_database()

Job with Configuration

from dagster import Config, RunConfig, job, op

class ConfigurableOpConfig(Config):
    value: int

@op
def configurable_op(config: ConfigurableOpConfig):
    return config.value

@job(
    config=RunConfig(
        ops={"configurable_op": ConfigurableOpConfig(value=42)}
    )
)
def configured_job():
    configurable_op()

Partitioned Job

from dagster import job, op, DailyPartitionsDefinition, OpExecutionContext

@op
def process_partition(context: OpExecutionContext):
    partition_date = context.partition_key
    return f"Processing {partition_date}"

@job(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01")
)
def partitioned_job():
    process_partition()

Job with Retry Policy

from dagster import job, op, RetryPolicy, Backoff

@op
def flaky_op():
    # call_external_api is a placeholder for a call that may fail transiently.
    result = call_external_api()
    return result

@job(
    op_retry_policy=RetryPolicy(
        max_retries=3,
        delay=2,
        backoff=Backoff.EXPONENTIAL,
    )
)
def resilient_job():
    flaky_op()

Job with Tags and Metadata

from dagster import job, op, MetadataValue

@op
def my_op():
    return "result"

@job(
    tags={"team": "data-platform", "priority": "high"},
    metadata={
        "owner": "data-team@company.com",
        "slack_channel": MetadataValue.url("https://slack.com/archives/C123"),
    },
    owners=["team:data-platform"],
)
def tagged_job():
    my_op()

Job with Custom Executor

from dagster import job, op, in_process_executor

@op
def my_op():
    return 42

@job(executor_def=in_process_executor)
def in_process_job():
    my_op()

See Also
  • @op - Define operations
  • @graph - Compose ops into reusable graphs
  • Partitions - Define partitions for jobs
  • Resources - Configure external services