Observability is how you make sure your data pipelines run reliably and how you get alerted when issues arise. Dagster supports this with schedules, sensors, logging, and data quality monitoring.

Schedules

Schedules run your assets or jobs on a fixed cadence.

Basic Schedule

import dagster as dg

@dg.asset
def customer_data(): ...

@dg.asset
def sales_report(): ...

daily_schedule = dg.ScheduleDefinition(
    name="daily_refresh",
    cron_schedule="0 0 * * *",  # Runs at midnight daily
    target=[customer_data, sales_report],
)

defs = dg.Definitions(
    assets=[customer_data, sales_report],
    schedules=[daily_schedule]
)

Partitioned Schedule

Schedule partitioned assets to process incremental data:
import datetime
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    # Process data for this date (process_sales is a placeholder for your own logic)
    process_sales(date)

# Create a partitioned job
daily_sales_job = dg.define_asset_job(
    name="daily_sales_job",
    selection=[daily_sales_data],
)

@dg.schedule(
    job=daily_sales_job,
    cron_schedule="0 1 * * *",  # Run at 1:00 AM every day
)
def daily_sales_schedule(context: dg.ScheduleEvaluationContext):
    """Process previous day's sales data."""
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return dg.RunRequest(
        run_key=date,
        partition_key=date,
    )

defs = dg.Definitions(
    assets=[daily_sales_data],
    schedules=[daily_sales_schedule]
)
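If a job targets partitioned assets, you can also derive the schedule directly from the partitions definition instead of writing the RunRequest logic by hand. A minimal sketch using build_schedule_from_partitioned_job with the daily_sales_job defined above:

# Builds a schedule whose cadence matches the job's daily partitions;
# each tick requests the most recent partition.
derived_daily_schedule = dg.build_schedule_from_partitioned_job(
    daily_sales_job,
    hour_of_day=1,  # fire at 1:00 AM, after the partition's day has closed
)

Register derived_daily_schedule in the Definitions object in place of daily_sales_schedule.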
Use cron expressions for flexible scheduling. 0 0 * * * runs daily at midnight, 0 */6 * * * runs every 6 hours, and 0 0 * * 1 runs weekly on Mondays.
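For example, a weekly refresh of the sales_report asset from the basic example:

weekly_schedule = dg.ScheduleDefinition(
    name="weekly_refresh",
    cron_schedule="0 0 * * 1",  # Midnight every Monday
    target=[sales_report],
)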

Sensors

Sensors react to external events to trigger runs.

File Sensor

Trigger runs when new files appear:
import random
import dagster as dg

@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Processing file!")

my_job = dg.define_asset_job("my_job", selection=[my_asset])

def check_for_new_files() -> list[str]:
    # In production, check S3, filesystem, etc.
    if random.random() > 0.5:
        return ["file1", "file2"]
    return []

@dg.sensor(
    job=my_job,
    minimum_interval_seconds=5,
    default_status=dg.DefaultSensorStatus.RUNNING,
)
def new_file_sensor():
    new_files = check_for_new_files()
    if new_files:
        for filename in new_files:
            yield dg.RunRequest(run_key=filename)
    else:
        yield dg.SkipReason("No new files found")

defs = dg.Definitions(
    assets=[my_asset],
    jobs=[my_job],
    sensors=[new_file_sensor]
)

Asset Sensor

React to asset materializations:
import dagster as dg

@dg.asset
def upstream_asset():
    return "data"

@dg.asset
def downstream_asset():
    return "processed"

downstream_job = dg.define_asset_job(
    "downstream_job",
    selection=[downstream_asset]
)

@dg.asset_sensor(
    asset_key=dg.AssetKey("upstream_asset"),
    job=downstream_job,
)
def upstream_materialized_sensor(context: dg.SensorEvaluationContext, asset_event):
    """Run downstream job when upstream asset updates."""
    yield dg.RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                # Passing config here requires downstream_asset to declare a
                # matching config schema
                "downstream_asset": {
                    "config": {
                        "upstream_timestamp": asset_event.timestamp
                    }
                }
            }
        }
    )

defs = dg.Definitions(
    assets=[upstream_asset, downstream_asset],
    jobs=[downstream_job],
    sensors=[upstream_materialized_sensor]
)

Sensor with Cursor

Track state between sensor evaluations:
import dagster as dg

# my_job is the job defined earlier; fetch_records_since is a placeholder
# for your own API client.

@dg.sensor(job=my_job, minimum_interval_seconds=30)
def api_sensor(context: dg.SensorEvaluationContext):
    # Get the last processed ID from the cursor
    last_id = int(context.cursor) if context.cursor else 0
    
    # Fetch new records from the API
    new_records = fetch_records_since(last_id)
    
    if new_records:
        latest_id = max(record.id for record in new_records)
        
        for record in new_records:
            yield dg.RunRequest(
                run_key=str(record.id),
                # Use tags rather than run_config here: run_config must match
                # the job's config schema
                tags={"record_id": str(record.id)},
            )
        
        # Update cursor to track progress
        context.update_cursor(str(latest_id))
    else:
        yield dg.SkipReason(f"No new records since ID {last_id}")

Multi-Asset Sensor

Wait for multiple assets before triggering:
import dagster as dg
from dagster import (
    AssetKey,
    MultiAssetSensorEvaluationContext,
    RunRequest,
    multi_asset_sensor,
)

@dg.asset
def asset_a(): ...

@dg.asset
def asset_b(): ...

@dg.asset
def asset_c(): ...

combined_job = dg.define_asset_job(
    "combined_job",
    selection=[asset_c]
)

@multi_asset_sensor(
    monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=combined_job,
)
def wait_for_all_assets_sensor(context: MultiAssetSensorEvaluationContext):
    # Get the latest materialization for each monitored asset
    asset_events = context.latest_materialization_records_by_key()
    
    if all(asset_events.values()):
        # All monitored assets have been materialized since the last cursor
        yield RunRequest(run_key="combined")
        # Advance cursors so the same materializations don't trigger again
        context.advance_all_cursors()
    else:
        # Not all assets are ready yet
        missing = [k for k, v in asset_events.items() if not v]
        yield dg.SkipReason(f"Waiting for: {missing}")

Logging

Capture detailed information about pipeline execution:
from dagster import AssetExecutionContext, asset
import pandas as pd

@asset
def detailed_logging(context: AssetExecutionContext):
    context.log.info("Starting data processing")
    
    df = pd.read_csv("data.csv")
    context.log.info(f"Loaded {len(df)} rows")
    
    # Log warnings for data quality issues
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        context.log.warning(f"Found {null_count} null values")
    
    # Log debug information
    context.log.debug(f"Columns: {df.columns.tolist()}")
    
    processed = df.dropna()
    context.log.info(f"After cleaning: {len(processed)} rows")
    
    return processed
Log levels:
  • debug(): Detailed diagnostic information
  • info(): General informational messages
  • warning(): Warning messages for potential issues
  • error(): Error messages for failures
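The error level is not used in the example above; a minimal sketch of logging a failure before letting the run fail:

import pandas as pd
from dagster import AssetExecutionContext, asset

@asset
def asset_with_error_logging(context: AssetExecutionContext):
    try:
        df = pd.read_csv("data.csv")
    except FileNotFoundError as e:
        # Surface the failure in the run logs before failing the run
        context.log.error(f"Could not load input file: {e}")
        raise
    return df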

Run Status Sensors

React to run failures or successes:
from dagster import (
    DagsterRunStatus,
    RunFailureSensorContext,
    RunRequest,
    RunStatusSensorContext,
    run_failure_sensor,
    run_status_sensor,
)
from dagster_slack import SlackResource

# my_job, cleanup_job, and important_job are placeholders for jobs defined elsewhere

@run_failure_sensor(
    monitored_jobs=[my_job],
    request_job=cleanup_job,
)
def failure_cleanup_sensor(context: RunFailureSensorContext):
    """Run cleanup when a job fails."""
    yield RunRequest(
        run_config={
            "ops": {
                "cleanup": {
                    "config": {
                        "failed_run_id": context.dagster_run.run_id
                    }
                }
            }
        }
    )

@run_status_sensor(
    monitored_jobs=[important_job],
    run_status=DagsterRunStatus.SUCCESS,
)
def success_notification_sensor(
    context: RunStatusSensorContext,
    slack: SlackResource
):
    """Send Slack notification on successful runs."""
    slack.get_client().chat_postMessage(
        channel="#data-eng",
        text=f"✅ Job {context.dagster_run.job_name} completed successfully!"
    )

Asset Checks

Monitor data quality continuously:
import pandas as pd
import dagster as dg

@dg.asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")

@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    
    return dg.AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
        metadata={
            "null_count": int(num_null_order_ids),
            "total_rows": len(orders_df),
        }
    )

# Alternative: evaluate several checks in a single pass with multi_asset_check
@dg.multi_asset_check(
    specs=[
        dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
        dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
    ]
)
def orders_check():
    orders_df = pd.read_csv("orders.csv")
    
    # Check for null order_id values
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="orders_id_has_no_nulls",
        passed=bool(num_null_order_ids == 0),
        asset_key="orders",
    )
    
    # Check for null item_id values
    num_null_item_ids = orders_df["item_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="items_id_has_no_nulls",
        passed=bool(num_null_item_ids == 0),
        asset_key="orders",
    )
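A failed check can also be reported as a warning rather than an error by setting a severity on the result. A minimal sketch, reusing the orders asset above (orders_row_count_reasonable is an illustrative name):

@dg.asset_check(asset=orders)
def orders_row_count_reasonable():
    orders_df = pd.read_csv("orders.csv")
    
    # Report a suspiciously small table as a warning instead of an error
    return dg.AssetCheckResult(
        passed=bool(len(orders_df) >= 1),
        severity=dg.AssetCheckSeverity.WARN,
        metadata={"total_rows": len(orders_df)},
    )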

Freshness Checks

Ensure assets are updated on time:
import datetime

import dagster as dg
from dagster import asset, build_last_update_freshness_checks

@asset
def daily_metrics():
    # Should update daily (compute_metrics is a placeholder for your own logic)
    return compute_metrics()

# Automatically create freshness checks
freshness_checks = build_last_update_freshness_checks(
    assets=[daily_metrics],
    lower_bound_delta=datetime.timedelta(days=2),
)

defs = dg.Definitions(
    assets=[daily_metrics],
    asset_checks=freshness_checks,
)
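Freshness checks only produce results when something evaluates them, so pair them with a mechanism that runs them regularly. A minimal sketch, assuming your Dagster version provides build_sensor_for_freshness_checks:

# A sensor that periodically evaluates the freshness checks defined above
freshness_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,
)

defs = dg.Definitions(
    assets=[daily_metrics],
    asset_checks=freshness_checks,
    sensors=[freshness_sensor],
)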
Putting it all together, a typical observability setup:

1. Define schedules for regular runs: set up cron schedules for time-based execution.
2. Add sensors for event-driven pipelines: create sensors to react to external triggers.
3. Implement logging: add context logging throughout your assets.
4. Set up asset checks: define data quality checks for critical assets.
5. Configure alerts: use run status sensors to send notifications.

Monitoring Best Practices

Use Descriptive Run Keys

Make it easy to identify runs:
@dg.sensor(job=my_job)
def sensor_with_good_keys(context):
    for record in get_records():
        yield dg.RunRequest(
            run_key=f"{record.date}_{record.id}",
            tags={
                "date": record.date,
                "source": record.source,
            }
        )

Add Metadata to Assets

Track important metrics:
from dagster import MaterializeResult, MetadataValue

@asset
def tracked_asset() -> MaterializeResult:
    df = process_data()
    
    return MaterializeResult(
        metadata={
            "num_rows": len(df),
            "columns": MetadataValue.md(", ".join(df.columns)),
            "processing_time_seconds": 12.5,
            "data_size_mb": df.memory_usage(deep=True).sum() / 1024 / 1024,
        }
    )

Group Related Assets

Organize assets for better visibility:
@asset(group_name="ingestion")
def raw_data(): ...

@asset(group_name="ingestion")
def api_data(): ...

@asset(group_name="transform")
def cleaned_data(): ...

Set Up Alerts

Notify the team of critical failures:
from dagster_slack import SlackResource

@run_failure_sensor
def critical_failure_alert(
    context: RunFailureSensorContext,
    slack: SlackResource
):
    if "critical" in context.dagster_run.tags:
        slack.get_client().chat_postMessage(
            channel="#alerts",
            text=f"🚨 Critical job failed: {context.dagster_run.job_name}\n"
                 f"Run ID: {context.dagster_run.run_id}\n"
                 f"Error: {context.failure_event.message}"
        )
Schedules and sensors must be turned on in the UI or via the daemon. They won’t run automatically when defined.
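To have a schedule or sensor start in the enabled state when the code location loads, set a default status in code. A minimal sketch, reusing customer_data and my_job from the earlier examples:

import dagster as dg

always_on_schedule = dg.ScheduleDefinition(
    name="always_on_refresh",
    cron_schedule="0 0 * * *",
    target=[customer_data],
    default_status=dg.DefaultScheduleStatus.RUNNING,  # enabled by default
)

@dg.sensor(job=my_job, default_status=dg.DefaultSensorStatus.RUNNING)
def always_on_sensor():
    yield dg.SkipReason("Nothing to do yet")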

Advanced Patterns

Dynamic Sensors

Generate run requests based on API responses:
@dg.sensor(job=process_job)  # process_job is a placeholder for your own job
def dynamic_api_sensor(context: dg.SensorEvaluationContext):
    # Query an external API (api_client is a placeholder client)
    pending_tasks = api_client.get_pending_tasks()
    
    for task in pending_tasks:
        yield dg.RunRequest(
            run_key=task.id,
            run_config={
                "ops": {
                    "process_task": {
                        "config": {
                            "task_id": task.id,
                            "task_type": task.type,
                        }
                    }
                }
            },
            tags={
                "task_type": task.type,
                "priority": task.priority,
            }
        )

Conditional Scheduling

Skip runs based on conditions:
@dg.schedule(job=expensive_job, cron_schedule="0 2 * * *")
def conditional_schedule(context: dg.ScheduleEvaluationContext):
    # Only run on weekdays
    if context.scheduled_execution_time.weekday() < 5:
        return dg.RunRequest()
    else:
        return dg.SkipReason("Skipping weekend run")

Next Steps