# IO Managers
IO Managers are special resources that handle how data is stored and loaded between assets and ops. They provide a clean separation between your business logic and data persistence, making it easy to:

- **Switch storage backends**: use local files in dev, cloud storage in production
- **Handle different data types**: automatically serialize/deserialize DataFrames, Parquet files, database tables
- **Optimize performance**: implement caching, compression, and partitioned storage
- **Test pipelines**: mock data persistence for unit tests
## Why IO Managers Matter

Without IO managers, each asset would need to handle its own persistence:
```python
# ❌ Without IO Manager - persistence logic mixed with business logic
@asset
def upstream():
    data = compute_data()
    # Manual persistence
    data.to_parquet("/data/upstream.parquet")
    return data

@asset
def downstream():
    # Manual loading
    data = pd.read_parquet("/data/upstream.parquet")
    return process(data)
```
With IO managers, persistence is automatic:
```python
# ✅ With IO Manager - clean separation of concerns
@asset
def upstream():
    return compute_data()  # IO manager handles storage

@asset
def downstream(upstream):  # IO manager handles loading
    return process(upstream)
```
## How IO Managers Work

When an asset or op produces output:

1. The computation function returns a value
2. The IO manager's `handle_output` method stores the value

When an asset or op needs input:

1. The IO manager's `load_input` method loads the value
2. The value is passed to the computation function
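The handshake above can be sketched in plain Python, without any Dagster imports. Everything here (`InMemoryStore`, the asset functions) is a hypothetical stand-in for illustration, not Dagster's actual engine:

```python
class InMemoryStore:
    """Plays the role of an IO manager: a dict keyed by asset name."""
    def __init__(self):
        self._storage = {}

    def handle_output(self, asset_name, obj):
        # Called after a computation returns a value
        self._storage[asset_name] = obj

    def load_input(self, asset_name):
        # Called before a downstream computation runs
        return self._storage[asset_name]

def upstream():
    return [1, 2, 3]

def downstream(upstream_value):
    return sum(upstream_value)

store = InMemoryStore()
store.handle_output("upstream", upstream())        # output side: store the value
result = downstream(store.load_input("upstream"))  # input side: load, then compute
print(result)  # → 6
```

The asset functions never touch storage; only the store object knows where values live, which is exactly the separation the real IO manager interface provides.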
## Default IO Manager

Dagster provides a default IO manager that uses pickle serialization:

```python
from dagster import FilesystemIOManager, job, op

@op
def op_1():
    return 1

@op
def op_2(a):
    return a + 1

@job(resource_defs={"io_manager": FilesystemIOManager()})
def my_job():
    op_2(op_1())
```

The `FilesystemIOManager` stores outputs as pickle files in a local directory.

By default, Dagster uses a filesystem IO manager that stores data in `$DAGSTER_HOME/storage/`. You can override this behavior by providing your own IO manager.
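Conceptually, pickle-based filesystem storage amounts to writing each output to a path derived from a storage root and the output's identifier. A rough plain-Python sketch (the directory layout and helper names are illustrative, not Dagster's exact scheme):

```python
import os
import pickle
import tempfile

def store_output(base_dir, key, obj):
    # Derive a file path from the storage root and the output's key parts
    path = os.path.join(base_dir, *key)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(obj, f)

def load_input(base_dir, key):
    with open(os.path.join(base_dir, *key), "rb") as f:
        return pickle.load(f)

with tempfile.TemporaryDirectory() as root:
    store_output(root, ["my_job", "op_1", "result"], {"rows": 42})
    loaded = load_input(root, ["my_job", "op_1", "result"])

print(loaded)  # → {'rows': 42}
```

Pickle handles arbitrary Python objects, which is why it works as a default, at the cost of being opaque to other tools; the custom IO managers below trade that generality for queryable formats like CSV, Parquet, or database tables.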
## Creating Custom IO Managers

Define custom IO managers by extending `ConfigurableIOManager`:

```python
import dagster as dg

class MyIOManager(dg.ConfigurableIOManager):
    path_prefix: list[str] = []

    def _get_path(self, context) -> str:
        return "/".join(self.path_prefix + context.asset_key.path)

    def handle_output(self, context: dg.OutputContext, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context: dg.InputContext):
        return read_csv(self._get_path(context))
```
## IO Manager Methods

- `handle_output(context, obj)`: called when an asset/op produces output. Store `obj` based on information in `context`.
- `load_input(context)`: called when an asset/op needs input. Load and return data based on information in `context`.
## Context Information

The context objects provide metadata about the asset/op:

### OutputContext

```python
from dagster import OutputContext

def handle_output(self, context: OutputContext, obj):
    # Asset/op metadata
    context.asset_key          # AssetKey for the asset
    context.name               # Output name
    context.metadata           # User-defined metadata

    # Partitioning
    context.has_partition_key  # Whether output is partitioned
    context.partition_key      # The specific partition

    # Type information
    context.dagster_type       # Dagster type of output

    # Add metadata to materialization event
    context.add_output_metadata({"rows": len(obj)})
```

### InputContext

```python
from dagster import InputContext

def load_input(self, context: InputContext):
    # Asset/op metadata
    context.asset_key            # AssetKey for the asset
    context.upstream_output      # Info about upstream output

    # Partitioning
    context.has_partition_key    # Whether input is partitioned
    context.partition_key        # The specific partition
    context.asset_partition_key  # Partition of asset being computed

    # Load data based on context
    if context.upstream_output:
        table_name = context.upstream_output.name
        return read_table(table_name)
```
## Database IO Manager Example

Here's a complete example that stores data in database tables:

```python
import dagster as dg

class DataframeTableIOManager(dg.ConfigurableIOManager):
    def handle_output(self, context: dg.OutputContext, obj):
        # Use output name as table name
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context: dg.InputContext):
        # Load from upstream table
        if context.upstream_output:
            table_name = context.upstream_output.name
            return read_dataframe_from_table(name=table_name)

@dg.job(resource_defs={"io_manager": DataframeTableIOManager()})
def my_job():
    op_2(op_1())
```
IO managers can add metadata to materialization events:

```python
import dagster as dg

class DataframeTableIOManagerWithMetadata(dg.ConfigurableIOManager):
    def handle_output(self, context: dg.OutputContext, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

        # Add metadata visible in the UI
        context.add_output_metadata({
            "num_rows": len(obj),
            "table_name": table_name,
            "columns": list(obj.columns),
        })

    def load_input(self, context: dg.InputContext):
        if context.upstream_output:
            table_name = context.upstream_output.name
            return read_dataframe_from_table(name=table_name)
```

Metadata appears in the Dagster UI on the materialization event, helping you understand what was produced.
## Partitioned IO Managers

IO managers can handle partitioned assets by checking partition keys:

```python
import dagster as dg

class MyPartitionedIOManager(dg.IOManager):
    def _get_path(self, context) -> str:
        if context.has_partition_key:
            # Store each partition in a separate file
            return "/".join(context.asset_key.path + [context.asset_partition_key])
        else:
            return "/".join(context.asset_key.path)

    def handle_output(self, context: dg.OutputContext, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context: dg.InputContext):
        return read_csv(self._get_path(context))
```
For backfills that process multiple partitions in a single run:

```python
import dagster as dg

class MyIOManager(dg.IOManager):
    def load_input(self, context: dg.InputContext):
        # Get time window covering all partitions being processed
        start_datetime, end_datetime = context.asset_partitions_time_window
        return read_data_in_datetime_range(start_datetime, end_datetime)

    def handle_output(self, context: dg.OutputContext, obj):
        start_datetime, end_datetime = context.asset_partitions_time_window
        return overwrite_data_in_datetime_range(start_datetime, end_datetime, obj)

daily_partition = dg.DailyPartitionsDefinition(start_date="2020-01-01")

@dg.asset(
    partitions_def=daily_partition,
    backfill_policy=dg.BackfillPolicy.single_run(),
)
def events(context: dg.AssetExecutionContext, raw_events):
    return compute_events_from_raw_events(raw_events)
```
## IO Manager Factory Pattern

For IO managers with complex initialization, use the factory pattern:

```python
import dagster as dg

class ExternalIOManager(dg.IOManager):
    def __init__(self, api_token):
        self._api_token = api_token
        self._cache = {}  # Stateful cache

    def handle_output(self, context: dg.OutputContext, obj):
        # Store via external API
        store_in_external_system(self._api_token, context.asset_key, obj)

    def load_input(self, context: dg.InputContext):
        # Check cache first
        if context.asset_key in self._cache:
            return self._cache[context.asset_key]

        # Load from external system
        data = fetch_from_external_system(self._api_token, context.asset_key)
        self._cache[context.asset_key] = data
        return data

class ConfigurableExternalIOManager(dg.ConfigurableIOManagerFactory):
    api_token: str

    def create_io_manager(self, context) -> ExternalIOManager:
        return ExternalIOManager(self.api_token)
```
## Per-Asset IO Managers

Different assets can use different IO managers:

```python
import dagster as dg

@dg.asset(io_manager_key="fs_io_manager")
def asset_uses_fs():
    return [1, 2, 3]

@dg.asset(io_manager_key="db_io_manager")
def asset_uses_db():
    return {"key": "value"}

defs = dg.Definitions(
    assets=[asset_uses_fs, asset_uses_db],
    resources={
        "fs_io_manager": dg.FilesystemIOManager(base_dir="/data"),
        "db_io_manager": DatabaseIOManager(connection_string="..."),
    },
)
```
## Per-Output IO Managers

Multi-assets can specify different IO managers for each output:

```python
import dagster as dg

@dg.multi_asset(
    outs={
        "raw_data": dg.AssetOut(io_manager_key="fs_io_manager"),
        "transformed_data": dg.AssetOut(io_manager_key="db_io_manager"),
    }
)
def extract_and_transform():
    raw = fetch_raw_data()
    transformed = transform(raw)
    yield dg.Output(raw, output_name="raw_data")
    yield dg.Output(transformed, output_name="transformed_data")
```
## Input Managers

For loading data without a corresponding output, use input managers:

```python
import dagster as dg

class MyInputManager(dg.InputManager):
    def load_input(self, context: dg.InputContext):
        # Load from external source
        return read_from_source(context.asset_key)

@dg.asset(ins={"external_data": dg.AssetIn(input_manager_key="my_input_manager")})
def process_external_data(external_data):
    return transform(external_data)

defs = dg.Definitions(
    assets=[process_external_data],
    resources={
        "my_input_manager": MyInputManager(),
    },
)
```
## Type-Specific IO Managers

Create IO managers that handle specific data types:

```python
import dagster as dg
import pandas as pd

class ParquetIOManager(dg.ConfigurableIOManager):
    base_path: str

    def _get_path(self, context) -> str:
        return f"{self.base_path}/{'/'.join(context.asset_key.path)}.parquet"

    def handle_output(self, context: dg.OutputContext, obj: pd.DataFrame):
        path = self._get_path(context)
        obj.to_parquet(path)
        context.add_output_metadata({
            "num_rows": len(obj),
            "num_columns": len(obj.columns),
            "path": path,
        })

    def load_input(self, context: dg.InputContext) -> pd.DataFrame:
        return pd.read_parquet(self._get_path(context))
```
## Testing with IO Managers

Test IO managers in isolation:

```python
import dagster as dg
import tempfile

def test_my_io_manager():
    with tempfile.TemporaryDirectory() as tmpdir:
        io_manager = MyIOManager(path_prefix=[tmpdir])

        # Create mock context
        context = dg.build_output_context(
            asset_key=dg.AssetKey(["test_asset"])
        )

        # Test handle_output
        test_data = [1, 2, 3]
        io_manager.handle_output(context, test_data)

        # Test load_input
        loaded_data = io_manager.load_input(
            dg.build_input_context(asset_key=dg.AssetKey(["test_asset"]))
        )
        assert loaded_data == test_data
```
Test assets with mock IO managers:

```python
import dagster as dg

class MockIOManager(dg.IOManager):
    def __init__(self):
        self.storage = {}

    def handle_output(self, context, obj):
        self.storage[context.asset_key] = obj

    def load_input(self, context):
        return self.storage[context.asset_key]

def test_asset_pipeline():
    result = dg.materialize(
        [upstream_asset, downstream_asset],
        resources={"io_manager": MockIOManager()},
    )
    assert result.success
```
## Built-in IO Managers

Dagster provides several built-in IO managers:

### Filesystem

```python
from dagster import FilesystemIOManager

defs = dg.Definitions(
    assets=[my_asset],
    resources={
        "io_manager": FilesystemIOManager(
            base_dir="/data/storage"
        )
    },
)
```

Stores outputs as pickle files in the specified directory.

### In-Memory

```python
from dagster import InMemoryIOManager

defs = dg.Definitions(
    assets=[my_asset],
    resources={
        "io_manager": InMemoryIOManager()
    },
)
```

Stores outputs in memory. Useful for testing and development.

### UPath (Cloud Storage)

```python
from dagster import UPathIOManager

# Works with S3, GCS, Azure Blob Storage, etc.
defs = dg.Definitions(
    assets=[my_asset],
    resources={
        "io_manager": UPathIOManager(
            base_path="s3://my-bucket/dagster-storage"
        )
    },
)
```

Universal IO manager that works with any fsspec-compatible storage.
## Best Practices

**Choose the right IO manager for your data.** Use filesystem IO managers for local development, cloud storage IO managers for production, and database IO managers for tabular data that needs to be queried.

**Add metadata to understand your data.** Use `context.add_output_metadata()` to record information like row counts, file sizes, and data quality metrics.

**Handle partitions efficiently.** For partitioned assets, make sure your IO manager stores each partition separately and can load individual partitions or partition ranges.
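One way to picture that requirement is storage keyed by (asset, partition) pairs, with range reads layered on top. A plain-Python sketch, with hypothetical helper names rather than any Dagster API:

```python
class PartitionedStore:
    """Hypothetical partition-aware storage: one entry per (asset, partition)."""
    def __init__(self):
        self._data = {}  # (asset, partition) -> value

    def write(self, asset, partition, value):
        # Each partition gets its own slot, so writes never clobber siblings
        self._data[(asset, partition)] = value

    def read(self, asset, partition):
        return self._data[(asset, partition)]

    def read_range(self, asset, start, end):
        # Lexicographic comparison works for ISO-formatted date keys
        return [v for (a, p), v in sorted(self._data.items())
                if a == asset and start <= p <= end]

store = PartitionedStore()
store.write("events", "2020-01-01", 10)
store.write("events", "2020-01-02", 20)
store.write("events", "2020-01-03", 30)
print(store.read_range("events", "2020-01-01", "2020-01-02"))  # → [10, 20]
```

In a file-based IO manager the same idea shows up as one file per partition key, as in the `MyPartitionedIOManager` example above; the range read corresponds to loading a backfill's time window.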
**Keep IO managers stateless when possible.** Stateless IO managers are easier to test and reason about. Use the factory pattern if you need stateful caching.

**Test IO managers independently.** Write unit tests for your IO manager's `handle_output` and `load_input` methods before using them in production.