Skip to main content
Assets are the core building blocks of Dagster. They represent data produced by your pipelines - files, database tables, ML models, or any other data artifact.

Basic Asset Definition

Define a simple asset using the @asset decorator:
import pandas as pd
import dagster as dg

@dg.asset
def processed_data():
    """Bucket the sample rows by age and write the result out as CSV.

    Reads ``src/data/sample_data.csv``, adds an ``age_group`` column derived
    from the ``age`` column, writes ``src/data/processed_data.csv`` without
    the index column, and returns a fixed status string.
    """
    age_bin_edges = [0, 30, 40, 100]
    age_bin_labels = ["Young", "Middle", "Senior"]

    frame = pd.read_csv("src/data/sample_data.csv")

    # Each age is replaced by the label of the bin it falls into.
    frame["age_group"] = pd.cut(
        frame["age"], bins=age_bin_edges, labels=age_bin_labels
    )

    frame.to_csv("src/data/processed_data.csv", index=False)
    return "Data loaded successfully"
The asset name is automatically derived from the function name. Dagster tracks when this asset was last materialized and its dependencies.

Asset Dependencies

Assets can depend on other assets, creating a data pipeline:
import json
import pandas as pd
import requests
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset

@asset(group_name="hackernews", compute_kind="HackerNews API")
def topstory_ids() -> None:
    """Get up to 100 top stories from the HackerNews topstories endpoint.

    The ids are written as a JSON list to ``data/topstory_ids.json``; the
    ``data`` directory is created first if it does not exist.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the endpoint does not answer within the timeout.
    """
    # Imported locally: this snippet's import list does not include ``os``.
    import os

    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    # Bound the wait so a stalled connection cannot hang the run.
    response = requests.get(topstories_url, timeout=5)
    # Fail here on an HTTP error instead of slicing an error payload below.
    response.raise_for_status()
    top_story_ids = response.json()[:100]

    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_story_ids, f)

@asset(deps=[topstory_ids], group_name="hackernews", compute_kind="HackerNews API")
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    """Get items based on story ids from the HackerNews items endpoint.

    Reads the ids from ``data/topstory_ids.json``, fetches each item, writes
    all of them to ``data/topstories.csv`` and returns the row count plus a
    Markdown preview of the first rows as metadata.

    Raises:
        requests.Timeout: If an item request does not answer within the timeout.
    """
    # Named ``story_ids`` so the upstream ``topstory_ids`` asset is not shadowed.
    with open("data/topstory_ids.json") as f:
        story_ids = json.load(f)

    results = []
    for item_id in story_ids:
        # Bound each request so one stalled connection cannot hang the whole run.
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json", timeout=5
        ).json()
        results.append(item)

        # Progress message after every 20 fetched items.
        if len(results) % 20 == 0:
            context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    return MaterializeResult(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(str(df.head().to_markdown())),
        }
    )
Use deps for implicit dependencies where you don’t need the upstream asset’s value. Use function parameters for explicit dependencies where you need to access the upstream data.

Asset Inputs

For more control over dependencies, use AssetIn:
from dagster import AssetIn, asset

@asset(ins={"input2": AssetIn(key_prefix="something_else")})
def asset1(input1, input2):
    """Combine the two upstream values with ``+``.

    ``input1`` has no ``ins`` entry, so it comes from an asset named "input1";
    ``input2`` is mapped to "something_else/input2".
    """
    combined = input1 + input2
    return combined

Asset Metadata

Add rich metadata to your assets to provide context and enable observability:
import base64
from io import BytesIO
import matplotlib.pyplot as plt
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset

@asset(deps=[topstories], group_name="hackernews", compute_kind="Plot")
def most_frequent_words(context: AssetExecutionContext) -> MaterializeResult:
    """Get the top 25 most frequent words in HackerNews story titles.

    Reads ``data/topstories.csv``, counts the lower-cased, punctuation-stripped
    title words that are not stopwords, writes the 25 most frequent ones to
    ``data/most_frequent_words.json`` and returns a bar chart of them as
    Markdown metadata (a base64-encoded PNG embedded in an image tag).
    """
    from collections import Counter

    # A set, because it is only used for membership tests.
    stopwords = {"a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"}

    # Named ``stories`` so the upstream ``topstories`` asset is not shadowed.
    stories = pd.read_csv("data/topstories.csv")

    # Count word frequency across titles. A missing title is read back as NaN
    # (a float without ``.lower()``), so those rows are skipped.
    word_counts = Counter()
    for raw_title in stories["title"].dropna():
        for word in raw_title.lower().split():
            cleaned_word = word.strip(".,-!?:;()[]'\"\"")
            if cleaned_word and cleaned_word not in stopwords:
                word_counts[cleaned_word] += 1

    # Top 25 by count; ties keep first-seen order.
    top_words = dict(word_counts.most_common(25))

    # Make a bar chart and render it to an in-memory PNG.
    fig = plt.figure(figsize=(10, 6))
    try:
        plt.bar(list(top_words.keys()), list(top_words.values()))
        plt.xticks(rotation=45, ha="right")
        plt.title("Top 25 Words in Hacker News Titles")
        plt.tight_layout()

        buffer = BytesIO()
        plt.savefig(buffer, format="png")
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)

    image_data = base64.b64encode(buffer.getvalue())

    # Convert to Markdown for preview
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    # Attach metadata to the asset
    return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})

Multi-Assets

Define multiple assets that are computed together:
from dagster import AssetOut, multi_asset

@multi_asset(
    outs={
        "a": AssetOut(),
        "b": AssetOut(),
        "c": AssetOut(),
    },
    # Declared dependencies per output: "a" on in1 and in2, "b" on nothing,
    # "c" on the outputs a and b plus in2 and in3.
    internal_asset_deps={
        "a": {AssetKey("in1"), AssetKey("in2")},
        "b": set(),
        "c": {AssetKey("a"), AssetKey("b"), AssetKey("in2"), AssetKey("in3")},
    },
    can_subset=True,
)
def abc_(context, in1, in2, in3):
    """Compute ``a``, ``b`` and ``c`` in one step and return them in that order."""
    # NOTE(review): can_subset=True is declared, but this body never inspects
    # which outputs were requested and always returns all three -- confirm
    # that this is the intended behavior for subset runs.
    first = compute_a(in1, in2)
    second = compute_b()
    third = compute_c(first, second, in2, in3)
    return first, second, third

Partitioned Assets

Partition assets to process data incrementally:
import datetime
import os
import pandas as pd
import dagster as dg

# Partition scheme used by the partitioned assets below: daily partitions
# with a start date of 2024-01-01.
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    """Write ten simulated sales rows for the current partition to a CSV file.

    The file is ``data/daily_sales/sales_<partition key>.csv``; the directory
    is created if it does not exist, and the written path is logged.
    """
    date = context.partition_key
    os.makedirs("data/daily_sales", exist_ok=True)
    filename = f"data/daily_sales/sales_{date}.csv"

    # Simulated figures: 100, 200, ..., 1000, each stamped with the partition key.
    sales_figures = list(range(100, 1001, 100))
    columns = {
        "date": [date for _ in range(10)],
        "sales": sales_figures,
    }
    frame = pd.DataFrame(columns)
    frame.to_csv(filename, index=False)

    context.log.info(f"Daily sales data written to {filename}")

@dg.asset(
    partitions_def=daily_partitions,
    deps=[daily_sales_data],
)
def daily_sales_summary(context: dg.AssetExecutionContext) -> None:
    """Log the total sales of the current partition.

    Reads ``data/daily_sales/sales_<partition key>.csv`` and logs a dict
    holding the partition key and the sum of the ``sales`` column. Nothing is
    returned or written to disk.
    """
    partition_date_str = context.partition_key
    filename = f"data/daily_sales/sales_{partition_date_str}.csv"
    df = pd.read_csv(filename)

    summary = {
        "date": partition_date_str,
        # The column sum is a NumPy scalar, which newer NumPy versions print
        # with its type name; .item() converts it to the plain Python number
        # so the log line shows just the value.
        "total_sales": df["sales"].sum().item(),
    }

    context.log.info(f"Daily sales summary for {partition_date_str}: {summary}")
1
Define the partition scheme
2
Choose from DailyPartitionsDefinition, WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or create a custom StaticPartitionsDefinition.
3
Apply partitions to assets
4
Add the partitions_def parameter to your asset decorators.
5
Access partition keys
6
Use context.partition_key to get the current partition being processed.

Configurable Resources

Use resources to configure external connections and share them across assets:
import requests
import dagster as dg

class SunResource(dg.ConfigurableResource):
    """Configurable resource that looks up today's sunrise via api.sunrise-sunset.org."""

    # All three values are interpolated verbatim into the request URL.
    latitude: str
    longitude: str
    time_zone: str

    @property
    def query_string(self) -> str:
        """Return the request URL for today's data at the configured location."""
        url = f"https://api.sunrise-sunset.org/json?lat={self.latitude}&lng={self.longitude}&date=today&tzid={self.time_zone}"
        return url

    def sunrise(self) -> str:
        """Fetch the API response (5 second timeout) and return its sunrise entry."""
        response = requests.get(self.query_string, timeout=5)
        payload = response.json()
        return payload["results"]["sunrise"]

@dg.asset
def sfo_sunrise(context: dg.AssetExecutionContext, sun_resource: SunResource) -> None:
    """Log the sunrise time reported by ``sun_resource``."""
    sunrise = sun_resource.sunrise()
    message = f"Sunrise in San Francisco is at {sunrise}."
    context.log.info(message)

# Registers the sfo_sunrise asset together with a SunResource instance. The
# resource is stored under the key "sun_resource", the same name as the
# sfo_sunrise parameter that receives it.
defs = dg.Definitions(
    assets=[sfo_sunrise],
    resources={
        "sun_resource": SunResource(
            latitude="37.615223",
            longitude="-122.389977",
            time_zone="America/Los_Angeles",
        )
    },
)

Integration Assets

Leverage pre-built integrations for popular data tools:
from dagster_airbyte import build_airbyte_assets
from dagster_dbt import DbtCliResource, dbt_assets
from dagster import AssetExecutionContext

# Airbyte assets: built for a single connection id, listing "orders" and
# "users" as destination tables under the "postgres_replica" key prefix.
# The connection id here is a placeholder to replace with a real one.
airbyte_assets = build_airbyte_assets(
    connection_id="your-connection-id",
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)

# dbt assets
@dbt_assets(manifest="/path/to/manifest.json")
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` through the CLI resource and relay what it streams."""
    invocation = dbt.cli(["build"], context=context)
    yield from invocation.stream()
When using integration assets, ensure you have the corresponding library installed (e.g., dagster-dbt, dagster-airbyte) and properly configured resources.

Best Practices

Organize with Groups

Group related assets together for better organization in the UI:
@asset(group_name="analytics")
def revenue_metrics():
    """Placeholder asset in the "analytics" group; the body is intentionally empty."""

@asset(group_name="analytics")
def user_metrics():
    """Placeholder asset in the "analytics" group; the body is intentionally empty."""

Use Compute Kinds

Label assets with their computation type for clarity:
@asset(compute_kind="dbt")
def dbt_model():
    """Placeholder asset labelled with compute kind "dbt"; the body is intentionally empty."""

@asset(compute_kind="Python")
def python_transform():
    """Placeholder asset labelled with compute kind "Python"; the body is intentionally empty."""

Return Metadata

Always return useful metadata to make assets observable:
@asset
def my_asset() -> MaterializeResult:
    """Compute a DataFrame and report its size, columns and a preview as metadata.

    Returns:
        A ``MaterializeResult`` whose metadata holds the row count, the
        comma-separated column labels and a Markdown rendering of the first rows.
    """
    df = compute_data()
    # Column labels are not always strings (integer labels, for example), and
    # ``str.join`` rejects anything that is not a string.
    column_names = ", ".join(str(column) for column in df.columns)
    return MaterializeResult(
        metadata={
            "num_rows": len(df),
            "columns": MetadataValue.md(column_names),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )

Next Steps