
Automation

Automation in Dagster allows you to declaratively specify when assets should be materialized without writing manual schedules or sensors. Using automation conditions, you can define rules that determine when assets need updates based on:
  • Changes to upstream dependencies
  • Time-based schedules
  • Custom business logic
  • Missing data
  • Code changes
Automation provides a higher-level abstraction than schedules and sensors, making it easier to build self-updating data pipelines.

Why Use Automation?

Traditional approaches require explicit scheduling:
# ❌ Manual scheduling - must coordinate multiple schedules
from dagster import RunRequest, schedule

@schedule(target=[asset_a], cron_schedule="0 1 * * *")
def schedule_asset_a():
    return RunRequest()

@schedule(target=[asset_b], cron_schedule="0 2 * * *")  # Must run after asset_a
def schedule_asset_b():
    return RunRequest()
Automation conditions handle coordination automatically:
# ✅ Declarative automation - automatic coordination
from dagster import asset, AutomationCondition

@asset(automation_condition=AutomationCondition.eager())
def asset_a():
    return load_data()

@asset(automation_condition=AutomationCondition.eager())
def asset_b(asset_a):  # Automatically runs after asset_a updates
    return process(asset_a)

Core Automation Conditions

Eager Automation

The eager condition materializes assets as soon as their dependencies update:
import dagster as dg

@dg.asset(
    deps=["upstream"],
    automation_condition=dg.AutomationCondition.eager(),
)
def eager_asset() -> None:
    # Materializes immediately when 'upstream' is updated
    ...
Eager assets are ideal for:
  • Real-time data pipelines
  • Assets that should always reflect the latest data
  • Critical downstream dependencies

On Cron Automation

Schedule assets using cron expressions:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.on_cron("0 9 * * *"),
)
def daily_report():
    # Materializes every day at 9 AM
    return generate_report()
Cron-based automation is useful for:
  • Daily, weekly, or monthly reports
  • Time-sensitive computations
  • Rate-limited external APIs

On Missing Automation

Materialize only when the asset doesn’t exist:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.on_missing(),
)
def one_time_asset():
    # Only materializes if not already materialized
    return expensive_computation()
Useful for:
  • One-time migrations
  • Expensive seed data
  • Historical backfills

Custom Conditions

Combine conditions using logical operators:
import dagster as dg

@dg.asset(
    automation_condition=(
        # Run daily at 9 AM OR when upstream changes
        dg.AutomationCondition.on_cron("0 9 * * *")
        | dg.AutomationCondition.eager()
    ),
)
def flexible_asset():
    return compute_data()

Automation Condition Operators

Combine conditions using logical operators:
import dagster as dg

@dg.asset(
    automation_condition=(
        # Must satisfy BOTH conditions
        dg.AutomationCondition.eager()
        & dg.AutomationCondition.on_cron("0 9 * * *")
    ),
)
def selective_asset():
    # Only runs if upstream updated AND it's 9 AM
    return compute()

Code Version Tracking

Track when asset code changes using code_version:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.eager(),
    code_version="v2",  # Increment when logic changes
)
def versioned_asset():
    return compute_with_new_logic()
When code_version changes:
  • Dagster detects the asset is “stale”
  • Automation conditions can trigger rematerialization
  • Helps track which code produced each materialization

Working with Partitioned Assets

Automation conditions work with partitioned assets:
import dagster as dg

@dg.asset(
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    automation_condition=dg.AutomationCondition.eager(),
)
def partitioned_asset(context: dg.AssetExecutionContext):
    # Each partition materializes eagerly when upstream updates
    date = context.partition_key
    return fetch_data(date)

@dg.asset(
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    automation_condition=dg.AutomationCondition.on_cron("0 2 * * *"),
)
def scheduled_partitioned_asset(context: dg.AssetExecutionContext):
    # Latest partition materializes daily at 2 AM
    date = context.partition_key
    return process_data(date)

Asset Groups with Automation

Use automation to keep a downstream asset in sync with a group of upstream assets:
import dagster as dg

@dg.asset(group_name="raw_data")
def raw_users():
    return fetch_users()

@dg.asset(group_name="raw_data")
def raw_orders():
    return fetch_orders()

@dg.asset(
    group_name="processed",
    automation_condition=dg.AutomationCondition.eager(),
)
def combined_data(raw_users, raw_orders):
    return merge(raw_users, raw_orders)

# combined_data rematerializes automatically whenever either raw asset updates

Automation with Dependencies

Allowing Missing Upstreams

Materialize even if some upstreams have never been materialized. The built-in eager() condition refuses to run while any dependency is missing; in recent Dagster releases, that guard can be removed with .without():
import dagster as dg

@dg.asset(
    deps=["optional_upstream"],
    automation_condition=(
        # Remove eager()'s "no missing deps" guard so this asset
        # can run before 'optional_upstream' exists
        dg.AutomationCondition.eager()
        .without(~dg.AutomationCondition.any_deps_missing())
    ),
)
def tolerant_asset():
    # Runs even if 'optional_upstream' isn't materialized
    ...

Scoping Dependency Conditions

Restrict which upstream dependencies a condition reacts to using .ignore() (or its inverse, .allow()) with an asset selection:
import dagster as dg

@dg.asset(
    deps=["core_upstream", "noisy_upstream"],
    automation_condition=(
        # React to updates of 'core_upstream' but not 'noisy_upstream'
        dg.AutomationCondition.eager()
        .ignore(dg.AssetSelection.assets("noisy_upstream"))
    ),
)
def scoped_asset():
    return compute()

Blocking Conditions

Prevent materialization until upstream asset checks pass. Mark the check as blocking, then require that all blocking checks on dependencies have passed:
import dagster as dg

@dg.asset
def validated_asset():
    return load_data()

@dg.asset_check(asset=validated_asset, blocking=True)
def quality_check():
    # Validate data quality
    data = load_data()
    return dg.AssetCheckResult(passed=validate(data))

@dg.asset(
    deps=[validated_asset],
    automation_condition=(
        # Only run once all blocking checks on upstreams have passed
        dg.AutomationCondition.eager()
        & dg.AutomationCondition.all_deps_blocking_checks_passed()
    ),
)
def downstream_asset():
    return process()

Testing Automation

Test automation conditions in unit tests:
import dagster as dg

@dg.asset(automation_condition=dg.AutomationCondition.eager())
def my_asset():
    return [1, 2, 3]

def test_automation():
    # dg.materialize runs the asset directly, ignoring automation
    # conditions; it verifies the computation itself
    result = dg.materialize([my_asset])
    assert result.success

    # The automation condition is attached to the asset definition
    assert my_asset.automation_condition is not None

Observability

Automation Evaluation History

View why assets were or weren’t materialized:
  • In the UI: Navigate to the Asset Details page → Automation tab
  • View evaluation results: See which conditions were satisfied
  • Debug skipped runs: Understand why assets weren’t materialized

Automation Logs

Automation evaluations are logged:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.eager(),
)
def logged_asset():
    # Automation evaluations appear in the run logs
    return compute_data()
Logs show:
  • When conditions were evaluated
  • Which conditions were true/false
  • Why runs were requested or skipped

Advanced Patterns

Time-Window Conditions

Materialize only during specific time windows:
import dagster as dg

@dg.asset(
    automation_condition=(
        dg.AutomationCondition.eager()
        & dg.AutomationCondition.on_cron("0 9-17 * * 1-5")  # Business hours
    ),
)
def business_hours_asset():
    # Only updates during 9 AM - 5 PM, Monday-Friday
    return compute()

Multi-Condition Assets

Complex automation logic:
import dagster as dg

@dg.asset(
    automation_condition=(
        # Run in any of these scenarios:
        dg.AutomationCondition.on_cron("0 0 * * *")  # Daily at midnight
        | (
            # OR when upstream updates during business hours
            dg.AutomationCondition.eager()
            & dg.AutomationCondition.on_cron("0 9-17 * * 1-5")
        )
        | dg.AutomationCondition.on_missing()  # OR when missing
    ),
)
def sophisticated_asset():
    return compute_complex_logic()

Asset Check Integration

Use asset checks to control automation:
import dagster as dg

@dg.asset
def source_data():
    return load_external_data()

@dg.asset_check(asset=source_data, blocking=True)
def freshness_check():
    data = load_external_data()
    return dg.AssetCheckResult(passed=check_freshness(data))

@dg.asset(
    deps=[source_data],
    automation_condition=(
        dg.AutomationCondition.eager()
        & dg.AutomationCondition.all_deps_blocking_checks_passed()
    ),
)
def processed_data():
    # Only runs once source_data's blocking freshness_check has passed
    return process()

Automation vs Schedules vs Sensors

Use automation conditions when:
  • You want declarative, condition-based execution
  • Assets should update based on dependency changes
  • You need automatic coordination between related assets
  • You want to express “when” logic without writing code

Use schedules when:
  • You need precise control over timing
  • Complex run configuration is required
  • You want explicit, imperative scheduling
  • Your schedule logic is very custom

Use sensors when:
  • You need to react to external events
  • You’re integrating with third-party systems
  • You need stateful polling logic
  • Timing is unpredictable

Enabling Automation

Automation conditions are evaluated by the Dagster daemon, which must be running:

In Development

# The dagster dev command runs the daemon automatically
dagster dev

In Production

Ensure your deployment runs the daemon:
# Run the daemon separately
dagster-daemon run
For Dagster+ deployments, or Kubernetes deployments using the official Helm chart, the daemon runs automatically.

Monitoring Automation

Asset Health

The Dagster UI shows:
  • Automation status: Whether automation is enabled
  • Last evaluation: When conditions were last checked
  • Next expected run: When the asset will likely materialize
  • Evaluation history: Past decisions and reasons

Daemon Logs

The daemon writes its logs to the stdout of the dagster-daemon run process; inspect them (via your process manager or your deployment’s log aggregation) to debug automation.
Logs show:
  • Automation evaluations
  • Requested runs
  • Skipped runs and reasons
  • Errors or warnings

Best Practices

  • For most assets, AutomationCondition.eager() provides the right default behavior: update when dependencies change.
  • Increment code_version when you change asset logic; this helps track which code produced each materialization.
  • Use eager | on_cron to get both reactive updates and guaranteed periodic refreshes.
  • Run dagster dev locally to see how automation behaves before deploying to production.
  • Ensure the Dagster daemon is running and healthy; check its logs if automation isn’t working as expected.
  • Block downstream automation until upstream data passes quality checks by marking checks as blocking and requiring all_deps_blocking_checks_passed().

Migration from Schedules/Sensors

Convert existing schedules to automation:
# Before: Using schedules
from dagster import AutomationCondition, RunRequest, asset, schedule

@asset
def my_asset():
    return load_data()

@schedule(target=[my_asset], cron_schedule="0 0 * * *")
def my_schedule():
    return RunRequest()

# After: Using automation
@asset(
    automation_condition=AutomationCondition.on_cron("0 0 * * *")
)
def my_asset():
    return load_data()
Benefits of migration:
  • Simpler code (no separate schedule definition)
  • Better visibility in the UI
  • Automatic coordination with dependencies
  • Declarative condition composition

API Reference

  • @asset - Define assets with automation conditions
  • Partitions - Partition definitions for automation
  • Schedules - Alternative time-based automation