This quickstart will walk you through creating your first Dagster asset, running it locally, and viewing the results in the web UI.

What You’ll Build

You’ll create a simple data pipeline that:
  • Loads sample data from a CSV file
  • Processes the data using Pandas
  • Saves the transformed output
By the end, you’ll understand Dagster’s core concepts and have a working local setup.

Prerequisites

Make sure you have Dagster installed. If not, see the Installation guide.

Step 1: Create Your First Asset

Dagster assets represent data artifacts in your pipeline. Let’s create a simple one.
1. Create a Python file

Create a new file called hello_dagster.py:
hello_dagster.py
import dagster as dg

@dg.asset
def hello(context: dg.AssetExecutionContext):
    context.log.info("Hello, Dagster!")
    return "Hello from Dagster!"

@dg.asset(deps=[hello])
def world(context: dg.AssetExecutionContext):
    context.log.info("World!")
    return "Hello world pipeline complete"
The @dg.asset decorator turns a Python function into a data asset. Assets with deps depend on other assets.
2. Test your asset

Add the following to the bottom of hello_dagster.py so you can materialize the assets directly from Python:
if __name__ == "__main__":
    dg.materialize([hello, world])
Run the file:
python hello_dagster.py
You should see log output indicating successful materialization.

Step 2: Create a Real Data Pipeline

Now let’s build something more practical with actual data processing.
1. Create project structure

mkdir dagster_quickstart
cd dagster_quickstart
mkdir -p defs/data
2. Create sample data

Create defs/data/sample_data.csv:
defs/data/sample_data.csv
name,age,city
Alice,25,New York
Bob,35,San Francisco
Charlie,45,Chicago
Diana,28,Boston
3. Create your asset definition

Create defs/assets.py with a data processing asset:
defs/assets.py
import pandas as pd
import dagster as dg

sample_data_file = "defs/data/sample_data.csv"
processed_data_file = "defs/data/processed_data.csv"

@dg.asset
def processed_data():
    # Read data from the CSV
    df = pd.read_csv(sample_data_file)
    
    # Add an age_group column based on the value of age
    df["age_group"] = pd.cut(
        df["age"], 
        bins=[0, 30, 40, 100], 
        labels=["Young", "Middle", "Senior"]
    )
    
    # Save processed data
    df.to_csv(processed_data_file, index=False)
    return "Data loaded successfully"
This asset reads a CSV, adds a computed column, and saves the result. Dagster tracks when this runs and what it produces.
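One subtlety worth knowing about the pd.cut call: its bins are right-inclusive by default, so an age of exactly 30 lands in "Young" and exactly 40 in "Middle". A quick standalone check:

```python
import pandas as pd

# bins=[0, 30, 40, 100] yields right-inclusive intervals:
# (0, 30] -> Young, (30, 40] -> Middle, (40, 100] -> Senior
edge_ages = pd.Series([30, 31, 40, 41])
groups = pd.cut(edge_ages, bins=[0, 30, 40, 100],
                labels=["Young", "Middle", "Senior"])
print(groups.tolist())  # ['Young', 'Middle', 'Middle', 'Senior']
```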
4. Create definitions file

Create definitions.py to expose your assets:
definitions.py
import dagster as dg
from defs.assets import processed_data

defs = dg.Definitions(
    assets=[processed_data]
)

Step 3: Start the Dagster UI

1. Launch the development server

From your project directory, run:
dagster dev
The dagster dev command starts both the Dagster daemon and web server in development mode. If Dagster can't locate your definitions automatically, point it at the file explicitly with dagster dev -f definitions.py.
You should see output like:
Serving dagster-webserver on http://127.0.0.1:3000 in process
2. Open the UI

Open your browser to http://localhost:3000. You'll see the Dagster web interface with your asset graph.
3. View your assets

In the UI, you’ll see:
  • Asset catalog - All your defined assets
  • Asset lineage graph - Visual representation of dependencies
  • Materialize button - Click to run your pipeline
The UI shows processed_data as a single asset ready to materialize.

Step 4: Materialize Your Asset

1. Click Materialize

In the asset view, click the Materialize button in the top-right corner.
“Materializing” an asset means executing the function to produce its output.
2. Watch the run

You’ll be taken to the run page where you can:
  • See real-time logs
  • Monitor progress
  • View any errors or warnings
The run should complete in a few seconds.
3. Verify the output

Check that defs/data/processed_data.csv was created with the age_group column:
name,age,city,age_group
Alice,25,New York,Young
Bob,35,San Francisco,Middle
Charlie,45,Chicago,Senior
Diana,28,Boston,Young
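If you'd rather verify the transformation logic without running Dagster at all, the same steps work with the sample rows inlined as a string:

```python
import io
import pandas as pd

# The tutorial's sample rows, inlined so no files are involved
csv_text = """name,age,city
Alice,25,New York
Bob,35,San Francisco
Charlie,45,Chicago
Diana,28,Boston
"""

df = pd.read_csv(io.StringIO(csv_text))
df["age_group"] = pd.cut(df["age"], bins=[0, 30, 40, 100],
                         labels=["Young", "Middle", "Senior"])
print(df["age_group"].tolist())  # ['Young', 'Middle', 'Senior', 'Young']
```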

Advanced Example: HackerNews Pipeline

Here’s a more complex example from Dagster’s official examples that fetches real data:
import json
import os
import requests
from dagster import asset

@asset(group_name="hackernews", compute_kind="HackerNews API")
def topstory_ids() -> None:
    """Get up to 100 top stories from the HackerNews topstories endpoint.
    
    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
    """
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    top_new_story_ids = requests.get(newstories_url).json()[:100]
    
    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_new_story_ids, f)
This snippet is the first asset in the pipeline; the full example also defines downstream topstories and most_frequent_words assets. Together they demonstrate:
  • Asset dependencies - topstories depends on topstory_ids
  • Metadata - Attach rich information like record counts and data previews
  • Logging - Track progress during execution
  • Grouping - Organize related assets with group_name

Understanding the Asset Graph

Dagster automatically builds a dependency graph from your assets:
topstory_ids
    ↓
topstories
    ↓
most_frequent_words
The UI shows:
  • Upstream dependencies - Assets that must run first
  • Downstream dependencies - Assets that depend on this one
  • Compute kind badges - Visual indicators of what type of computation
  • Last materialization - When it was last run successfully

Development Workflow

1. Make changes to your code

Edit your asset definitions in your Python files.
2. Reload definitions

In the UI, click the Reload definitions button (or press Cmd+R).
With dagster dev, changes are detected automatically in most cases.
3. Test your changes

Materialize individual assets or subsets to test your changes without running the entire pipeline.

Next Steps

Now that you have a working Dagster setup:

  • Add Schedules - Run your pipeline automatically on a schedule
  • Add Metadata - Track data quality metrics and previews
  • Use Resources - Connect to databases and external services
  • Deploy to Production - Take your pipeline to production

Common Patterns

Multiple dependencies:
@dg.asset(deps=[asset_one, asset_two])
def combined_asset():
    # Access outputs from both dependencies
    pass

Configuration:
from dagster import Config

class MyAssetConfig(Config):
    api_key: str
    max_results: int = 100

@dg.asset
def configured_asset(config: MyAssetConfig):
    # Use config.api_key and config.max_results
    pass

Custom IO managers:
import pandas as pd

@dg.asset(io_manager_key="database_io_manager")
def database_asset() -> pd.DataFrame:
    # Build the DataFrame; the IO manager handles storage
    df = pd.DataFrame({"id": [1, 2, 3]})
    return df

Testing:
from dagster import materialize

def test_my_asset():
    result = materialize([my_asset])
    assert result.success

Troubleshooting

Port already in use? If port 3000 is already taken, specify a different port:
dagster dev -p 3001

Assets not showing up?
  • Ensure your asset is included in the Definitions object
  • Click Reload definitions in the UI
  • Check for Python syntax errors in your code

Import errors?
  • Verify all dependencies are installed: pip install pandas dagster
  • Check your Python path includes the project directory

Runs failing?
  • Check the Logs tab in the run view for detailed error messages
  • Verify file paths and permissions
  • Ensure external APIs or databases are accessible
Learn More: Explore the official Dagster tutorial for a comprehensive walkthrough of building production pipelines.