Partitions & Backfills

Partitions allow you to split an asset or job into smaller, independent pieces that can be materialized separately. Backfills enable you to efficiently materialize historical partitions in bulk. Partitioning is essential for:
  • Processing time-series data: Handle daily, weekly, or monthly data increments
  • Incremental computation: Only recompute data that changed
  • Parallel execution: Process multiple partitions simultaneously
  • Historical replay: Backfill data for specific time ranges
  • Cost optimization: Avoid reprocessing all data when only recent data changed

Why Partition?

Without partitions, you process all data every time:
# ❌ Processes all data on every run
from dagster import asset

@asset
def user_events():
    return query_all_events()  # Gets millions of rows
With partitions, you process only what you need:
# ✅ Processes one day at a time
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def user_events(context: AssetExecutionContext):
    date = context.partition_key  # e.g., "2024-01-15"
    return query_events_for_date(date)  # Only this partition's data

Time-Based Partitions

Dagster provides built-in definitions for common time-based partitioning:
from dagster import asset, DailyPartitionsDefinition, AssetExecutionContext
import urllib.request

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> None:
    partition_date_str = context.partition_key
    
    url = f"https://api.example.com/data?date={partition_date_str}"
    target_location = f"data/{partition_date_str}.csv"
    
    urllib.request.urlretrieve(url, target_location)
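
Dagster also ships hourly, weekly, and monthly definitions that follow the same pattern; a minimal sketch (asset bodies elided):
from dagster import (
    asset,
    HourlyPartitionsDefinition,
    MonthlyPartitionsDefinition,
    WeeklyPartitionsDefinition,
)

# Hourly partition keys include the hour, e.g. "2023-10-01-00:00"
@asset(partitions_def=HourlyPartitionsDefinition(start_date="2023-10-01-00:00"))
def hourly_metrics(context): ...

@asset(partitions_def=WeeklyPartitionsDefinition(start_date="2023-10-01"))
def weekly_rollup(context): ...

@asset(partitions_def=MonthlyPartitionsDefinition(start_date="2023-10-01"))
def monthly_report(context): ...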

Time Window Access

For time-based partitions, access the time window instead of just the string key:
from dagster import asset, DailyPartitionsDefinition, AssetExecutionContext
from datetime import datetime

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def time_windowed_asset(context: AssetExecutionContext):
    # Get datetime objects for the partition's window; for the daily
    # partition "2024-01-15", start is 2024-01-15 00:00 and end is
    # 2024-01-16 00:00 (the end bound is exclusive)
    time_window = context.partition_time_window
    start: datetime = time_window.start
    end: datetime = time_window.end
    
    return query_data_between(start, end)

Dynamic Partitions

Dynamic partitions allow you to add partition keys at runtime, useful for data that doesn’t follow a fixed schedule:
from dagster import DynamicPartitionsDefinition, asset, AssetExecutionContext
import requests

releases_partitions_def = DynamicPartitionsDefinition(name="releases")

@asset(
    partitions_def=releases_partitions_def,
    metadata={"partition_expr": "release_tag"},
)
def releases_metadata(context: AssetExecutionContext):
    release_tag = context.partition_key  # e.g., "v1.2.3"
    
    response = requests.get(
        f"https://api.github.com/repos/my-org/my-repo/releases/tags/{release_tag}"
    )
    
    return response.json()
Add partitions dynamically:
from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(target=[releases_metadata])
def new_release_sensor(context: SensorEvaluationContext):
    # Check for new releases
    new_releases = fetch_new_releases()
    
    for release in new_releases:
        # Add a new partition
        context.instance.add_dynamic_partitions(
            partitions_def_name="releases",
            partition_keys=[release.tag],
        )
        
        # Request a run for the new partition
        yield RunRequest(
            partition_key=release.tag,
            run_key=release.tag,
        )
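
On newer Dagster versions, the sensor can instead return a SensorResult that batches the partition additions with the run requests; a sketch reusing the placeholder fetch_new_releases from above:
from dagster import RunRequest, SensorEvaluationContext, SensorResult, sensor

@sensor(target=[releases_metadata])
def new_release_sensor_batched(context: SensorEvaluationContext):
    new_releases = fetch_new_releases()  # placeholder, as above
    
    return SensorResult(
        run_requests=[
            RunRequest(partition_key=r.tag, run_key=r.tag) for r in new_releases
        ],
        # Register all new partition keys in one request
        dynamic_partitions_requests=[
            releases_partitions_def.build_add_request([r.tag for r in new_releases])
        ],
    )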

Static Partitions

Static partitions define a fixed set of partition keys:
from dagster import asset, StaticPartitionsDefinition, AssetExecutionContext

@asset(
    partitions_def=StaticPartitionsDefinition(
        ["US", "EU", "APAC", "LATAM"]
    )
)
def regional_sales(context: AssetExecutionContext):
    region = context.partition_key
    return fetch_sales_data(region=region)

Multi-Dimensional Partitions

Partition by multiple dimensions using MultiPartitionsDefinition:
from dagster import (
    asset,
    MultiPartitionsDefinition,
    DailyPartitionsDefinition,
    StaticPartitionsDefinition,
    AssetExecutionContext,
)

partitions_def = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2024-01-01"),
    "region": StaticPartitionsDefinition(["US", "EU", "APAC"]),
})

@asset(partitions_def=partitions_def)
def regional_daily_sales(context: AssetExecutionContext):
    # The partition key is a composite like "2024-01-15|US"
    partition_key = context.partition_key
    
    # Access individual dimensions
    date = partition_key.keys_by_dimension["date"]
    region = partition_key.keys_by_dimension["region"]
    
    return fetch_sales(date=date, region=region)
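
To target a single cell of this two-dimensional grid programmatically, for example in a test, build the composite key with MultiPartitionKey; a minimal sketch:
from dagster import MultiPartitionKey, materialize

result = materialize(
    [regional_daily_sales],
    partition_key=MultiPartitionKey({"date": "2024-01-15", "region": "US"}),
)
assert result.success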

Partitioned Dependencies

When a partitioned asset depends on another partitioned asset, Dagster automatically maps partitions:
from dagster import asset, DailyPartitionsDefinition, AssetExecutionContext

daily = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily)
def raw_events(context: AssetExecutionContext):
    date = context.partition_key
    return fetch_raw_events(date)

@asset(partitions_def=daily)
def processed_events(context: AssetExecutionContext, raw_events):
    # raw_events automatically contains data for the same partition
    return process(raw_events)

Partition Mapping

Customize how partitions map between assets:
from dagster import (
    asset,
    AssetIn,
    DailyPartitionsDefinition,
    WeeklyPartitionsDefinition,
    TimeWindowPartitionMapping,
    AssetExecutionContext,
)

daily = DailyPartitionsDefinition(start_date="2024-01-01")
weekly = WeeklyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily)
def daily_sales(context: AssetExecutionContext):
    return fetch_daily_sales(context.partition_key)

@asset(
    partitions_def=weekly,
    ins={
        "daily_sales": AssetIn(
            partition_mapping=TimeWindowPartitionMapping()
        )
    },
)
def weekly_sales(context: AssetExecutionContext, daily_sales):
    # daily_sales contains all daily partitions within the week
    return aggregate_to_weekly(daily_sales)
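
Time windows are not the only relationship you can express. For instance, an unpartitioned report that should read only the most recent daily partition can use LastPartitionMapping (build_report is a placeholder):
from dagster import asset, AssetIn, LastPartitionMapping

@asset(ins={"daily_sales": AssetIn(partition_mapping=LastPartitionMapping())})
def latest_sales_report(daily_sales):
    # Receives only the most recent daily_sales partition
    return build_report(daily_sales)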

Backfills

Backfills let you materialize many partitions in bulk, for example to populate history for a newly added asset or to recompute everything after a logic change.

Launching Backfills

  1. Navigate to the asset in the Dagster UI
  2. Click “Materialize” → “Backfill”
  3. Select the partition range
  4. Click “Launch Backfill”
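
For local development or tests, you can also backfill programmatically by materializing each partition in a range; a minimal sketch reusing my_daily_partitioned_asset from above:
from dagster import DailyPartitionsDefinition, materialize

daily_def = DailyPartitionsDefinition(start_date="2023-10-01")

# One run per partition key in the chosen range
for key in daily_def.get_partition_keys():
    if "2023-10-01" <= key <= "2023-10-07":
        result = materialize([my_daily_partitioned_asset], partition_key=key)
        assert result.success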

Backfill Policies

Control how backfills execute using BackfillPolicy:
from dagster import asset, AssetExecutionContext, BackfillPolicy, DailyPartitionsDefinition

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    backfill_policy=BackfillPolicy.single_run(),
)
def bulk_load_asset(context: AssetExecutionContext):
    # All partitions in the backfill are processed in one execution
    partitions = context.partition_keys
    return load_multiple_partitions(partitions)
This policy is efficient for assets where loading multiple partitions in one execution is faster than launching one run per partition.

Chunked Backfills

Process backfills in chunks to avoid overwhelming resources:
from dagster import asset, AssetExecutionContext, BackfillPolicy, DailyPartitionsDefinition

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    backfill_policy=BackfillPolicy.multi_run(max_partitions_per_run=7),
)
def chunked_asset(context: AssetExecutionContext):
    # Each run covers up to 7 partitions; note that chunking uses
    # BackfillPolicy.multi_run, not single_run
    partitions = context.partition_keys
    return process_partitions(partitions)

Partitioned Jobs

Jobs can also be partitioned:
from dagster import (
    op,
    job,
    DailyPartitionsDefinition,
    OpExecutionContext,
)

@op
def process_partition(context: OpExecutionContext):
    partition_key = context.partition_key
    context.log.info(f"Processing {partition_key}")
    return process_data_for_partition(partition_key)

@job(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01")
)
def partitioned_job():
    process_partition()
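
A partitioned job pairs naturally with a schedule that targets each partition as its window closes; Dagster provides a helper for this:
from dagster import build_schedule_from_partitioned_job

# Runs once per day, materializing the most recently completed partition
partitioned_schedule = build_schedule_from_partitioned_job(partitioned_job)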

Partition-Aware IO Managers

IO managers can handle partitions automatically:
import dagster as dg

class MyIOManager(dg.IOManager):
    def _get_path(self, context) -> str:
        if context.has_partition_key:
            # Store each partition separately
            return f"data/{'/'.join(context.asset_key.path)}/{context.asset_partition_key}.parquet"
        else:
            return f"data/{'/'.join(context.asset_key.path)}.parquet"

    def handle_output(self, context: dg.OutputContext, obj):
        path = self._get_path(context)
        write_parquet(path, obj)

    def load_input(self, context: dg.InputContext):
        path = self._get_path(context)
        return read_parquet(path)
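
To use this IO manager, register it under the io_manager resource key; a sketch assuming write_parquet and read_parquet are defined elsewhere:
@dg.io_manager
def my_io_manager():
    return MyIOManager()

defs = dg.Definitions(
    assets=[my_daily_partitioned_asset],
    resources={"io_manager": my_io_manager},
)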

Time Window Partitions in IO Managers

For single-run backfills, IO managers can load multiple partitions:
import dagster as dg

class MyIOManager(dg.IOManager):
    def load_input(self, context: dg.InputContext):
        # Get the time window for all partitions being processed
        start_datetime, end_datetime = context.asset_partitions_time_window
        return read_data_in_datetime_range(start_datetime, end_datetime)

    def handle_output(self, context: dg.OutputContext, obj):
        start_datetime, end_datetime = context.asset_partitions_time_window
        return overwrite_data_in_datetime_range(start_datetime, end_datetime, obj)

Testing Partitioned Assets

Test partitioned assets by providing a partition key:
from dagster import build_asset_context, materialize

# my_daily_asset stands in for any daily-partitioned asset like those above
def test_partitioned_asset():
    # Test a specific partition
    context = build_asset_context(partition_key="2024-01-15")
    result = my_daily_asset(context)
    assert result is not None
    
    # Test materialization
    result = materialize(
        [my_daily_asset],
        partition_key="2024-01-15",
    )
    assert result.success

Partition Status

Check which partitions have been materialized (failed or missing partitions are easiest to inspect in the Dagster UI's partition health view):
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def incremental_asset(context: AssetExecutionContext):
    # Set of partition keys already materialized for this asset
    materialized = context.instance.get_materialized_partitions(context.asset_key)
    
    context.log.info(f"Materialized: {len(materialized)} partitions")

Best Practices

  • Right-size partitions: Balance between too many small partitions (per-run overhead) and too few large ones (long execution times). Daily partitions work well for most use cases.
  • Map time windows explicitly: When daily assets feed weekly assets, use TimeWindowPartitionMapping to map partitions automatically.
  • Keep partitions idempotent: Each partition should be independently recomputable without side effects; this makes retries and backfills safe.
  • Batch when it pays off: If loading multiple partitions together is more efficient (e.g., bulk database queries), use BackfillPolicy.single_run().
  • Monitor partition health: Use the Dagster UI to track which partitions are materialized, failed, or missing, and to spot gaps in your data.
