AssetExecutionContext
The context object that is available as the first argument to asset compute functions. This context provides system information such as resources, configuration, logging, and partition information.
Class Definition
class AssetExecutionContext:
def __init__(self, op_execution_context: OpExecutionContext) -> None
Defined in: dagster._core.execution.context.asset_execution_context
Getting the Context
@staticmethod
def get() -> AssetExecutionContext
Retrieve the current AssetExecutionContext from the execution scope.
Returns: The current AssetExecutionContext
Raises: DagsterInvariantViolationError if no context is in scope
from dagster import asset, AssetExecutionContext
@asset
def my_asset():
# Inside the asset, you can get the context
context = AssetExecutionContext.get()
context.log.info("Retrieved context")
Core Properties
The log manager for this execution. Logs are viewable in the Dagster UI.
@asset
def logger_asset(context: AssetExecutionContext):
context.log.info("Info level message")
context.log.warning("Warning message")
context.log.error("Error message")
The DagsterRun object for the current execution. Access run metadata including run_id, run_config, and tags.
@asset
def my_asset(context: AssetExecutionContext):
run_id = context.run.run_id
tags = context.run.tags
config = context.run.run_config
The current Dagster instance.
@asset
def my_asset(context: AssetExecutionContext):
# Access instance-level configuration
storage = context.instance.storage_directory()
The resources available in the execution context.
@asset
def my_asset(context: AssetExecutionContext):
# Access configured resources
db = context.resources.database
api_client = context.resources.api
Provides access to pdb debugging from within the asset.
@asset
def debug_asset(context: AssetExecutionContext):
context.pdb.set_trace() # Enter debugger
The definition for the currently executing job.
The name of the currently executing job.
The current op definition.
Access to the underlying OpExecutionContext for advanced use cases.
Which retry attempt is currently executing (0 for initial attempt, 1 for first retry, etc.).
The Dagster repository for the currently executing job.
The AssetKey for the current asset. For multi-assets with multiple outputs, use asset_key_for_output() instead.
@asset
def my_asset(context: AssetExecutionContext):
key = context.asset_key # AssetKey(["my_asset"])
Whether there is a backing AssetsDefinition for what is currently executing.
The backing AssetsDefinition for what is currently executing.
Raises: DagsterInvalidPropertyError if not available
The set of AssetKeys this execution is expected to materialize.
selected_asset_check_keys
AbstractSet[AssetCheckKey]
The asset check keys corresponding to the current selection of assets.
Methods
asset_key_for_output
def asset_key_for_output(self, output_name: str = "result") -> AssetKey
Return the AssetKey for the corresponding output.
The name of the output. For @asset, this is automatically "result". For @multi_asset, specify the output name from AssetOut.
Returns: AssetKey
from dagster import AssetOut, multi_asset
@multi_asset(
outs={
"asset1": AssetOut(key=["my_assets", "asset1"]),
"asset2": AssetOut(key=["my_assets", "asset2"]),
}
)
def my_multi_asset(context):
key1 = context.asset_key_for_output("asset1")
key2 = context.asset_key_for_output("asset2")
return {"asset1": data1, "asset2": data2}
output_for_asset_key
def output_for_asset_key(self, asset_key: AssetKey) -> str
Return the output name for the corresponding asset key.
The asset key to look up.
Returns: str - The output name
def asset_key_for_input(self, input_name: str) -> AssetKey
Return the AssetKey for the corresponding input.
Returns: AssetKey
Whether the current run targets a single partition.
The partition key for the current run.
Raises: Error if not a partitioned run or operating over a range of partitions.
from dagster import asset, DailyPartitionsDefinition
partitions_def = DailyPartitionsDefinition("2023-08-20")
@asset(partitions_def=partitions_def)
def my_asset(context: AssetExecutionContext):
context.log.info(context.partition_key)
# Materializing 2023-08-21 partition logs: "2023-08-21"
List of partition keys for the current run. Useful for backfills of multiple partitions.
@asset(partitions_def=DailyPartitionsDefinition("2023-08-20"))
def my_asset(context: AssetExecutionContext):
context.log.info(context.partition_keys)
# Backfill of 2023-08-21 through 2023-08-25 logs:
# ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
Whether the current run targets a range of partitions.
The range of partition keys for the current run. Returns a range with the same start and end for single partition runs.
@asset(partitions_def=DailyPartitionsDefinition("2023-08-20"))
def my_asset(context: AssetExecutionContext):
key_range = context.partition_key_range
# Backfill logs: PartitionKeyRange(start="2023-08-21", end="2023-08-25")
The partition time window for the current run.
Raises: Error if not partitioned or not using TimeWindowPartitionsDefinition.
@asset(partitions_def=DailyPartitionsDefinition("2023-08-20"))
def my_asset(context: AssetExecutionContext):
window = context.partition_time_window
# Materializing 2023-08-21 logs: TimeWindow("2023-08-21", "2023-08-22")
def asset_partition_key_for_input(self, input_name: str) -> str
def asset_partition_keys_for_input(self, input_name: str) -> Sequence[str]
def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange
def asset_partitions_time_window_for_input(self, input_name: str) -> TimeWindow
def asset_partitions_def_for_input(self, input_name: str) -> PartitionsDefinition
These methods provide partition information for input assets.
from dagster import asset, AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping
partitions_def = DailyPartitionsDefinition("2023-08-20")
@asset(partitions_def=partitions_def)
def upstream_asset():
...
@asset(
partitions_def=partitions_def,
ins={
"upstream_asset": AssetIn(
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
)
}
)
def downstream_asset(context: AssetExecutionContext, upstream_asset):
# Get partition key of the upstream asset
upstream_key = context.asset_partition_key_for_input("upstream_asset")
# Materializing 2023-08-21 partition, upstream_key = "2023-08-20"
def add_output_metadata(
self,
metadata: Mapping[str, Any],
output_name: str | None = None,
mapping_key: str | None = None,
) -> None
Add metadata to an output. Can be invoked multiple times; last value for duplicate keys wins.
metadata
Mapping[str, Any]
required
The metadata to attach to the output.
The name of the output. Not needed for single-output ops.
The mapping key for dynamic outputs.
from dagster import asset, multi_asset, AssetOut
@asset
def my_asset(context):
context.add_output_metadata({
"row_count": 100,
"preview": "data preview"
})
return data
@multi_asset(outs={"a": AssetOut(), "b": AssetOut()})
def multi_output_asset(context):
context.add_output_metadata({"foo": "bar"}, output_name="a")
context.add_output_metadata({"baz": "bat"}, output_name="b")
return ("data_a", "data_b")
def add_asset_metadata(
self,
metadata: Mapping[str, Any],
asset_key: CoercibleToAssetKey | None = None,
partition_key: str | None = None,
) -> None
Add metadata to an asset materialization event. Visible in the Dagster UI.
metadata
Mapping[str, Any]
required
The metadata to add to the asset materialization.
asset_key
CoercibleToAssetKey | None
The asset key. Not needed if only one asset is being materialized.
The partition key for partitioned assets. If not provided on a partitioned asset, metadata is added to all partitions being materialized.
import dagster as dg
@dg.asset
def my_asset(context):
context.add_asset_metadata({"row_count": 1000})
return data
@dg.asset(partitions_def=dg.StaticPartitionsDefinition(["a", "b"]))
def partitioned_asset(context):
# Add to all partitions
context.add_asset_metadata({"source": "api"})
# Add to specific partition
for partition_key in context.partition_keys:
if partition_key == "a":
context.add_asset_metadata(
{"special": "value"},
partition_key=partition_key
)
@dg.multi_asset(specs=[dg.AssetSpec("asset1"), dg.AssetSpec("asset2")])
def my_multi_asset(context):
context.add_asset_metadata({"key": "value"}, asset_key="asset1")
# Must specify asset_key in multi-assets
def get_output_metadata(
self,
output_name: str,
mapping_key: str | None = None,
) -> Mapping[str, Any] | None
Retrieve metadata that has been set for a specific output.
Event Logging
log_event
def log_event(self, event: UserEvent) -> None
Log an AssetMaterialization, AssetObservation, or ExpectationResult. Events appear in the event log and DagsterEvents list.
event
AssetMaterialization | AssetObservation | ExpectationResult
required
The event to log.
from dagster import asset, AssetMaterialization
@asset
def my_asset(context):
context.log_event(AssetMaterialization(
asset_key="external_asset",
description="Materialized external asset"
))
return data
Data Lineage
get_asset_provenance
def get_asset_provenance(self, asset_key: AssetKey) -> DataProvenance | None
Return provenance information for the most recent materialization of an asset.
The asset key to retrieve provenance for.
Returns: DataProvenance | None - Provenance information, or None if never materialized or record is too old.
@asset
def downstream_asset(context: AssetExecutionContext, upstream_asset):
provenance = context.get_asset_provenance(AssetKey(["upstream_asset"]))
if provenance:
context.log.info(f"Data version: {provenance.data_version}")
Debugging
describe_op
def describe_op(self) -> str
Return a string description of the currently executing op.
get_mapping_key
def get_mapping_key(self) -> str | None
Return the mapping key if downstream of a DynamicOutput, otherwise None.