Testing Dagster Assets and Pipelines
Testing is essential for building reliable data pipelines. Dagster provides tools and patterns for testing assets, ops, resources, and entire pipelines.

Unit Testing Assets

Assets are just Python functions, so you can test them directly:
import pandas as pd
import dagster as dg

@dg.asset
def orders():
    """Build the two-row demo orders table and persist it to orders.csv."""
    data = {"order_id": [1, 2], "item_id": [432, 878]}
    frame = pd.DataFrame(data)
    frame.to_csv("orders.csv")
    return frame

# Test the asset
def test_orders():
    """Calling the asset directly yields the expected two-row frame."""
    frame = orders()
    assert len(frame) == 2
    for column in ("order_id", "item_id"):
        assert column in frame.columns

Testing Assets with Dependencies

For assets with dependencies, pass the upstream values as function arguments:
import pandas as pd
import dagster as dg

@dg.asset
def upstream_data():
    """Provide a small three-row frame for downstream assets."""
    values = [1, 2, 3]
    return pd.DataFrame({"value": values})

@dg.asset
def downstream_data(upstream_data: pd.DataFrame):
    """Sum the `value` column of the upstream frame."""
    total = upstream_data["value"].sum()
    return total

# Test with mock data
def test_downstream_data():
    """The downstream asset sums whatever frame it is handed."""
    fake_input = pd.DataFrame({"value": [10, 20, 30]})
    assert downstream_data(fake_input) == 60

Testing Assets with Resources

Mock resources to avoid external dependencies during testing:
from dagster_aws.s3 import S3FileHandle, S3FileManager
import dagster as dg
from unittest import mock

@dg.asset
def loaded_file(file_manager: S3FileManager) -> str:
    """Read and return the contents of s3://bucket/path.txt via the injected file manager."""
    return file_manager.read_data(S3FileHandle("bucket", "path.txt"))

# Test with mocked resource
# Test with mocked resource
def test_file() -> None:
    """Verify loaded_file reads through the file manager without touching S3.

    The Mock is constrained with spec=S3FileManager so calls to attributes
    that do not exist on the real resource fail loudly.
    """
    mocked_resource = mock.Mock(spec=S3FileManager)
    mocked_resource.read_data.return_value = "contents"

    assert loaded_file(mocked_resource) == "contents"
    # BUG FIX: the original used `assert mocked_resource.read_data.called_once_with(...)`.
    # `called_once_with` is not a Mock assertion method -- accessing it merely
    # creates a truthy child Mock, so that assert could never fail.
    # `assert_called_once_with` actually verifies the call and its arguments.
    mocked_resource.read_data.assert_called_once_with(
        S3FileHandle("bucket", "path.txt")
    )
Use unittest.mock or pytest-mock to create mocked resources that match the interface of production resources without making actual external calls.

Testing Assets with Context

When your asset uses AssetExecutionContext, you need to provide a test context:
from dagster import AssetExecutionContext, asset, build_asset_context
import pandas as pd

@asset
def partitioned_asset(context: AssetExecutionContext) -> None:
    """Write a one-row CSV named after the current partition key."""
    key = context.partition_key
    context.log.info(f"Processing partition {key}")
    row = pd.DataFrame({"date": [key], "value": [100]})
    row.to_csv(f"data_{key}.csv")

# Test with build_asset_context
# Test with build_asset_context
def test_partitioned_asset():
    """Materialize one partition and verify the CSV it writes.

    Runs inside a temporary directory (restoring the original working
    directory afterwards) so the test never pollutes the repo with
    data_*.csv files -- matching the file's own best-practice guidance.
    """
    import os
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        original_cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            context = build_asset_context(partition_key="2024-01-01")
            partitioned_asset(context)

            # Verify the file was created with the expected single row
            df = pd.read_csv("data_2024-01-01.csv")
            assert len(df) == 1
            assert df["date"][0] == "2024-01-01"
        finally:
            # Restore cwd before tmpdir cleanup (a cwd inside the tmpdir
            # breaks removal on Windows and strands later tests).
            os.chdir(original_cwd)

Testing with Definitions

Test that your definitions load correctly:
from src.quickstart_etl.definitions import defs

def test_def_can_load():
    """Smoke test: the Definitions module imports cleanly and the job resolves."""
    assert defs.get_job_def("all_assets_job")
This simple test ensures your code can be imported and all definitions are valid.

Testing Asset Materialization

Use materialize to test the full execution path:
from dagster import materialize, asset
import pandas as pd

@asset
def orders():
    """In-memory orders table used by the materialization test."""
    columns = {"order_id": [1, 2], "item_id": [432, 878]}
    return pd.DataFrame(columns)

@asset
def order_count(orders: pd.DataFrame) -> int:
    """Number of rows in the orders table."""
    return orders.shape[0]

def test_asset_materialization():
    """Materializing both assets succeeds and yields the expected count."""
    run_result = materialize([orders, order_count])
    assert run_result.success

    # Pull the materialized value for the downstream asset
    assert run_result.output_for_node("order_count") == 2

Testing Asset Checks

Test your data quality checks independently:
import pandas as pd
import dagster as dg

@dg.asset
def orders():
    """Write the demo orders table to orders.csv (returns nothing)."""
    frame = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    frame.to_csv("orders.csv")

@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    """Data-quality check: no order_id in orders.csv may be null."""
    frame = pd.read_csv("orders.csv")
    null_count = frame["order_id"].isna().sum()
    return dg.AssetCheckResult(passed=bool(null_count == 0))

# Test the check
def test_orders_check_passes():
    """The check passes on the clean data written by the asset."""
    # Materialize the asset first so orders.csv exists
    orders()

    # Then evaluate the check against the file it wrote
    check_result = orders_id_has_no_nulls()
    assert check_result.passed

def test_orders_check_fails_with_nulls():
    """The check fails once a null order_id is written to orders.csv."""
    bad_frame = pd.DataFrame({"order_id": [1, None], "item_id": [432, 878]})
    bad_frame.to_csv("orders.csv")

    check_result = orders_id_has_no_nulls()
    assert not check_result.passed

Integration Testing

Test multiple assets together to verify integration:
from dagster import materialize, DagsterInstance
import tempfile
import os

def test_pipeline_integration():
    """Materialize the full pipeline end-to-end and check its file outputs.

    NOTE(review): topstory_ids, topstories and most_frequent_words are not
    imported in this snippet -- presumably they come from the project package;
    confirm the import in the real test module.
    """
    original_cwd = os.getcwd()
    # Use a temporary directory for test outputs
    with tempfile.TemporaryDirectory() as tmpdir:
        os.chdir(tmpdir)
        try:
            # Materialize the full pipeline
            result = materialize(
                [topstory_ids, topstories, most_frequent_words],
            )

            assert result.success

            # Verify outputs exist
            assert os.path.exists("data/topstory_ids.json")
            assert os.path.exists("data/topstories.csv")
            assert os.path.exists("data/most_frequent_words.json")
        finally:
            # BUG FIX: the original never chdir'd back, leaving the process
            # inside a directory that is deleted on context exit (breaking
            # later tests, and TemporaryDirectory cleanup on Windows).
            os.chdir(original_cwd)

Testing with Custom Resources

Provide test-specific resource configurations:
import dagster as dg
from dagster import Definitions, materialize

class DatabaseResource(dg.ConfigurableResource):
    """Configurable database resource identified by a connection string."""

    # Connection URL, e.g. "sqlite:///:memory:" as used in the test below
    connection_string: str

@dg.asset
def users(database: DatabaseResource):
    """Load all rows from the users table via the database resource."""
    # Use database connection
    # NOTE(review): DatabaseResource as defined above exposes no `query`
    # method -- presumably the production resource implements it; confirm.
    return database.query("SELECT * FROM users")

def test_users_asset():
    """Materialize the users asset against an in-memory test database."""
    test_database = DatabaseResource(connection_string="sqlite:///:memory:")

    result = materialize(
        [users],
        resources={"database": test_database},
    )

    assert result.success

Testing Ops

Ops can be tested similarly to assets:
from dagster import op, job, In, Out

@op(ins={"df": In()}, out=Out())
def process_payroll(df):
    """Return the number of rows in *df* (any object supporting len())."""
    return len(df)

def test_process_payroll():
    """The op returns the row count of its input frame."""
    import pandas as pd

    payroll = pd.DataFrame({"employee_id": [1, 2, 3]})
    assert process_payroll(payroll) == 3

Testing Schedules and Sensors

Test that schedules and sensors produce the expected run requests:
import dagster as dg
from dagster import build_sensor_context, validate_run_config

@dg.sensor(job=my_job, minimum_interval_seconds=5)
def new_file_sensor():
    """Request one run per newly detected file, or skip when idle."""
    new_files = check_for_new_files()
    if not new_files:
        yield dg.SkipReason("No new files found")
        return
    for filename in new_files:
        yield dg.RunRequest(run_key=filename)

def test_sensor_yields_run_requests():
    """The sensor emits one RunRequest per new file."""
    from unittest import mock  # not imported at the top of this snippet

    # BUG FIX: mock.patch requires a dotted "module.attribute" target; a bare
    # name like 'check_for_new_files' raises TypeError. Patch the name where
    # the sensor looks it up (replace `my_module` with the module that
    # defines new_file_sensor).
    with mock.patch("my_module.check_for_new_files", return_value=["file1.csv"]):
        context = build_sensor_context()
        run_requests = list(new_file_sensor(context))

        assert len(run_requests) == 1
        assert run_requests[0].run_key == "file1.csv"

def test_sensor_skips_when_no_files():
    """With no new files, the sensor yields a single SkipReason."""
    from unittest import mock  # not imported at the top of this snippet

    # BUG FIX: same as above -- mock.patch needs the full dotted path of the
    # attribute in the module where the sensor resolves it.
    with mock.patch("my_module.check_for_new_files", return_value=[]):
        context = build_sensor_context()
        results = list(new_file_sensor(context))

        assert len(results) == 1
        assert isinstance(results[0], dg.SkipReason)
A suggested testing workflow:

1. Write unit tests for individual assets — test assets as pure functions with mocked dependencies.
2. Add integration tests for pipelines — test multiple assets together to verify end-to-end behavior.
3. Mock external resources — use test doubles for databases, APIs, and file systems.
4. Test edge cases — verify behavior with empty data, null values, and error conditions.
5. Use pytest fixtures — create reusable test fixtures for common setup and teardown.

Best Practices

Separate Test Data

Keep test data in dedicated fixtures or files:
import pytest
import pandas as pd

@pytest.fixture
def sample_orders():
    """Three-row orders frame shared across tests."""
    data = {
        "order_id": [1, 2, 3],
        "item_id": [100, 200, 300],
        "quantity": [1, 2, 1],
    }
    return pd.DataFrame(data)

def test_order_processing(sample_orders):
    """Total quantity of the fixture rows (1 + 2 + 1) is 4."""
    result = process_orders(sample_orders)
    assert result["total_quantity"] == 4

Use Temporary Directories

Avoid polluting your working directory with test outputs:
import tempfile
import os

def test_file_output():
    """Create an output file inside a throwaway directory."""
    with tempfile.TemporaryDirectory() as tmpdir:
        original_cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            # Run your test
            create_output_file()
            assert os.path.exists("output.csv")
        finally:
            # BUG FIX: restore the original working directory; the original
            # left the process inside the soon-to-be-deleted tmpdir, which
            # breaks later tests and tmpdir cleanup on Windows.
            os.chdir(original_cwd)
    # tmpdir is automatically cleaned up

Test for Failures

Verify that your code fails appropriately:
import pytest

def test_invalid_input_raises_error():
    """A negative order id must raise ValueError with a matching message."""
    with pytest.raises(ValueError, match="Invalid order ID"):
        process_order(order_id=-1)

Parametrize Tests

Test multiple scenarios efficiently:
import pytest

@pytest.mark.parametrize("input_val,expected", [
    (10, 20),
    (5, 10),
    (0, 0),  # zero edge case
])
def test_double_value(input_val, expected):
    """double() returns twice its input for each parametrized case."""
    assert double(input_val) == expected
Avoid testing implementation details. Focus on the public interface and expected behavior. If you refactor the internal implementation, your tests should still pass.

Continuous Integration

Run tests in CI to catch issues early:
# .github/workflows/test.yml
name: Test
# Run the suite on every push and pull request
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      # Install the project with its dev extras, then run tests verbosely
      - run: pip install -e ".[dev]"
      - run: pytest tests/ -v

Next Steps