Observability ensures your data pipelines run reliably and you’re alerted when issues arise. Dagster provides schedules, sensors, logging, and monitoring capabilities.
Schedules
Schedules run your assets or jobs on a fixed cadence.
Basic Schedule
import dagster as dg
@dg.asset
def customer_data(): ...
@dg.asset
def sales_report(): ...
daily_schedule = dg.ScheduleDefinition(
    name="daily_refresh",
    cron_schedule="0 0 * * *",  # Runs at midnight daily
    target=[customer_data, sales_report],
)

defs = dg.Definitions(
    assets=[customer_data, sales_report],
    schedules=[daily_schedule],
)
Partitioned Schedule
Schedule partitioned assets to process incremental data:
import datetime
import dagster as dg
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    # Process data for this date
    process_sales(date)

# Create a partitioned job
daily_sales_job = dg.define_asset_job(
    name="daily_sales_job",
    selection=[daily_sales_data],
)

@dg.schedule(
    job=daily_sales_job,
    cron_schedule="0 1 * * *",  # Run at 1:00 AM every day
)
def daily_sales_schedule(context):
    """Process the previous day's sales data."""
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return dg.RunRequest(
        run_key=date,
        partition_key=date,
    )

defs = dg.Definitions(
    assets=[daily_sales_data],
    schedules=[daily_sales_schedule],
)
Use cron expressions for flexible scheduling. 0 0 * * * runs daily at midnight, 0 */6 * * * runs every 6 hours, and 0 0 * * 1 runs weekly on Mondays.
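As a sketch, continuing the first example above (the schedule names here are illustrative; customer_data is the asset defined earlier), the same target can be run at other cadences:
every_six_hours = dg.ScheduleDefinition(
    name="every_six_hours",
    cron_schedule="0 */6 * * *",  # top of every sixth hour
    target=[customer_data],
)

weekly_on_monday = dg.ScheduleDefinition(
    name="weekly_on_monday",
    cron_schedule="0 0 * * 1",  # midnight every Monday
    target=[customer_data],
)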
Sensors
Sensors react to external events to trigger runs.
File Sensor
Trigger runs when new files appear:
import random

import dagster as dg

@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Processing file!")

my_job = dg.define_asset_job("my_job", selection=[my_asset])

def check_for_new_files() -> list[str]:
    # In production, check S3, a filesystem, etc.
    if random.random() > 0.5:
        return ["file1", "file2"]
    return []

@dg.sensor(
    job=my_job,
    minimum_interval_seconds=5,
    default_status=dg.DefaultSensorStatus.RUNNING,
)
def new_file_sensor():
    new_files = check_for_new_files()
    if new_files:
        for filename in new_files:
            yield dg.RunRequest(run_key=filename)
    else:
        yield dg.SkipReason("No new files found")

defs = dg.Definitions(
    assets=[my_asset],
    jobs=[my_job],
    sensors=[new_file_sensor],
)
Asset Sensor
React to asset materializations:
import dagster as dg

@dg.asset
def upstream_asset():
    return "data"

# The downstream asset needs a config schema so the sensor can pass it run_config
class DownstreamConfig(dg.Config):
    upstream_timestamp: float

@dg.asset
def downstream_asset(config: DownstreamConfig):
    return "processed"

downstream_job = dg.define_asset_job(
    "downstream_job",
    selection=[downstream_asset],
)

@dg.asset_sensor(
    asset_key=dg.AssetKey("upstream_asset"),
    job=downstream_job,
)
def upstream_materialized_sensor(context, asset_event):
    """Run the downstream job when the upstream asset updates."""
    yield dg.RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "downstream_asset": {
                    "config": {"upstream_timestamp": asset_event.timestamp}
                }
            }
        },
    )

defs = dg.Definitions(
    assets=[upstream_asset, downstream_asset],
    jobs=[downstream_job],
    sensors=[upstream_materialized_sensor],
)
Sensor with Cursor
Track state between sensor evaluations:
import dagster as dg

@dg.sensor(job=my_job, minimum_interval_seconds=30)
def api_sensor(context: dg.SensorEvaluationContext):
    # Get the last processed ID from the cursor
    last_id = int(context.cursor) if context.cursor else 0

    # Fetch new records from the API
    new_records = fetch_records_since(last_id)

    if new_records:
        latest_id = max(record.id for record in new_records)
        for record in new_records:
            yield dg.RunRequest(
                run_key=str(record.id),
                # Surface the record ID on the run as a tag
                tags={"record_id": str(record.id)},
            )
        # Update the cursor to track progress
        context.update_cursor(str(latest_id))
    else:
        yield dg.SkipReason(f"No new records since ID {last_id}")
Multi-Asset Sensor
Wait for multiple assets before triggering:
import dagster as dg
from dagster import (
    AssetKey,
    MultiAssetSensorEvaluationContext,
    RunRequest,
    multi_asset_sensor,
)

@dg.asset
def asset_a(): ...

@dg.asset
def asset_b(): ...

@dg.asset
def asset_c(): ...

combined_job = dg.define_asset_job(
    "combined_job",
    selection=[asset_c],
)

@multi_asset_sensor(
    monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=combined_job,
)
def wait_for_all_assets_sensor(context: MultiAssetSensorEvaluationContext):
    # Get the latest materialization record for each monitored asset
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        # All monitored assets have been materialized
        yield RunRequest()
        # Consume the materializations so the same events don't trigger again
        context.advance_all_cursors()
    else:
        # Not all assets are ready yet
        missing = [key for key, value in asset_events.items() if not value]
        yield dg.SkipReason(f"Waiting for: {missing}")
Logging
Capture detailed information about pipeline execution:
from dagster import AssetExecutionContext, asset
import pandas as pd

@asset
def detailed_logging(context: AssetExecutionContext):
    context.log.info("Starting data processing")

    df = pd.read_csv("data.csv")
    context.log.info(f"Loaded {len(df)} rows")

    # Log warnings for data quality issues
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        context.log.warning(f"Found {null_count} null values")

    # Log debug information
    context.log.debug(f"Columns: {df.columns.tolist()}")

    processed = df.dropna()
    context.log.info(f"After cleaning: {len(processed)} rows")
    return processed
Log levels:
debug(): Detailed diagnostic information
info(): General informational messages
warning(): Warning messages for potential issues
error(): Error messages for failures
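The error level is not shown above; a minimal sketch of logging an error before letting the run fail (the failing division is a stand-in for any real operation):
import dagster as dg

@dg.asset
def asset_with_error_logging(context: dg.AssetExecutionContext):
    try:
        result = 1 / 0  # stand-in for an operation that can fail
    except ZeroDivisionError as exc:
        # Record the failure in the run logs, then re-raise so the run fails
        context.log.error(f"Computation failed: {exc}")
        raise
    return result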
Run Status Sensors
React to run failures or successes:
from dagster import (
    DagsterRunStatus,
    RunFailureSensorContext,
    RunRequest,
    run_failure_sensor,
    run_status_sensor,
)
from dagster_slack import SlackResource

# my_job, cleanup_job, and important_job are jobs defined elsewhere in your project
@run_failure_sensor(
    monitored_jobs=[my_job],
    request_job=cleanup_job,
)
def failure_cleanup_sensor(context: RunFailureSensorContext):
    """Run cleanup when a job fails."""
    yield RunRequest(
        run_config={
            "ops": {
                "cleanup": {
                    "config": {"failed_run_id": context.dagster_run.run_id}
                }
            }
        },
    )

@run_status_sensor(
    monitored_jobs=[important_job],
    run_status=DagsterRunStatus.SUCCESS,
)
def success_notification_sensor(context, slack: SlackResource):
    """Send a Slack notification when a monitored run succeeds."""
    slack.get_client().chat_postMessage(
        channel="#data-eng",
        text=f"✅ Job {context.dagster_run.job_name} completed successfully!",
    )
Asset Checks
Monitor data quality continuously:
import pandas as pd

import dagster as dg

@dg.asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")

@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return dg.AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
        metadata={
            "null_count": int(num_null_order_ids),
            "total_rows": len(orders_df),
        },
    )

@dg.multi_asset_check(
    specs=[
        dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
        dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
    ]
)
def orders_check():
    orders_df = pd.read_csv("orders.csv")

    # Check for null order_id values
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="orders_id_has_no_nulls",
        passed=bool(num_null_order_ids == 0),
        asset_key="orders",
    )

    # Check for null item_id values
    num_null_item_ids = orders_df["item_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="items_id_has_no_nulls",
        passed=bool(num_null_item_ids == 0),
        asset_key="orders",
    )
Freshness Checks
Ensure assets are updated on time:
import datetime

import dagster as dg
from dagster import asset, build_last_update_freshness_checks

@asset
def daily_metrics():
    # Should update daily
    return compute_metrics()

# Automatically create freshness checks
freshness_checks = build_last_update_freshness_checks(
    assets=[daily_metrics],
    lower_bound_delta=datetime.timedelta(days=2),
)

defs = dg.Definitions(
    assets=[daily_metrics],
    asset_checks=freshness_checks,
)
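Freshness checks only produce results when they are evaluated. Assuming a Dagster release that ships build_sensor_for_freshness_checks, a dedicated sensor can evaluate them on a cadence; add it to the Definitions above via sensors=[freshness_sensor]:
# Sketch: evaluate the freshness checks periodically
# (build_sensor_for_freshness_checks availability depends on your Dagster version)
freshness_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
)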
Define schedules for regular runs: set up cron schedules for time-based execution.
Add sensors for event-driven pipelines: create sensors that react to external triggers.
Add context logging throughout your assets.
Define data quality checks for critical assets.
Use run status sensors to send notifications.
Monitoring Best Practices
Use Descriptive Run Keys
Make it easy to identify runs:
@dg.sensor(job=my_job)
def sensor_with_good_keys(context):
    for record in get_records():
        yield dg.RunRequest(
            run_key=f"{record.date}_{record.id}",
            tags={
                "date": record.date,
                "source": record.source,
            },
        )
Track Important Metrics
Attach metadata to materializations so key metrics are visible in the Dagster UI:
from dagster import MaterializeResult, MetadataValue, asset

@asset
def tracked_asset() -> MaterializeResult:
    df = process_data()
    return MaterializeResult(
        metadata={
            "num_rows": len(df),
            "columns": MetadataValue.md(", ".join(df.columns)),
            "processing_time_seconds": 12.5,
            "data_size_mb": df.memory_usage(deep=True).sum() / 1024 / 1024,
        }
    )
Group Related Assets
Organize assets into groups for better visibility:
@asset(group_name="ingestion")
def raw_data(): ...
@asset(group_name="ingestion")
def api_data(): ...
@asset(group_name="transform")
def cleaned_data(): ...
Set Up Alerts
Notify the team of critical failures:
from dagster import RunFailureSensorContext, run_failure_sensor
from dagster_slack import SlackResource

@run_failure_sensor
def critical_failure_alert(context: RunFailureSensorContext, slack: SlackResource):
    if "critical" in context.dagster_run.tags:
        slack.get_client().chat_postMessage(
            channel="#alerts",
            text=f"🚨 Critical job failed: {context.dagster_run.job_name}\n"
            f"Run ID: {context.dagster_run.run_id}\n"
            f"Error: {context.failure_event.message}",
        )
Schedules and sensors do not run just because they are defined: they must be turned on in the Dagster UI (or given a running default status in code), and the dagster-daemon process must be running to evaluate them.
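As a sketch, continuing the first example above (the schedule name is illustrative; customer_data is the asset defined earlier), a schedule can be enabled by default in code:
nightly_schedule = dg.ScheduleDefinition(
    name="nightly_refresh",
    cron_schedule="0 0 * * *",
    target=[customer_data],
    # Starts in the running state as soon as it is deployed
    default_status=dg.DefaultScheduleStatus.RUNNING,
)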
Advanced Patterns
Dynamic Sensors
Generate run requests based on API responses:
@dg.sensor(job=process_job)
def dynamic_api_sensor(context: dg.SensorEvaluationContext):
    # Query an external API for pending work
    pending_tasks = api_client.get_pending_tasks()

    for task in pending_tasks:
        yield dg.RunRequest(
            run_key=task.id,
            run_config={
                "ops": {
                    "process_task": {
                        "config": {
                            "task_id": task.id,
                            "task_type": task.type,
                        }
                    }
                }
            },
            tags={
                "task_type": task.type,
                "priority": task.priority,
            },
        )
Conditional Scheduling
Skip runs based on conditions:
@dg.schedule(job=expensive_job, cron_schedule="0 2 * * *")
def conditional_schedule(context):
    # Only run on weekdays (Monday=0 ... Friday=4)
    if context.scheduled_execution_time.weekday() < 5:
        return dg.RunRequest()
    else:
        return dg.SkipReason("Skipping weekend run")
Next Steps