Documentation Index
Fetch the complete documentation index at: https://mintlify.com/dagster-io/dagster/llms.txt
Use this file to discover all available pages before exploring further.
Data quality is critical for reliable pipelines. Dagster provides asset checks, integration with testing frameworks, and patterns for validating your data.
Asset Checks
Asset checks verify data quality when assets are materialized.
Single Asset Check
Define a check for one quality dimension:
import pandas as pd
import dagster as dg
@dg.asset
def orders():
    """Write a tiny two-row orders table to ``orders.csv``."""
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    # index=False keeps pandas' RangeIndex out of the file, so reading it back
    # with read_csv does not produce a spurious "Unnamed: 0" column.
    orders_df.to_csv("orders.csv", index=False)
@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    """Pass only when every ``order_id`` in ``orders.csv`` is populated.

    Reports the number of null ids and the total row count as metadata.
    """
    loaded = pd.read_csv("orders.csv")
    null_ids = int(loaded["order_id"].isna().sum())
    row_total = len(loaded)
    return dg.AssetCheckResult(
        passed=null_ids == 0,
        metadata={"null_count": null_ids, "total_rows": row_total},
    )
Multiple Asset Checks
Check multiple quality dimensions efficiently:
from collections.abc import Iterable
import pandas as pd
import dagster as dg
@dg.asset
def orders():
    """Write a tiny two-row orders table to ``orders.csv``."""
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    # index=False keeps pandas' RangeIndex out of the file, so reading it back
    # with read_csv does not produce a spurious "Unnamed: 0" column.
    orders_df.to_csv("orders.csv", index=False)
@dg.multi_asset_check(
    specs=[
        dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
        dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
    ]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
    """Evaluate both null checks against a single read of ``orders.csv``.

    Yields one result per spec, in the same order as the specs.
    """
    orders_df = pd.read_csv("orders.csv")
    # (check name, column it guards): one shared load, one result per column.
    for check_name, column in (
        ("orders_id_has_no_nulls", "order_id"),
        ("items_id_has_no_nulls", "item_id"),
    ):
        null_count = orders_df[column].isna().sum()
        yield dg.AssetCheckResult(
            check_name=check_name,
            passed=bool(null_count == 0),
            asset_key="orders",
            metadata={"null_count": int(null_count)},
        )
Use multi_asset_check when multiple checks share expensive data loading. Load the data once and perform all validations.
Common Data Quality Checks
Null Checks
@dg.asset_check(asset=my_data)
def no_nulls_in_key_columns():
    """Fail when any key column of ``my_data.csv`` contains a null.

    Reports per-column null counts so a failure points at the offending column.
    """
    df = pd.read_csv("my_data.csv")
    key_columns = ["user_id", "timestamp", "event_type"]
    # Cast to built-in int: pandas returns numpy.int64, which is not
    # JSON-serializable metadata. Comparing a numpy scalar also yields
    # numpy.bool_ rather than the bool that AssetCheckResult.passed expects.
    null_counts = {col: int(df[col].isna().sum()) for col in key_columns}
    total_nulls = sum(null_counts.values())
    return dg.AssetCheckResult(
        passed=total_nulls == 0,
        metadata={
            "null_counts_by_column": null_counts,
            "total_nulls": total_nulls,
        },
    )
Uniqueness Checks
@dg.asset_check(asset=users)
def user_ids_are_unique():
    """Fail when any ``user_id`` in ``users.csv`` appears more than once."""
    df = pd.read_csv("users.csv")
    # duplicated() flags every occurrence after the first. Cast the numpy count
    # to int so the comparison yields a real bool (AssetCheckResult.passed
    # expects one) and the metadata holds built-in numbers.
    duplicates = int(df["user_id"].duplicated().sum())
    return dg.AssetCheckResult(
        passed=duplicates == 0,
        metadata={
            "duplicate_count": duplicates,
            "unique_count": int(df["user_id"].nunique()),
            "total_rows": len(df),
        },
    )
Range Checks
@dg.asset_check(asset=metrics)
def values_in_expected_range():
    """Fail when any ``revenue`` value in ``metrics.csv`` lies outside [0, 1_000_000].

    An empty file yields NaN min/max, which compare False, so the check fails.
    """
    # Revenue should be non-negative and at most $1M. Defining the bounds once
    # keeps the comparison and the reported metadata in sync.
    expected_min = 0
    expected_max = 1_000_000

    df = pd.read_csv("metrics.csv")
    min_value = df["revenue"].min()
    max_value = df["revenue"].max()
    # bool(): numpy comparisons yield numpy.bool_, but passed expects a bool.
    in_range = bool(min_value >= expected_min and max_value <= expected_max)
    return dg.AssetCheckResult(
        passed=in_range,
        metadata={
            "min_value": float(min_value),
            "max_value": float(max_value),
            "expected_min": expected_min,
            "expected_max": expected_max,
        },
    )
Row Count Checks
@dg.asset_check(asset=daily_events)
def has_minimum_rows():
    """Require ``daily_events.csv`` to contain at least 1000 rows."""
    row_count = len(pd.read_csv("daily_events.csv"))
    min_expected = 1000
    return dg.AssetCheckResult(
        passed=row_count >= min_expected,
        metadata={"row_count": row_count, "minimum_expected": min_expected},
    )
Schema Checks
@dg.asset_check(asset=customer_data)
def schema_matches_expected():
    """Fail unless ``customer_data.csv`` has exactly the expected columns.

    Column order is ignored. Missing and unexpected columns are reported separately.
    """
    df = pd.read_csv("customer_data.csv")
    expected_columns = {"customer_id", "name", "email", "created_at"}
    actual_columns = set(df.columns)
    missing = expected_columns - actual_columns
    extra = actual_columns - expected_columns
    # Sort set-derived lists. Set iteration order for strings varies between
    # processes (hash randomization), which would make the metadata differ
    # from run to run even when the schema is unchanged.
    return dg.AssetCheckResult(
        passed=not missing and not extra,
        metadata={
            "expected_columns": sorted(expected_columns),
            "actual_columns": list(df.columns),  # as ordered in the file
            "missing_columns": sorted(missing),
            "extra_columns": sorted(extra),
        },
    )
Freshness Checks
Ensure data is updated within expected timeframes:
import datetime

import dagster as dg  # needed for dg.Definitions below
from dagster import asset, build_last_update_freshness_checks


@asset
def daily_metrics():
    """Daily metrics asset (computation elided)."""
    return compute_daily_metrics()


@asset
def hourly_metrics():
    """Hourly metrics asset (computation elided)."""
    return compute_hourly_metrics()


# Automatically create one last-update freshness check per asset. The same
# lower_bound_delta (two days) applies to both assets; build a separate set
# of checks if the hourly asset needs a tighter window.
freshness_checks = build_last_update_freshness_checks(
    assets=[daily_metrics, hourly_metrics],
    lower_bound_delta=datetime.timedelta(days=2),
)

defs = dg.Definitions(
    assets=[daily_metrics, hourly_metrics],
    asset_checks=freshness_checks,
)
Custom Freshness Check
from datetime import datetime, timedelta
@dg.asset_check(asset=real_time_data)
def data_is_fresh():
    """Fail when the newest ``timestamp`` in ``real_time_data.csv`` is over an hour old.

    Timestamps are normalized to UTC and compared with the current UTC time.
    Naive timestamps are interpreted as UTC; confirm that matches how your
    data is written. A file with no usable timestamps is reported as stale.
    """
    df = pd.read_csv("real_time_data.csv")
    # utc=True makes every parsed value tz-aware. Subtracting tz-aware values
    # (e.g. ISO strings with an offset) from a naive "now" raises TypeError.
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    latest_timestamp = df["timestamp"].max()
    max_age = timedelta(hours=1)

    # An empty file or all-null column has no latest value (NaT).
    if pd.isna(latest_timestamp):
        return dg.AssetCheckResult(
            passed=False,
            description="No timestamps found",
            metadata={"max_age_hours": max_age.total_seconds() / 3600},
        )

    age = pd.Timestamp.now(tz="UTC") - latest_timestamp
    return dg.AssetCheckResult(
        passed=bool(age <= max_age),
        metadata={
            "latest_timestamp": latest_timestamp.isoformat(),
            "age_hours": age.total_seconds() / 3600,
            "max_age_hours": max_age.total_seconds() / 3600,
        },
    )
Great Expectations Integration
Integrate with Great Expectations for advanced data validation:
from dagster import Config, job, op
from dagster_ge.factory import ge_validation_op_factory
from pandas import read_csv
class GEOpConfig(Config):
    """Run config for ``read_in_datafile``."""

    # Path handed to pandas.read_csv by read_in_datafile.
    csv_path: str = "./data/succeed.csv"
@op
def read_in_datafile(config: GEOpConfig):
    """Load the CSV named by ``config.csv_path`` into a DataFrame."""
    csv_path = config.csv_path
    return read_csv(csv_path)
@op
def process_payroll(df):
    """Return the number of rows in the payroll DataFrame."""
    row_count = len(df)
    return row_count
@op
def postprocess_payroll(numrows, expectation):
    """Pass the row count through only if the validation result reports success.

    Raises:
        ValueError: if ``expectation["success"]`` is falsy.
    """
    if not expectation["success"]:
        raise ValueError("Validation failed")
    return numrows
# Build a Dagster op that validates the DataFrame it receives against the
# "basic.warning" Great Expectations suite (wired up in payroll_data).
payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op",
    suite_name="basic.warning",
    datasource_name="getest",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="test_asset",
    batch_identifiers={"foo": "bar"},
)
@job
def payroll_data():
    """Load the CSV, count its rows, validate it, and gate the count on the result."""
    payroll_df = read_in_datafile()
    row_count = process_payroll(payroll_df)
    validation = payroll_expectations(payroll_df)
    postprocess_payroll(row_count, validation)
Blocking Downstream Assets
Prevent downstream assets from running when checks fail:
import dagster as dg
import pandas as pd
@dg.asset
def raw_data():
    """Sample raw data whose ``value`` column deliberately contains a null."""
    values = [1, 2, 3, None]
    return pd.DataFrame({"value": values})
@dg.asset_check(
    asset=raw_data,
    blocking=True,  # Block downstream if this fails
)
def raw_data_quality(raw_data: pd.DataFrame):
    """Fail when the materialized ``raw_data`` has any null in its ``value`` column.

    The parameter is named after the checked asset, so Dagster loads the
    materialized value through the asset's I/O manager. Calling ``raw_data()``
    here would instead recompute the asset and check that fresh copy.
    """
    null_count = int(raw_data["value"].isna().sum())
    return dg.AssetCheckResult(
        passed=null_count == 0,
        description="Raw data should not contain nulls",
        metadata={"null_count": null_count},
    )
@dg.asset
def processed_data(raw_data: pd.DataFrame):
    """Return ``raw_data`` with nulls replaced by 0.

    ``raw_data`` is taken as an input (loaded by its I/O manager) instead of
    ``deps=[raw_data]`` plus reading ``raw_data.csv``: that asset returns a
    DataFrame and never writes such a file. When the blocking
    ``raw_data_quality`` check fails in the same run, this asset does not execute.
    """
    return raw_data.fillna(0)
When a blocking check fails with the default `ERROR` severity, downstream assets in the same run are not materialized. Runs that do not include the check are unaffected. Reserve blocking checks for critical quality gates.
Statistical Validation
Detect anomalies using statistical methods:
import numpy as np
import pandas as pd
@dg.asset_check(asset=daily_revenue)
def revenue_within_normal_range():
    """Flag the latest ``revenue`` value if it lies outside mean ± 3·std of earlier values.

    The baseline excludes the latest row. Including it would let the value
    being tested inflate the very statistics it is compared against. With n
    rows and pandas' sample std, the largest possible |z| of a member of the
    sample is (n - 1) / sqrt(n), which is below 3 for n <= 10, so the check
    could never fail on small datasets.
    """
    df = pd.read_csv("daily_revenue.csv")
    revenue = df["revenue"]
    history = revenue.iloc[:-1]

    # A sample standard deviation needs at least two historical points.
    if len(history) < 2:
        return dg.AssetCheckResult(
            passed=False,
            description="Not enough history to compute a baseline",
            metadata={"history_rows": len(history)},
        )

    latest = revenue.iloc[-1]
    mean = history.mean()
    std = history.std()
    lower_bound = mean - 3 * std
    upper_bound = mean + 3 * std
    # bool(): the numpy comparison yields numpy.bool_, passed expects a bool.
    in_range = bool(lower_bound <= latest <= upper_bound)
    # A constant (or all-NaN) history has no meaningful z-score.
    z_score = float((latest - mean) / std) if std > 0 else float("nan")

    return dg.AssetCheckResult(
        passed=in_range,
        metadata={
            "latest_revenue": float(latest),
            "mean_revenue": float(mean),
            "std_dev": float(std),
            "lower_bound": float(lower_bound),
            "upper_bound": float(upper_bound),
            "z_score": z_score,
        },
    )
Check Factories
Generate checks programmatically:
from typing import List
import dagster as dg
def create_null_checks(asset_name: str, columns: List[str]):
    """Factory to create one null check per column of ``{asset_name}.csv``.

    Args:
        asset_name: Key of the asset the checks attach to. The checks read
            ``f"{asset_name}.csv"``.
        columns: Columns to check. Each yields a check named ``{column}_no_nulls``.

    Returns:
        A list of asset check definitions, in the same order as ``columns``.
    """
    import pandas as pd  # local import keeps this factory self-contained

    def _make_check(column: str):
        # Build the check for one column. Routing through a function gives every
        # check its own ``column`` binding. A check defined directly in the loop
        # body late-binds ``column``, so every check would test the last column.
        @dg.asset_check(asset=asset_name, name=f"{column}_no_nulls")
        def check_column_nulls():
            df = pd.read_csv(f"{asset_name}.csv")
            # int(): a numpy count would make passed a numpy.bool_.
            null_count = int(df[column].isna().sum())
            return dg.AssetCheckResult(
                passed=null_count == 0,
                metadata={"null_count": null_count},
            )

        return check_column_nulls

    return [_make_check(column) for column in columns]
# One "<column>_no_nulls" check for each critical column of the "orders" asset
orders_null_checks = create_null_checks(
    asset_name="orders",
    columns=["order_id", "customer_id", "order_date", "total_amount"],
)
Data Quality Metrics
Track quality over time:
from dagster import MaterializeResult, MetadataValue
import pandas as pd
@dg.asset
def customer_data() -> MaterializeResult:
    """Clean ``raw_customers.csv`` into ``customer_data.csv`` and report quality metrics.

    Duplicate rows and rows containing any null are dropped. The quality score
    is the fraction of input rows that survive cleaning (0.0 for an empty input).
    """
    df = pd.read_csv("raw_customers.csv")

    # Calculate quality metrics. Cast pandas' numpy.int64 counts to int so the
    # metadata holds built-in numbers rather than numpy scalars.
    total_rows = len(df)
    null_rows = int(df.isnull().any(axis=1).sum())
    duplicate_rows = int(df.duplicated().sum())

    # Clean the data
    cleaned_df = df.drop_duplicates().dropna()
    cleaned_df.to_csv("customer_data.csv", index=False)
    rows_after_cleaning = len(cleaned_df)

    # Score from the rows actually kept. Subtracting null_rows and duplicate_rows
    # separately double-counts rows that are both duplicated and contain nulls.
    # The conditional avoids ZeroDivisionError on an empty input.
    quality_score = rows_after_cleaning / total_rows if total_rows else 0.0

    return MaterializeResult(
        metadata={
            "total_rows": total_rows,
            "rows_with_nulls": null_rows,
            "duplicate_rows": duplicate_rows,
            "rows_after_cleaning": rows_after_cleaning,
            "data_quality_score": MetadataValue.float(quality_score),
        }
    )
Identify critical data quality dimensions
Determine what makes your data valid (nulls, ranges, schema, etc.).
Create checks for each quality dimension.
Include diagnostic information in check results.
Use blocking checks for critical quality gates.
Track check failures over time to identify trends.
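Alert when checks fail. A failed non-blocking check does not fail its run, so a run-failure sensor alone will not catch it; alert on the check results themselves.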
Best Practices
# Good: Separate concerns
@dg.asset
def raw_data():
    """Load the raw data; validating it is left to a separate asset check."""
    data = load_data()
    return data
@dg.asset_check(asset=raw_data)
def validate_raw_data():
    """Run the quality rules for ``raw_data`` and return their result."""
    result = check_quality()
    return result
@dg.asset
def cleaned_data(raw_data):
    """Derive the cleaned dataset from the ``raw_data`` asset."""
    cleaned = clean(raw_data)
    return cleaned
Use Descriptive Check Names
# Good: Clear what is being checked
@dg.asset_check(asset=orders, name="order_total_is_positive")
def check_order_total():
    """Body elided; the check name alone says what is verified."""
# Bad: Unclear what is checked
@dg.asset_check(asset=orders, name="check1")
def check1():
    pass  # body elided; neither the check name nor the function name says what is verified
@dg.asset_check(asset=my_asset)
def comprehensive_check():
    """Validate ``my_asset`` and attach diagnostics that explain the outcome."""
    # Local import keeps this example independent of whether the surrounding
    # module bound ``datetime`` to the module or to the class.
    from datetime import datetime, timezone

    df = load_data()
    return dg.AssetCheckResult(
        passed=is_valid(df),
        metadata={
            "rows_checked": len(df),
            "failures": get_failure_count(df),
            "sample_failures": get_sample_failures(df),
            # Timezone-aware UTC: a naive local timestamp is ambiguous when
            # read from a machine in a different timezone.
            "check_timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )
Test Your Checks
import pytest
def test_null_check_passes(tmp_path, monkeypatch):
    """The null check passes when the ``id`` column has no missing values."""
    # Run in a per-test temp dir so the fixture file does not pollute the repo
    # or leak into the other test, which writes the same file name.
    monkeypatch.chdir(tmp_path)
    # Create valid test data; index=False keeps an extra index column out of the file.
    pd.DataFrame({"id": [1, 2, 3]}).to_csv("test_data.csv", index=False)
    result = check_no_nulls()
    assert result.passed
def test_null_check_fails(tmp_path, monkeypatch):
    """The null check fails when the ``id`` column contains a missing value."""
    # Run in a per-test temp dir so the fixture file does not pollute the repo
    # or leak into the other test, which writes the same file name.
    monkeypatch.chdir(tmp_path)
    # Create invalid test data; index=False keeps an extra index column out of the file.
    pd.DataFrame({"id": [1, None, 3]}).to_csv("test_data.csv", index=False)
    result = check_no_nulls()
    assert not result.passed
Integration with Schedules
Run checks on a schedule:
@dg.asset
def production_data():
    """Materialize the production dataset via ``load_production_data``."""
    dataset = load_production_data()
    return dataset
@dg.asset_check(asset=production_data)
def hourly_quality_check():
    """Validate ``production_data`` and return the resulting check result."""
    outcome = validate_production_data()
    return outcome
# Schedule only the checks to run hourly. Targeting the asset itself
# (target=[production_data]) would re-materialize production_data every hour
# and run its checks only as part of that materialization.
quality_check_schedule = dg.ScheduleDefinition(
    name="hourly_quality_check",
    cron_schedule="0 * * * *",  # minute 0 of every hour
    target=dg.define_asset_job(
        "hourly_quality_check_job",
        selection=dg.AssetSelection.checks_for_assets(production_data),
    ),
)
Next Steps