Skip to main content

materialize

Executes a single-threaded, in-process run which materializes provided assets. By default, materializes assets to the local filesystem.

Function Signature

def materialize(
    assets: Sequence[AssetsDefinition | AssetSpec | SourceAsset],
    run_config: Any = None,
    instance: DagsterInstance | None = None,
    resources: Mapping[str, object] | None = None,
    partition_key: str | None = None,
    raise_on_error: bool = True,
    tags: Mapping[str, str] | None = None,
    selection: CoercibleToAssetSelection | None = None,
) -> ExecuteInProcessResult
Defined in: dagster._core.definitions.materialize

Parameters

assets
Sequence[AssetsDefinition | AssetSpec | SourceAsset]
required
The assets to materialize. Unless you’re using deps or non_argument_deps, you must include all upstream assets. This is because upstream asset definitions contain information needed to load their contents when materializing downstream assets. Use the selection argument to distinguish between assets you want to materialize and assets present only for loading.
from dagster import asset

@asset
def upstream_asset():
    return [1, 2, 3]

@asset
def downstream_asset(upstream_asset):
    return sum(upstream_asset)

# Must include both assets
materialize([upstream_asset, downstream_asset])
run_config
Any
The run config to use for the run that materializes the assets. Used to provide configuration to ops and resources.
from dagster import asset, Config

class MyConfig(Config):
    threshold: int

@asset
def configured_asset(config: MyConfig):
    return config.threshold * 2

materialize(
    [configured_asset],
    run_config={
        "ops": {
            "configured_asset": {
                "config": {"threshold": 10}
            }
        }
    }
)
instance
DagsterInstance | None
The Dagster instance to use for execution. If not provided, a temporary instance is created.
from dagster import DagsterInstance

with DagsterInstance.get() as instance:
    result = materialize([my_asset], instance=instance)
resources
Mapping[str, object] | None
Resources needed for execution. Can provide resource instances directly or resource definitions. If provided resources conflict with resources directly on assets, an error is thrown.
from dagster import asset, ConfigurableResource

class DatabaseResource(ConfigurableResource):
    connection_string: str

@asset
def db_asset(database: DatabaseResource):
    return database.connection_string

materialize(
    [db_asset],
    resources={
        "database": DatabaseResource(connection_string="postgresql://localhost")
    }
)
partition_key
str | None
The partition key that specifies the run config to execute. Can only be used to select run config for assets with partitioned config.
from dagster import asset, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition("2024-01-01"))
def partitioned_asset(context):
    partition = context.partition_key
    return f"Data for {partition}"

# Materialize specific partition
result = materialize(
    [partitioned_asset],
    partition_key="2024-01-15"
)
raise_on_error
bool
default: True
Whether to raise an exception if the run fails. If False, returns a result object with failure information.
result = materialize([my_asset], raise_on_error=False)
if not result.success:
    print(f"Run failed: {result.failure_data}")
tags
Mapping[str, str] | None
Tags for the run. Tags are metadata that can be used for filtering and organization.
materialize(
    [my_asset],
    tags={
        "environment": "dev",
        "team": "data-eng"
    }
)
selection
CoercibleToAssetSelection | None
A sub-selection of assets to materialize. If not provided, all assets will be materialized. Can be:
  • A string using asset selection syntax
  • A sequence of strings
  • A sequence of AssetKeys
  • A sequence of AssetsDefinition or SourceAsset objects
  • An AssetSelection object
# Materialize only downstream_asset, loading from upstream_asset
materialize(
    [upstream_asset, downstream_asset],
    selection=[downstream_asset]
)

# Using selection syntax
materialize(
    [asset1, asset2, asset3],
    selection="asset1+"  # asset1 and its downstream dependencies
)

Returns

result
ExecuteInProcessResult
The result of the execution, containing:
  • success - Whether the run succeeded
  • output_for_node(node_name) - Retrieve output value for a specific node
  • asset_materializations_for_node(node_name) - Get materialization events
  • all_events - All events that occurred during execution
  • run_id - The unique identifier for this run
result = materialize([my_asset])

# Check success
assert result.success

# Get output value
output_value = result.output_for_node("my_asset")

# Get materialization metadata
materializations = result.asset_materializations_for_node("my_asset")
for mat in materializations:
    print(mat.metadata)

Examples

Basic Usage

from dagster import asset, materialize

@asset
def asset1():
    return "data"

@asset
def asset2(asset1):
    return asset1.upper()

# Execute both assets
result = materialize([asset1, asset2])
assert result.success

With Configuration

from dagster import asset, materialize, Config

class AssetConfig(Config):
    api_key: str
    retries: int = 3

@asset
def configured_asset(config: AssetConfig):
    return f"API Key: {config.api_key}, Retries: {config.retries}"

result = materialize(
    [configured_asset],
    run_config={
        "ops": {
            "configured_asset": {
                "config": {
                    "api_key": "secret_key",
                    "retries": 5
                }
            }
        }
    }
)

With Resources

from dagster import asset, materialize, ConfigurableResource

class APIResource(ConfigurableResource):
    base_url: str
    timeout: int = 30

@asset
def api_asset(api: APIResource):
    return f"Fetching from {api.base_url}"

result = materialize(
    [api_asset],
    resources={
        "api": APIResource(
            base_url="https://api.example.com",
            timeout=60
        )
    }
)

With Partitions

from dagster import asset, materialize, DailyPartitionsDefinition

partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=partitions_def)
def daily_asset(context):
    partition = context.partition_key
    return f"Data for {partition}"

# Materialize a specific partition
result = materialize(
    [daily_asset],
    partition_key="2024-01-15"
)

With Selection

from dagster import asset, materialize

@asset
def raw_data():
    return [1, 2, 3, 4, 5]

@asset
def processed_data(raw_data):
    return [x * 2 for x in raw_data]

@asset
def aggregated_data(processed_data):
    return sum(processed_data)

# Materialize only processed_data and aggregated_data
# raw_data is included for loading but not materialized
result = materialize(
    [raw_data, processed_data, aggregated_data],
    selection=[processed_data, aggregated_data]
)

Testing Asset Outputs

from dagster import asset, materialize
import pytest

@asset
def compute_metrics():
    return {"accuracy": 0.95, "precision": 0.92}

def test_compute_metrics():
    result = materialize([compute_metrics])
    
    # Test execution succeeded
    assert result.success
    
    # Test output values
    metrics = result.output_for_node("compute_metrics")
    assert metrics["accuracy"] > 0.9
    assert metrics["precision"] > 0.9

Multi-Asset Example

from dagster import multi_asset, asset, AssetOut, materialize

@multi_asset(
    outs={
        "customers": AssetOut(),
        "orders": AssetOut(),
    }
)
def extract_data():
    customers = fetch_customers()
    orders = fetch_orders()
    return customers, orders

@asset
def customer_metrics(customers):
    return calculate_metrics(customers)

result = materialize([extract_data, customer_metrics])

Common Use Cases

Use materialize to test assets locally before deploying:
from dagster import asset, materialize

@asset
def my_asset():
    # Asset logic
    return data

if __name__ == "__main__":
    # Test locally
    result = materialize([my_asset])
    print(f"Success: {result.success}")
Test asset logic in unit tests:
import pytest
from dagster import asset, materialize

@asset
def data_processor(context):
    # Process data
    return processed_data

def test_data_processor():
    result = materialize([data_processor])
    assert result.success
    output = result.output_for_node("data_processor")
    assert len(output) > 0
Quickly iterate on asset logic in notebooks or scripts:
from dagster import asset, materialize

@asset
def experimental_feature():
    # Try new logic
    return new_approach()

# Run immediately to see results
result = materialize([experimental_feature])

Comparison with materialize_to_memory

materialize persists assets to the configured IO manager (filesystem by default), while materialize_to_memory keeps everything in memory. Use materialize when:
  • Testing end-to-end workflows including IO
  • Debugging IO manager behavior
  • Needing to inspect persisted artifacts
Use materialize_to_memory when:
  • Running fast unit tests
  • Testing pure computation logic
  • Not concerned with IO behavior

materialize_to_memory

Executes a single-threaded, in-process run which materializes assets in memory using the mem_io_manager.

Function Signature

def materialize_to_memory(
    assets: Sequence[AssetsDefinition | AssetSpec | SourceAsset],
    run_config: Any = None,
    instance: DagsterInstance | None = None,
    resources: Mapping[str, object] | None = None,
    partition_key: str | None = None,
    raise_on_error: bool = True,
    tags: Mapping[str, str] | None = None,
    selection: CoercibleToAssetSelection | None = None,
) -> ExecuteInProcessResult
Defined in: dagster._core.definitions.materialize

Parameters

Parameters are identical to materialize, with one key difference:
Important: materialize_to_memory will explicitly use mem_io_manager for all required IO manager keys. If any IO managers are provided via the resources argument, a DagsterInvariantViolationError is thrown.

Returns

result
ExecuteInProcessResult
Same as materialize - the result of execution with access to outputs and events.

Examples

Basic In-Memory Execution

from dagster import asset, materialize_to_memory

@asset
def data_asset():
    return [1, 2, 3, 4, 5]

@asset
def processed_asset(data_asset):
    return [x * 2 for x in data_asset]

# Materialize in memory - no disk IO
result = materialize_to_memory([data_asset, processed_asset])

# Access the actual Python objects
data = result.output_for_node("data_asset")
processed = result.output_for_node("processed_asset")

assert processed == [2, 4, 6, 8, 10]

Fast Unit Tests

from dagster import asset, materialize_to_memory
import pytest

@asset
def data():
    return [1, 2, 3, 4, 5]

@asset
def calculate_statistics(data):
    return {
        "mean": sum(data) / len(data),
        "max": max(data),
        "min": min(data)
    }

def test_calculate_statistics():
    # Fast in-memory test — include the upstream asset so its value can be loaded
    result = materialize_to_memory([data, calculate_statistics])
    
    stats = result.output_for_node("calculate_statistics")
    assert "mean" in stats
    assert "max" in stats
    assert "min" in stats

Testing Pure Logic

from dagster import asset, materialize_to_memory

@asset
def transform_data():
    raw_data = [1, 2, 3, 4, 5]
    return [x ** 2 for x in raw_data]

@asset
def filter_data(transform_data):
    return [x for x in transform_data if x > 10]

def test_data_pipeline():
    result = materialize_to_memory([transform_data, filter_data])
    
    transformed = result.output_for_node("transform_data")
    assert transformed == [1, 4, 9, 16, 25]
    
    filtered = result.output_for_node("filter_data")
    assert filtered == [16, 25]

With Asset Dependencies

from dagster import asset, materialize_to_memory

@asset
def upstream():
    return {"key": "value"}

@asset
def middle(upstream):
    return {**upstream, "processed": True}

@asset
def downstream(middle):
    return middle["key"].upper()

# All intermediate values stay in memory
result = materialize_to_memory([upstream, middle, downstream])

# Can access all outputs
assert result.output_for_node("upstream") == {"key": "value"}
assert result.output_for_node("middle") == {"key": "value", "processed": True}
assert result.output_for_node("downstream") == "VALUE"

Error Handling

from dagster import asset, materialize_to_memory, DagsterInvariantViolationError
from dagster import ConfigurableIOManager

class MyIOManager(ConfigurableIOManager):
    def load_input(self, context):
        pass
    
    def handle_output(self, context, obj):
        pass

@asset
def my_asset():
    return "data"

# This will raise an error - cannot provide IO managers
try:
    materialize_to_memory(
        [my_asset],
        resources={"io_manager": MyIOManager()}
    )
except DagsterInvariantViolationError as e:
    print("Cannot override io_manager in materialize_to_memory")

When to Use Each Function

| Use Case                 | materialize | materialize_to_memory |
|--------------------------|-------------|-----------------------|
| Unit testing pure logic  | ✓           | ✓ (faster)            |
| Integration testing      | ✓           | ✓                     |
| Testing IO managers      | ✓           | ✗                     |
| Local development        | ✓           | ✓                     |
| Debugging persistence    | ✓           | ✗                     |
| Fast iteration           | ✓           | ✓                     |
| CI/CD tests              | ✓           | ✓                     |