dbt Integration
dbt is one of Dagster's most popular integrations. Dagster can load your dbt project and represent each dbt model as an asset.

Basic dbt Integration
from dagster_dbt import DbtCliResource, dbt_assets
import dagster as dg
@dbt_assets(manifest="/path/to/manifest.json")
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

defs = dg.Definitions(
    assets=[dbt_models],
    resources={
        "dbt": DbtCliResource(project_dir="/path/to/dbt/project")
    },
)
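To run the dbt assets on a cadence, you can define a job and schedule over them. A minimal sketch; the job name and cron string are illustrative, and the schedule would be passed as schedules=[dbt_schedule] to the Definitions above:

import dagster as dg

# Hypothetical daily job covering all dbt assets defined above
dbt_job = dg.define_asset_job(
    "dbt_build_job",
    selection=dg.AssetSelection.assets(dbt_models),
)
dbt_schedule = dg.ScheduleDefinition(job=dbt_job, cron_schedule="0 6 * * *")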
Upstream Dependencies
Combine dbt models with Python assets:

import pandas as pd
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets
@dg.asset
def raw_customers():
    """Extract raw customer data."""
    df = pd.read_csv("s3://bucket/raw_customers.csv")
    df.to_csv("data/raw_customers.csv", index=False)
    return df
@dbt_assets(
    manifest="/path/to/manifest.json",
    io_manager_key="db_io_manager"
)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    """Transform data with dbt."""
    yield from dbt.cli(["build"], context=context).stream()
@dg.asset(deps=[dbt_models])
def customer_analysis(context: dg.AssetExecutionContext):
    """Analyze transformed customer data."""
    # get_connection() is a placeholder for your own warehouse connection helper
    df = pd.read_sql("SELECT * FROM customers", con=get_connection())
    return df.describe()
The @dbt_assets decorator automatically creates an asset for each dbt model in your project, maintaining dependencies between models.
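If you only want part of the project, @dbt_assets also accepts dbt selection syntax. A sketch, assuming a staging directory exists in your dbt project:

from dagster_dbt import DbtCliResource, dbt_assets
import dagster as dg

# Sketch: only models matching the hypothetical "staging" selection become assets
@dbt_assets(manifest="/path/to/manifest.json", select="staging")
def staging_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()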
Airbyte Integration
Airbyte syncs data from external sources into your warehouse. Dagster can orchestrate these syncs:

from dagster_airbyte import build_airbyte_assets, AirbyteResource
import dagster as dg
# Build assets from Airbyte connection
airbyte_assets = build_airbyte_assets(
    connection_id="your-connection-id",
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)
defs = dg.Definitions(
    assets=[*airbyte_assets],  # build_airbyte_assets returns a list of asset definitions
    resources={
        "airbyte": AirbyteResource(
            host="localhost",
            port="8000",
        )
    },
)
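Downstream Python assets can depend on the synced tables by asset key. A sketch, assuming the orders table and asset_key_prefix from the example above; get_connection() stands in for your own warehouse connection helper:

import pandas as pd
import dagster as dg

@dg.asset(deps=[dg.AssetKey(["postgres_replica", "orders"])])
def orders_summary() -> pd.DataFrame:
    # Hypothetical downstream step: read the replicated table from your warehouse
    return pd.read_sql(
        "SELECT status, COUNT(*) AS n FROM orders GROUP BY status",
        con=get_connection(),
    )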
Snowflake Integration
Connect to Snowflake for reading and writing data:

from dagster_snowflake import SnowflakeResource
import dagster as dg
import pandas as pd
# Connection fields Snowflake needs; the built-in SnowflakeResource used below
# already provides these, so a custom resource class like this is optional.
class SnowflakeConfig(dg.ConfigurableResource):
    account: str
    user: str
    password: str
    database: str
    warehouse: str
    schema_: str  # "schema" clashes with a Pydantic method, hence the trailing underscore
@dg.asset
def snowflake_table(snowflake: SnowflakeResource) -> pd.DataFrame:
    with snowflake.get_connection() as conn:
        df = pd.read_sql(
            "SELECT * FROM my_table LIMIT 1000",
            conn
        )
    return df
defs = dg.Definitions(
    assets=[snowflake_table],
    resources={
        "snowflake": SnowflakeResource(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
            database="ANALYTICS",
            warehouse="COMPUTE_WH",
            schema="PUBLIC",
        )
    },
)
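Writes go through the same connection. A sketch using write_pandas from the snowflake-connector-python package; the METRICS table name and the DataFrame contents are hypothetical:

from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
import dagster as dg

@dg.asset
def upload_metrics(snowflake: SnowflakeResource) -> None:
    df = pd.DataFrame({"metric": ["revenue"], "value": [1234.5]})
    with snowflake.get_connection() as conn:
        # write_pandas bulk-loads a DataFrame into a Snowflake table
        write_pandas(conn, df, table_name="METRICS", auto_create_table=True)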
Snowflake I/O Manager
Use the I/O manager to automatically store assets in Snowflake:

from dagster_snowflake_pandas import SnowflakePandasIOManager
import dagster as dg
import pandas as pd
@dg.asset(io_manager_key="snowflake_io")
def customers() -> pd.DataFrame:
    return pd.DataFrame({
        "customer_id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"]
    })

@dg.asset(io_manager_key="snowflake_io")
def orders(customers: pd.DataFrame) -> pd.DataFrame:
    # Data is automatically loaded from Snowflake
    return pd.DataFrame({
        "order_id": [101, 102],
        "customer_id": [1, 2]
    })
defs = dg.Definitions(
    assets=[customers, orders],
    resources={
        "snowflake_io": SnowflakePandasIOManager(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
            database="ANALYTICS",
            warehouse="COMPUTE_WH",
            schema="PUBLIC",
        )
    },
)
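For local testing you can materialize these assets directly in process. A sketch using dg.materialize with the same I/O manager (the environment variables must be set for this to run):

import dagster as dg

result = dg.materialize(
    [customers, orders],
    resources={
        "snowflake_io": SnowflakePandasIOManager(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
            database="ANALYTICS",
            warehouse="COMPUTE_WH",
            schema="PUBLIC",
        )
    },
)
assert result.success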
OpenAI Integration
Build AI-powered pipelines with the OpenAI integration:

from dagster_openai import OpenAIResource
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
import dagster as dg
@dg.asset(compute_kind="OpenAI")
def search_index(
    context: dg.AssetExecutionContext,
    openai: OpenAIResource,
    source_docs: list
) -> bytes:
    """Create a search index from documents."""
    with openai.get_client(context) as client:
        search_index = FAISS.from_documents(
            source_docs,
            OpenAIEmbeddings(client=client.embeddings)
        )
    return search_index.serialize_to_bytes()
class OpenAIConfig(dg.Config):
    model: str
    question: str

@dg.asset(compute_kind="OpenAI")
def completion(
    context: dg.AssetExecutionContext,
    openai: OpenAIResource,
    config: OpenAIConfig,
    search_index: bytes
):
    """Generate a completion using the search index."""
    index = FAISS.deserialize_from_bytes(
        search_index,
        OpenAIEmbeddings(),
        allow_dangerous_deserialization=True
    )
    with openai.get_client(context) as client:
        model = ChatOpenAI(
            client=client.chat.completions,
            model=config.model,
            temperature=0
        )
        docs = index.similarity_search(config.question, k=4)
        # Generate completion with retrieved docs; format_prompt() is a
        # placeholder for your own prompt-building helper
        response = model.invoke(format_prompt(docs, config.question))
        context.log.info(response)
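To run these assets, register them with an OpenAIResource in your Definitions. A minimal sketch, assuming the API key is stored in an environment variable; whatever produces source_docs would also need to be defined and included:

import dagster as dg
from dagster_openai import OpenAIResource

defs = dg.Definitions(
    assets=[search_index, completion],
    resources={
        "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
    },
)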
Databricks Integration
Execute Spark jobs on Databricks:

from dagster_databricks import DatabricksClientResource
import dagster as dg
@dg.asset
def databricks_job(
    databricks: DatabricksClientResource,
    context: dg.AssetExecutionContext
):
    """Run a Databricks job and wait for it to finish."""
    # The resource wraps the Databricks SDK; jobs.run_now(...).result()
    # blocks until the run completes.
    client = databricks.get_client().workspace_client
    run = client.jobs.run_now(
        job_id=12345,
        notebook_params={"date": context.partition_key}
    ).result()
    context.log.info(f"Databricks run {run.run_id} finished")
    return run.run_id
defs = dg.Definitions(
    assets=[databricks_job],
    resources={
        "databricks": DatabricksClientResource(
            host=dg.EnvVar("DATABRICKS_HOST"),
            token=dg.EnvVar("DATABRICKS_TOKEN"),
        )
    },
)
BigQuery Integration
Work with Google BigQuery:

from dagster_gcp import BigQueryResource
import dagster as dg
import pandas as pd
@dg.asset
def bigquery_data(bigquery: BigQueryResource) -> pd.DataFrame:
    query = """
        SELECT
            date,
            SUM(revenue) AS total_revenue
        FROM `project.dataset.sales`
        WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
        GROUP BY date
        ORDER BY date
    """
    with bigquery.get_client() as client:
        df = client.query(query).to_dataframe()
    return df
defs = dg.Definitions(
    assets=[bigquery_data],
    resources={
        "bigquery": BigQueryResource(
            project=dg.EnvVar("GCP_PROJECT"),
        )
    },
)
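You can write results back with the same client. A sketch using the BigQuery client library's load_table_from_dataframe; the destination table name is hypothetical:

import dagster as dg
import pandas as pd
from dagster_gcp import BigQueryResource

@dg.asset
def revenue_snapshot(bigquery: BigQueryResource, bigquery_data: pd.DataFrame) -> None:
    with bigquery.get_client() as client:
        # load_table_from_dataframe is part of the google-cloud-bigquery client
        job = client.load_table_from_dataframe(
            bigquery_data, "project.dataset.revenue_snapshot"
        )
        job.result()  # wait for the load job to finish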
Fivetran Integration
Orchestrate Fivetran syncs:

from dagster_fivetran import FivetranResource, build_fivetran_assets
import dagster as dg
fivetran_assets = build_fivetran_assets(
    connector_id="your-connector-id",
    destination_tables=["schema.table1", "schema.table2"],
)

defs = dg.Definitions(
    assets=[*fivetran_assets],  # build_fivetran_assets returns a list of asset definitions
    resources={
        "fivetran": FivetranResource(
            api_key=dg.EnvVar("FIVETRAN_API_KEY"),
            api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
        )
    },
)
DuckDB Integration
Use DuckDB for fast analytics:

from dagster_duckdb import DuckDBResource
import dagster as dg
import pandas as pd
@dg.asset
def duckdb_analysis(duckdb: DuckDBResource) -> pd.DataFrame:
    query = """
        SELECT
            category,
            COUNT(*) AS count,
            AVG(price) AS avg_price
        FROM products
        GROUP BY category
    """
    with duckdb.get_connection() as conn:
        df = conn.execute(query).df()
    return df
defs = dg.Definitions(
    assets=[duckdb_analysis],
    resources={
        "duckdb": DuckDBResource(
            database="analytics.duckdb"
        )
    },
)
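The query above assumes a products table exists in the DuckDB file. An upstream asset could load it; a sketch that creates the table from a pandas DataFrame (the CSV path is hypothetical):

import dagster as dg
import pandas as pd
from dagster_duckdb import DuckDBResource

@dg.asset
def products(duckdb: DuckDBResource) -> None:
    df = pd.read_csv("data/products.csv")  # hypothetical source file
    with duckdb.get_connection() as conn:
        # DuckDB can query the in-scope DataFrame "df" directly via replacement scans
        conn.execute("CREATE OR REPLACE TABLE products AS SELECT * FROM df")

In practice you would also add deps=[products] to duckdb_analysis so it materializes after the load.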
Slack Integration
Send notifications to Slack:

from dagster_slack import SlackResource
import dagster as dg
@dg.asset
def important_metric() -> float:
    return calculate_kpi()  # calculate_kpi() is a placeholder for your own logic

@dg.sensor(job=my_job)  # my_job is a placeholder for one of your jobs
def metric_alert_sensor(
    context: dg.SensorEvaluationContext,
    slack: SlackResource
):
    # fetch_latest_metric() and THRESHOLD are placeholders for your own metric logic
    metric_value = fetch_latest_metric()
    if metric_value < THRESHOLD:
        slack.get_client().chat_postMessage(
            channel="#alerts",
            text=f"⚠️ Metric dropped to {metric_value}"
        )
        yield dg.RunRequest(run_key=str(context.cursor))
    else:
        yield dg.SkipReason(f"Metric is healthy: {metric_value}")
defs = dg.Definitions(
    assets=[important_metric],
    sensors=[metric_alert_sensor],
    resources={
        "slack": SlackResource(
            token=dg.EnvVar("SLACK_TOKEN")
        )
    },
)
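If all you need is failure alerts, dagster-slack also provides a prebuilt run-failure sensor. A sketch, assuming the token is available as an environment variable at load time; register it by adding it to sensors=[...] in your Definitions:

import os
from dagster_slack import make_slack_on_run_failure_sensor

slack_on_run_failure = make_slack_on_run_failure_sensor(
    channel="#alerts",
    slack_token=os.environ["SLACK_TOKEN"],
)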
PySpark Integration
Run Spark transformations:

from dagster_pyspark import PySparkResource
import dagster as dg
from pyspark.sql import DataFrame
@dg.asset
def spark_transform(pyspark: PySparkResource) -> DataFrame:
    spark = pyspark.spark_session
    df = spark.read.parquet("s3://bucket/input/")
    transformed = (
        df.filter(df.value > 100)
        .groupBy("category")
        .agg({"value": "sum"})
    )
    transformed.write.parquet("s3://bucket/output/")
    return transformed
defs = dg.Definitions(
    assets=[spark_transform],
    resources={
        "pyspark": PySparkResource(
            spark_conf={
                "spark.executor.memory": "4g",
                "spark.executor.cores": "2",
            }
        )
    },
)
Sling Integration
Use Sling for data replication:

from dagster_sling import SlingResource, sling_assets
import dagster as dg
replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_SNOWFLAKE",
    "streams": {
        "public.users": {
            "sql": "SELECT * FROM public.users WHERE updated_at > '{last_value}'",
            "primary_key": ["id"],
            "update_key": "updated_at",
        },
        "public.orders": {
            "sql": "SELECT * FROM public.orders",
            "primary_key": ["order_id"],
        },
    },
}
@sling_assets(replication_config=replication_config)
def my_sling_assets(context: dg.AssetExecutionContext, sling: SlingResource):
    yield from sling.replicate(context=context)
defs = dg.Definitions(
    assets=[my_sling_assets],
    resources={
        "sling": SlingResource(
            connections=[
                {"name": "MY_POSTGRES", "type": "postgres", ...},
                {"name": "MY_SNOWFLAKE", "type": "snowflake", ...},
            ]
        )
    },
)
Modern Data Stack Example
Combine multiple integrations into a complete pipeline:

from dagster_airbyte import build_airbyte_assets
from dagster_dbt import DbtCliResource, dbt_assets
from dagster import AssetExecutionContext, asset
import numpy as np
import pandas as pd
from scipy import optimize
# Extract: Airbyte syncs data from source to warehouse
airbyte_assets = build_airbyte_assets(
    connection_id="abc123",
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)
# Transform: dbt models transform raw data
@dbt_assets(
    manifest="/path/to/manifest.json",
    io_manager_key="db_io_manager",
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
# Model: Python asset for ML forecasting
def model_func(x, a, b):
    return a * np.exp(b * (x / 10**18 - 1.6095))

@asset(compute_kind="python")
def order_forecast_model(daily_order_summary: pd.DataFrame) -> np.ndarray:
    """Model parameters that best fit the observed data."""
    train_set = daily_order_summary.to_numpy()
    return optimize.curve_fit(
        f=model_func,
        xdata=train_set[:, 0],
        ydata=train_set[:, 2],
        p0=[10, 100]
    )[0]
@asset(compute_kind="python", io_manager_key="db_io_manager")
def predicted_orders(
    daily_order_summary: pd.DataFrame,
    order_forecast_model: np.ndarray
) -> pd.DataFrame:
    """Predicted orders for the next 30 days."""
    a, b = tuple(order_forecast_model)
    start_date = daily_order_summary.order_date.max()
    future_dates = pd.date_range(
        start=start_date,
        end=start_date + pd.DateOffset(days=30)
    )
    predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b)
    return pd.DataFrame({
        "order_date": future_dates,
        "num_orders": predicted_data
    })
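A Definitions object ties the extract, transform, and model steps together. A sketch; the resource values (Airbyte host, dbt project path, stand-in I/O manager) are assumptions you would replace with your own deployment settings:

import dagster as dg
from dagster_airbyte import AirbyteResource

defs = dg.Definitions(
    assets=[*airbyte_assets, dbt_project_assets, order_forecast_model, predicted_orders],
    resources={
        "airbyte": AirbyteResource(host="localhost", port="8000"),
        "dbt": DbtCliResource(project_dir="/path/to/dbt/project"),
        # Stand-in for local runs; swap in your warehouse I/O manager
        # (e.g. SnowflakePandasIOManager) as "db_io_manager" in production.
        "db_io_manager": dg.FilesystemIOManager(),
    },
)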
Ensure you have network connectivity and proper credentials when using external integrations. Use environment variables for sensitive configuration.
Best Practices
Use I/O Managers
Let I/O managers handle data storage and retrieval:

@asset(io_manager_key="snowflake_io")
def my_table() -> pd.DataFrame:
    # Automatically stored in Snowflake
    return pd.DataFrame({...})
Partition for Performance
Partition large datasets for incremental processing:

import json

@dbt_assets(
    manifest="/path/to/manifest.json",
    partitions_def=daily_partitions
)
def dbt_models(context, dbt):
    dbt_vars = {"partition_date": context.partition_key}
    yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()
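The daily_partitions object referenced above is a standard partitions definition, for example:

import dagster as dg

# Hypothetical start date; choose the earliest date your data covers
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")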
Handle Credentials Securely
Use environment variables:

resources={
    "snowflake": SnowflakeResource(
        account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
        password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
    )
}
Next Steps
- Set up Observability for your integrations
- Add Data Quality checks
- Configure Deployment for production
- Write Tests for your pipelines
