Skip to main content

OutputDefinition

Defines an output from an op’s compute function. Ops can have multiple outputs, in which case outputs cannot be anonymous. Many ops have only one output, which is given the default name “result”.

Class Definition

class OutputDefinition:
    def __init__(
        self,
        dagster_type=None,
        name: str | None = None,
        description: str | None = None,
        is_required: bool = True,
        io_manager_key: str | None = None,
        metadata: ArbitraryMetadataMapping | None = None,
        code_version: str | None = None,
    )
Defined in: dagster._core.definitions.output

Parameters

dagster_type
Type | DagsterType | None
The type of this output. Users should provide the Python type of objects they expect the op to yield, or a DagsterType for runtime validation. Defaults to Any.
from dagster import OutputDefinition, op

@op(out=OutputDefinition(dagster_type=int))
def my_op():
    return 42
name
str | None
default:"result"
Name of the output. Defaults to “result” for single outputs.
from dagster import OutputDefinition, op

@op(out=OutputDefinition(name="my_output"))
def my_op():
    return "data"
description
str | None
Human-readable description of the output.
OutputDefinition(
    description="Processed customer records from the ETL pipeline"
)
is_required
bool
default:"True"
Whether the presence of this output is required.
# Optional output
OutputDefinition(is_required=False)
io_manager_key
str | None
default:"io_manager"
The resource key of the IOManager used for storing this output and loading it in downstream steps.
from dagster import OutputDefinition

OutputDefinition(io_manager_key="s3_io_manager")
metadata
Dict[str, Any] | None
Static metadata for the output. For example, file paths or database table information.
OutputDefinition(
    metadata={
        "table": "customers",
        "schema": "public"
    }
)
code_version
str | None
Version of the code that generates this output. Versions should be set only for code that deterministically produces the same output given the same inputs.
OutputDefinition(code_version="1.0.0")

Properties

name
str
The name of the output.
dagster_type
DagsterType
The Dagster type of the output.
description
str | None
The description of the output.
is_required
bool
Whether the output is required.
optional
bool
Whether the output is optional (inverse of is_required).
io_manager_key
str
The IO manager key for this output.
metadata
ArbitraryMetadataMapping
The metadata attached to this output.
is_dynamic
bool
Whether this is a dynamic output. Returns False for regular outputs.

Methods

mapping_from

def mapping_from(
    self,
    node_name: str,
    output_name: str | None = None,
    from_dynamic_mapping: bool = False
) -> OutputMapping
Create an output mapping from an output of a child node. Used in graph definitions.
node_name
str
required
The name of the child node from which to map this output.
output_name
str | None
The name of the child node’s output. Defaults to “result”.
from dagster import OutputDefinition

output_mapping = OutputDefinition(int).mapping_from('child_node')

Out

A more concise way to define outputs using the @op decorator. This is the recommended approach for defining outputs.

Class Definition

class Out(NamedTuple):
    dagster_type: DagsterType | type[NoValueSentinel]
    description: str | None
    is_required: bool
    io_manager_key: str
    metadata: RawMetadataMapping | None
    code_version: str | None
Defined in: dagster._core.definitions.output

Constructor

def __new__(
    cls,
    dagster_type: type | DagsterType | None = NoValueSentinel,
    description: str | None = None,
    is_required: bool = True,
    io_manager_key: str | None = None,
    metadata: ArbitraryMetadataMapping | None = None,
    code_version: str | None = None,
)

Parameters

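Parameters match those of OutputDefinition, except that Out takes no name: a single output is named “result”, and multiple outputs are named by the keys of the dict passed to @op(out=...). Out is designed for use with the decorator syntax.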
dagster_type
Type | DagsterType | None
The type of this output. Only set if the correct type cannot be inferred from the function signature.
description
str | None
Human-readable description of the output.
is_required
bool
default:"True"
Whether this output is required.
io_manager_key
str | None
default:"io_manager"
The resource key of the IOManager used for storing this output and loading it in downstream steps.
metadata
Dict[str, Any] | None
Metadata for the output.
code_version
str | None
Version of the code that generates this output.

Examples

Single Output

from dagster import op, Out

@op(out=Out(dagster_type=int, description="The computed value"))
def compute_value() -> int:
    return 42

Multiple Outputs

from dagster import op, Out
from typing import Tuple

@op(
    out={
        "sum": Out(int, description="Sum of inputs"),
        "product": Out(int, description="Product of inputs"),
    }
)
def calculate(a: int, b: int) -> Tuple[int, int]:
    """Return the sum and the product of `a` and `b`, in the order the outputs are declared."""
    total = a + b
    product = a * b
    return total, product

With IO Manager

from dagster import op, Out

@op(out=Out(io_manager_key="s3_io_manager"))
def store_in_s3():
    return large_dataframe

With Metadata

from dagster import op, Out

@op(
    out=Out(
        metadata={
            "table": "customers",
            "partition_key": "date"
        }
    )
)
def load_customers():
    return customer_data

Optional Output

from dagster import op, Out, Output

# A non-required output is only skipped when nothing is yielded for it.
# Returning None would still emit an output whose value is None, and
# downstream ops would run with that value.
@op(out=Out(str, is_required=False))
def maybe_output(condition: bool):
    """Emit "data" when `condition` is true; otherwise emit no output at all."""
    if condition:
        yield Output("data")

With Type Inference

from dagster import op, Out
import pandas as pd

# Type is inferred from return annotation
@op(out=Out(description="Customer dataframe"))
def load_data() -> pd.DataFrame:
    return pd.read_csv("customers.csv")

DynamicOutputDefinition

Variant of OutputDefinition for outputs that will dynamically alter the graph at runtime.

Class Definition

class DynamicOutputDefinition(OutputDefinition):
    # Inherits constructor from OutputDefinition
    pass
Defined in: dagster._core.definitions.output

Usage

When used in a composition function such as @job, a dynamic output must be consumed with one of:
  • map - Clone downstream nodes for each DynamicOutput
  • collect - Gather all DynamicOutput values into a list

Example

import os
from dagster import op, job, DynamicOut, DynamicOutput

@op(out=DynamicOut(str))
def files_in_directory(context):
    """Yield one DynamicOutput per file found directly inside the configured "path"."""
    # Only the first entry produced by os.walk is consumed, i.e. the top-level directory.
    root, _subdirs, names = next(os.walk(context.op_config["path"]))
    for name in names:
        key = name.replace(".", "_")
        full_path = os.path.join(root, name)
        yield DynamicOutput(full_path, mapping_key=key)

@op
def process_file(file_path: str) -> int:
    """Return the number of characters in the file at `file_path`."""
    # Use a context manager so the file handle is closed as soon as it is read.
    with open(file_path) as file:
        return len(file.read())

@op
def summarize_directory(file_sizes: list[int]) -> int:
    return sum(file_sizes)

@job
def process_directory():
    """Fan out over every file in the directory, then fan the results back in."""
    # map: run process_file once per dynamic output
    per_file_sizes = files_in_directory().map(process_file)
    # collect: gather the mapped results into a single list input
    summarize_directory(per_file_sizes.collect())

DynamicOut

Variant of Out for dynamic outputs. Used with the decorator syntax.

Class Definition

class DynamicOut(Out):
    # Inherits constructor from Out
    pass
Defined in: dagster._core.definitions.output

Example

from dagster import op, job, DynamicOut, DynamicOutput

@op(
    config_schema={"items": [str]},
    out=DynamicOut(str)
)
def generate_items(context):
    """Yield one DynamicOutput per string in the "items" list of the op config."""
    # Mapping keys may only contain letters, digits, and underscores and must be
    # unique, so derive them from the position instead of the raw item text.
    for index, item in enumerate(context.op_config["items"]):
        yield DynamicOutput(item, mapping_key=f"item_{index}")

@op
def process_item(item: str) -> str:
    return item.upper()

@op
def combine_items(items: list[str]) -> str:
    return ", ".join(items)

@job
def dynamic_job():
    """Process each generated item separately, then combine the collected results."""
    combine_items(generate_items().map(process_item).collect())

OutputMapping

Defines an output mapping for a graph, connecting an output from a child node to a graph output.

Class Definition

class OutputMapping(NamedTuple):
    graph_output_name: str
    mapped_node_name: str
    mapped_node_output_name: str
    graph_output_description: str | None = None
    dagster_type: DagsterType | None = None
    from_dynamic_mapping: bool = False
Defined in: dagster._core.definitions.output

Parameters

graph_output_name
str
required
Name of the output in the graph being mapped to.
mapped_node_name
str
required
Name of the node (op/graph) that the output is being mapped from.
mapped_node_output_name
str
required
Name of the output in the node being mapped from.
graph_output_description
str | None
Description of the graph output.
from_dynamic_mapping
bool
default:"False"
Set to true if the node being mapped to is a mapped dynamic node.
dagster_type
DagsterType | None
The dagster type of the graph’s output.

Example

from dagster import OutputMapping, GraphDefinition, op, graph, GraphOut

@op
def emit_five():
    return 5

# Using OutputMapping explicitly
GraphDefinition(
    name="the_graph",
    node_defs=[emit_five],
    output_mappings=[
        OutputMapping(
            graph_output_name="result",
            mapped_node_name="emit_five",
            mapped_node_output_name="result"
        )
    ]
)

# Equivalent using decorator
@graph(out=GraphOut())
def the_graph():
    return emit_five()

GraphOut

Represents information about outputs that a graph maps. Used with the @graph decorator.

Class Definition

class GraphOut(NamedTuple):
    description: str | None
Defined in: dagster._core.definitions.output

Constructor

def __new__(cls, description: str | None = None)

Parameter

description
str | None
Human-readable description of the output.

Example

from dagster import graph, op, GraphOut

@op
def op_a() -> int:
    return 1

@op
def op_b() -> int:
    return 2

@op
def add_values(a: int, b: int) -> int:
    """Return the sum of the two upstream values."""
    return a + b

# Inside a @graph body, calling an op only wires up dependencies and returns an
# output handle, not an int, so the addition has to happen in an op.
@graph(out=GraphOut(description="Sum of op_a and op_b"))
def my_graph() -> int:
    return add_values(op_a(), op_b())

Common Patterns

Single vs Multiple Outputs

from dagster import op, Out

# Single output (implicit)
@op
def single_output():
    return 42

# Single output (explicit)
@op(out=Out(int))
def explicit_single():
    return 42

# Multiple outputs
@op(out={"a": Out(int), "b": Out(str)})
def multiple_outputs():
    return 42, "hello"

Type Safety

from dagster import op, Out
import pandas as pd

# Type inferred from annotation
@op
def load_dataframe() -> pd.DataFrame:
    return pd.DataFrame({"col": [1, 2, 3]})

# Explicit type
@op(out=Out(dagster_type=pd.DataFrame))
def load_dataframe_explicit():
    return pd.DataFrame({"col": [1, 2, 3]})

Custom IO Managers

from dagster import op, Out, asset, job, Definitions
from dagster import ConfigurableIOManager

class MyIOManager(ConfigurableIOManager):
    """Skeleton IO manager; fill in the storage and loading logic."""

    def handle_output(self, context, obj):
        # Custom storage logic
        pass

    def load_input(self, context):
        # Custom loading logic
        pass

@op(out=Out(io_manager_key="my_io_manager"))
def my_op():
    return "data"

# Definitions does not accept ops directly; an op is exposed by placing it in a job.
@job
def my_job():
    my_op()

defs = Definitions(
    jobs=[my_job],
    resources={"my_io_manager": MyIOManager()}
)

Output Metadata

from dagster import op, Out

@op(
    out=Out(
        metadata={
            "format": "parquet",
            "compression": "snappy",
            "schema_version": "2.0"
        }
    )
)
def save_data():
    return dataframe