The @io_manager decorator defines an I/O manager - a special type of resource that handles storing op/asset outputs and loading them as inputs to downstream ops/assets.
Signature
@io_manager(
config_schema: Optional[ConfigSchema] = None,
description: Optional[str] = None,
output_config_schema: Optional[ConfigSchema] = None,
input_config_schema: Optional[ConfigSchema] = None,
required_resource_keys: Optional[Set[str]] = None,
version: Optional[str] = None,
) -> IOManagerDefinition
Parameters
config_schema (Optional[ConfigSchema]): The schema for the resource config. Configuration data available in init_context.resource_config. If not set, Dagster will accept any config provided.
description (Optional[str]): A human-readable description of the resource.
output_config_schema (Optional[ConfigSchema]): The schema for per-output config. If not set, no per-output configuration will be allowed.
input_config_schema (Optional[ConfigSchema]): The schema for per-input config. If not set, Dagster will accept any config provided.
required_resource_keys (Optional[Set[str]]): Keys for the resources required by the I/O manager.
version (Optional[str]): The version of a resource function. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs.
Returns
Type: IOManagerDefinition
An I/O manager definition object.
Examples
Basic I/O Manager
from dagster import IOManager, io_manager
import pickle
class MyIOManager(IOManager):
    """Persist each output as a pickle file under /tmp, keyed by asset name."""

    def handle_output(self, context, obj):
        # The final component of the asset key names the file.
        target = f"/tmp/{context.asset_key.path[-1]}.pkl"
        with open(target, "wb") as fh:
            pickle.dump(obj, fh)

    def load_input(self, context):
        # Symmetric with handle_output: read back the pickled payload.
        target = f"/tmp/{context.asset_key.path[-1]}.pkl"
        with open(target, "rb") as fh:
            return pickle.load(fh)
@io_manager
def my_io_manager(init_context):
    """Resource factory that hands Dagster the pickle-backed I/O manager."""
    return MyIOManager()
I/O Manager with Configuration
from dagster import IOManager, io_manager, Field, String
import pandas as pd
class CSVIOManager(IOManager):
    """Read and write DataFrames as CSV files inside a configurable directory."""

    def __init__(self, base_dir: str):
        # Directory that holds one <asset_name>.csv per asset.
        self.base_dir = base_dir

    def _csv_path(self, context) -> str:
        # One file per asset, named after the asset key's last component.
        return f"{self.base_dir}/{context.asset_key.path[-1]}.csv"

    def handle_output(self, context, obj: pd.DataFrame):
        obj.to_csv(self._csv_path(context), index=False)

    def load_input(self, context) -> pd.DataFrame:
        return pd.read_csv(self._csv_path(context))
@io_manager(
    config_schema={
        "base_dir": Field(String, default_value="/tmp/data")
    }
)
def csv_io_manager(init_context):
    """Build a CSVIOManager rooted at the configured (or default) base_dir."""
    return CSVIOManager(base_dir=init_context.resource_config["base_dir"])
Using I/O Manager with Assets
from dagster import asset, Definitions, io_manager, IOManager
import json
class JSONIOManager(IOManager):
    """Serialize outputs to JSON files under /tmp, one file per asset."""

    def handle_output(self, context, obj):
        path = f"/tmp/{context.asset_key.path[-1]}.json"
        with open(path, "w") as handle:
            json.dump(obj, handle)

    def load_input(self, context):
        path = f"/tmp/{context.asset_key.path[-1]}.json"
        with open(path, "r") as handle:
            return json.load(handle)
@io_manager
def json_io_manager(init_context):
    """Resource factory returning the JSON-file-backed I/O manager."""
    return JSONIOManager()
@asset
def upstream_asset():
    """Produce a small payload for downstream_asset to consume."""
    return {"data": [1, 2, 3]}
@asset
def downstream_asset(upstream_asset):
    """Transform the upstream payload.

    The I/O manager loads upstream_asset automatically before this runs.
    """
    return {"processed": upstream_asset["data"]}
# Register both assets and route their I/O through json_io_manager,
# which is picked up under the default "io_manager" resource key.
defs = Definitions(
    assets=[upstream_asset, downstream_asset],
    resources={"io_manager": json_io_manager},
)
I/O Manager with Per-Output Config
from dagster import io_manager, IOManager, Out, op, job
class ConfigurableIOManager(IOManager):
    """I/O manager whose storage format is driven by per-output config."""

    def handle_output(self, context, obj):
        # Per-output config from the job's run config; default to "json"
        # when the output provides no config (context.config may be None).
        file_format = (context.config or {}).get("format", "json")
        # Use step and output name to build the path: this example is used
        # from an op-based job (see my_job below), where there is no asset
        # key and context.asset_key would raise.
        file_path = f"/tmp/{context.step_key}_{context.name}.{file_format}"
        # Save based on format...

    def load_input(self, context):
        # Load based on metadata...
        pass
@io_manager(
    output_config_schema={
        "format": str,
        "compression": bool,
    }
)
def configurable_io_manager(init_context):
    """Factory for the I/O manager that accepts per-output config."""
    return ConfigurableIOManager()
@op(out=Out(io_manager_key="my_io_manager"))
def my_op():
    """Emit a payload handled by the resource bound to "my_io_manager"."""
    return {"data": [1, 2, 3]}
@job(
    resource_defs={"my_io_manager": configurable_io_manager},
    config={
        "ops": {
            "my_op": {
                "outputs": {
                    # "result" is the default output name of my_op.
                    "result": {"format": "parquet", "compression": True}
                }
            }
        }
    },
)
def my_job():
    """Run my_op with parquet/compression per-output config baked in."""
    my_op()
Multiple I/O Managers
from dagster import asset, Definitions, io_manager, IOManager
class DatabaseIOManager(IOManager):
    """Skeleton manager that would persist outputs to a database."""

    def handle_output(self, context, obj):
        # Store in database
        pass

    def load_input(self, context):
        # Load from database
        pass
class S3IOManager(IOManager):
    """Skeleton manager that would persist outputs to S3."""

    def handle_output(self, context, obj):
        # Store in S3
        pass

    def load_input(self, context):
        # Load from S3
        pass
@io_manager
def database_io_manager(init_context):
    """Resource factory for the database-backed manager."""
    return DatabaseIOManager()
@io_manager
def s3_io_manager(init_context):
    """Resource factory for the S3-backed manager."""
    return S3IOManager()
@asset(io_manager_key="database_io_manager")
def database_asset():
    """Asset stored via the database-backed I/O manager."""
    return {"data": [1, 2, 3]}
@asset(io_manager_key="s3_io_manager")
def s3_asset():
    """Asset stored via the S3-backed I/O manager."""
    return {"data": [4, 5, 6]}
# Each asset picks its manager by io_manager_key, so both resources
# must be registered under the matching keys here.
defs = Definitions(
    assets=[database_asset, s3_asset],
    resources={
        "database_io_manager": database_io_manager,
        "s3_io_manager": s3_io_manager,
    },
)
Partitioned I/O Manager
from dagster import (
IOManager,
io_manager,
asset,
DailyPartitionsDefinition,
)
import pandas as pd
class PartitionedIOManager(IOManager):
    """Store each partition of an asset as its own parquet file."""

    def handle_output(self, context, obj: pd.DataFrame):
        # Local import keeps the snippet self-contained.
        import os

        partition_key = context.partition_key
        file_path = f"/tmp/data/{partition_key}.parquet"
        # to_parquet does not create parent directories; ensure /tmp/data
        # exists so the first write doesn't fail.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        obj.to_parquet(file_path)

    def load_input(self, context) -> pd.DataFrame:
        partition_key = context.partition_key
        file_path = f"/tmp/data/{partition_key}.parquet"
        return pd.read_parquet(file_path)
@io_manager
def partitioned_io_manager(init_context):
    """Resource factory for the partition-aware parquet manager."""
    return PartitionedIOManager()
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01")
)
def partitioned_asset(context):
    """Emit one single-row frame per daily partition, tagged with its date."""
    date = context.partition_key
    return pd.DataFrame({"date": [date], "value": [42]})
I/O Manager with Resource Dependencies
from dagster import io_manager, IOManager, resource
@resource
def database_connection(init_context):
    """Provide a database connection for resources that require it."""
    # NOTE(review): DatabaseConnection is not defined in this snippet —
    # presumably supplied by the surrounding project.
    return DatabaseConnection()
class DatabaseIOManager(IOManager):
    """Persist outputs into, and load inputs from, database tables named
    after the asset key's final component."""

    def __init__(self, connection):
        # Injected connection; its lifecycle is owned by the resource system.
        self.connection = connection

    def handle_output(self, context, obj):
        self.connection.execute(f"INSERT INTO {context.asset_key.path[-1]} ...")

    def load_input(self, context):
        return self.connection.query(f"SELECT * FROM {context.asset_key.path[-1]}")
@io_manager(required_resource_keys={"database_connection"})
def database_io_manager(init_context):
    """Wire the required database_connection resource into the manager."""
    conn = init_context.resources.database_connection
    return DatabaseIOManager(connection=conn)