AWS Integration
The dagster-aws library provides integrations with Amazon Web Services, enabling you to:
- Store assets and intermediate data in S3
- Execute compute workloads on ECS and EMR
- Query data with Athena
- Manage secrets with AWS Secrets Manager
- Connect to RDS databases
- Use AWS Lambda for serverless execution
Installation
pip install dagster-aws
# Install with specific service extras
pip install dagster-aws[redshift,athena]
Core Components
S3 Resource
Connect to S3 for storage operations:
from dagster import asset, Definitions, EnvVar
from dagster_aws.s3 import S3Resource
@asset
def s3_file(s3: S3Resource):
# Get S3 client
s3_client = s3.get_client()
# List objects
response = s3_client.list_objects_v2(
Bucket="my-bucket",
Prefix="data/",
)
# Upload data
s3_client.put_object(
Bucket="my-bucket",
Key="output/result.json",
Body='{"result": "success"}',
)
return {"files": len(response.get("Contents", []))}
defs = Definitions(
assets=[s3_file],
resources={
"s3": S3Resource(
region_name="us-west-2",
aws_access_key_id={"env": "AWS_ACCESS_KEY_ID"},
aws_secret_access_key={"env": "AWS_SECRET_ACCESS_KEY"},
)
},
)
S3Resource configuration:
region_name: AWS region
endpoint_url: Custom S3 endpoint (for S3-compatible services)
use_unsigned_session: Use unsigned requests (for public buckets)
profile_name: AWS profile from credentials file
aws_access_key_id: AWS access key
aws_secret_access_key: AWS secret key
aws_session_token: Temporary session token
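For example, a minimal sketch of pointing S3Resource at an S3-compatible service and at a public bucket (the endpoint URL and profile name below are illustrative placeholders):
from dagster_aws.s3 import S3Resource

# S3-compatible service (e.g., a local MinIO server) reached via a named profile
minio_s3 = S3Resource(
    region_name="us-east-1",
    endpoint_url="http://localhost:9000",
    profile_name="minio",
)

# Anonymous access to a public bucket
public_s3 = S3Resource(use_unsigned_session=True)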
S3 IO Manager
Store asset outputs in S3:
from dagster import asset, Definitions
from dagster_aws.s3 import S3PickleIOManager, S3Resource
import pandas as pd
@asset
def extract_data() -> pd.DataFrame:
return pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
@asset
def transform_data(extract_data: pd.DataFrame) -> pd.DataFrame:
# Data automatically loaded from S3
return extract_data[extract_data["value"] > 15]
defs = Definitions(
assets=[extract_data, transform_data],
resources={
"io_manager": S3PickleIOManager(
s3_resource=S3Resource(),
s3_bucket="my-dagster-bucket",
s3_prefix="dagster/storage",
)
},
)
The S3 IO Manager automatically handles serialization, S3 uploads/downloads, and path management for asset outputs.
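If only some assets should be persisted to S3, the IO manager can be bound to a non-default resource key and selected per asset with io_manager_key; a short sketch (the bucket and prefix are illustrative):
from dagster import asset, Definitions
from dagster_aws.s3 import S3PickleIOManager, S3Resource

@asset(io_manager_key="s3_io_manager")  # only this asset uses the S3-backed IO manager
def archived_report() -> dict:
    return {"rows": 42}

defs = Definitions(
    assets=[archived_report],
    resources={
        "s3_io_manager": S3PickleIOManager(
            s3_resource=S3Resource(region_name="us-west-2"),
            s3_bucket="my-dagster-bucket",
            s3_prefix="dagster/archive",
        ),
    },
)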
ECS Integration
Execute tasks on AWS ECS:
from dagster import asset

# The EcsRunLauncher is configured on the Dagster instance (dagster.yaml),
# not in code:
# run_launcher:
# module: dagster_aws.ecs
# class: EcsRunLauncher
# config:
# cluster: my-ecs-cluster
# subnets:
# - subnet-12345
# security_group_ids:
# - sg-12345
@asset
def ecs_computation():
# This runs on ECS when using EcsRunLauncher
return {"result": "computed on ECS"}
EMR Integration
Run Spark jobs on Amazon EMR:
from dagster import asset, OpExecutionContext
from dagster_aws.emr import EmrJobRunner

@asset
def emr_spark_job(context: OpExecutionContext):
    job_runner = EmrJobRunner(region="us-west-2")
    # Submit a Spark step to an existing cluster
    step_ids = job_runner.add_job_flow_steps(
        context.log,
        "j-XXXXXXXXXXXXX",  # Existing cluster ID
        [
            {
                "Name": "Spark Job",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "s3://my-bucket/scripts/job.py",
                    ],
                },
            }
        ],
    )
    context.log.info(f"Submitted EMR steps: {step_ids}")
    return {"step_ids": step_ids}
Athena Integration
Query data in S3 using Athena:
from dagster import asset, Definitions, OpExecutionContext
from dagster_aws.athena import AthenaClient
import time
@asset
def athena_query(context: OpExecutionContext):
athena = AthenaClient(
region_name="us-west-2",
database="my_database",
output_location="s3://my-bucket/athena-results/",
)
# Execute query
query = "SELECT * FROM my_table WHERE date = '2024-01-01'"
execution_id = athena.execute_query(query)
# Wait for completion
while True:
status = athena.get_query_status(execution_id)
if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
break
time.sleep(2)
if status == "SUCCEEDED":
# Get results
results = athena.get_query_results(execution_id)
context.log.info(f"Query returned {len(results)} rows")
return {"row_count": len(results)}
else:
raise Exception(f"Query failed with status: {status}")
defs = Definitions(
assets=[athena_query],
)
Secrets Manager
Retrieve secrets from AWS Secrets Manager:
from dagster import asset, Definitions, OpExecutionContext
from dagster_aws.secretsmanager import SecretsManagerResource
@asset
def secret_consumer(context: OpExecutionContext, secrets: SecretsManagerResource):
    # Fetch the secret through the underlying boto3 client
    response = secrets.get_client().get_secret_value(SecretId="prod/database/password")
    db_password = response["SecretString"]
    # Use the secret for a database connection
    context.log.info("Retrieved database password from Secrets Manager")
    return {"status": "connected"}
defs = Definitions(
assets=[secret_consumer],
resources={
"secrets": SecretsManagerResource(
region_name="us-west-2",
)
},
)
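Database credentials are often stored as a single JSON secret; a sketch of parsing one (the secret name and keys are illustrative):
import json

from dagster_aws.secretsmanager import SecretsManagerResource

def load_db_credentials(secrets: SecretsManagerResource) -> dict:
    # Assumes the secret value is a JSON document like {"username": "...", "password": "..."}
    response = secrets.get_client().get_secret_value(SecretId="prod/database/credentials")
    return json.loads(response["SecretString"])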
RDS Integration
Connect to RDS databases:
from dagster import asset, Definitions, EnvVar
from dagster_aws.rds import RDSResource
@asset
def rds_query(rds: RDSResource):
# Get database connection
with rds.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM customers")
count = cursor.fetchone()[0]
return {"customer_count": count}
defs = Definitions(
assets=[rds_query],
resources={
"rds": RDSResource(
host="my-db.us-west-2.rds.amazonaws.com",
port=5432,
database="mydb",
user={"env": "DB_USER"},
password={"env": "DB_PASSWORD"},
)
},
)
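If you prefer short-lived IAM database authentication over static passwords (in line with the best practices below), boto3 can generate an auth token to use in place of the password; a sketch (the host and user are illustrative):
import boto3

def rds_iam_token(host: str, user: str, port: int = 5432, region: str = "us-west-2") -> str:
    """Generate a short-lived IAM auth token for an RDS database."""
    rds = boto3.client("rds", region_name=region)
    return rds.generate_db_auth_token(DBHostname=host, Port=port, DBUsername=user)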
Redshift
Connect to Amazon Redshift:
from dagster import asset, Definitions, EnvVar
from dagster_aws.redshift import RedshiftClientResource

@asset
def redshift_analysis(redshift: RedshiftClientResource):
    query = """
        SELECT product_category, SUM(revenue) AS total_revenue
        FROM sales
        WHERE date >= CURRENT_DATE - 30
        GROUP BY product_category
        ORDER BY total_revenue DESC
    """
    results = redshift.get_client().execute_query(query, fetch_results=True)
    return {"top_categories": results}

defs = Definitions(
    assets=[redshift_analysis],
    resources={
        "redshift": RedshiftClientResource(
            host="my-cluster.redshift.amazonaws.com",
            port=5439,
            database="analytics",
            user=EnvVar("REDSHIFT_USER"),
            password=EnvVar("REDSHIFT_PASSWORD"),
        )
    },
)
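Bulk loads from S3 into Redshift typically go through a COPY statement executed with the same client; a sketch (the table, bucket, and IAM role ARN are illustrative):
from dagster import asset
from dagster_aws.redshift import RedshiftClientResource

@asset
def load_sales_from_s3(redshift: RedshiftClientResource):
    # COPY reads Parquet files from S3 into the target table using the given IAM role
    redshift.get_client().execute_query(
        """
        COPY analytics.sales
        FROM 's3://my-bucket/exports/sales/'
        IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
        FORMAT AS PARQUET
        """
    )
    return {"loaded": True}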
CloudWatch Integration
Send custom metrics to CloudWatch. dagster-aws also provides a cloudwatch_logger for forwarding Dagster logs to CloudWatch Logs; metrics can be emitted with the boto3 CloudWatch client:
import boto3
from dagster import asset, OpExecutionContext

@asset
def monitored_computation(context: OpExecutionContext):
    cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")
    # Emit a custom metric
    cloudwatch.put_metric_data(
        Namespace="Dagster/Pipeline",
        MetricData=[
            {
                "MetricName": "RecordsProcessed",
                "Value": 1000,
                "Unit": "Count",
            }
        ],
    )
    return {"records": 1000}
Dagster Pipes with AWS
Use Dagster Pipes for AWS compute:
from dagster import asset, Definitions
from dagster_aws.pipes import PipesECSClient, PipesLambdaClient
@asset
def ecs_pipes_asset(context, pipes_ecs: PipesECSClient):
    # Run external code on ECS with Pipes; run_task_params are forwarded to ECS run_task
    return pipes_ecs.run(
        context=context,
        run_task_params={
            "taskDefinition": "my-task-definition",
            "cluster": "my-cluster",
            "launchType": "FARGATE",
        },
    ).get_materialize_result()
@asset
def lambda_pipes_asset(context, pipes_lambda: PipesLambdaClient):
# Execute Lambda function with Pipes
return pipes_lambda.run(
context=context,
function_name="my-lambda-function",
event={"key": "value"},
).get_materialize_result()
defs = Definitions(
assets=[ecs_pipes_asset, lambda_pipes_asset],
resources={
"pipes_ecs": PipesECSClient(),
"pipes_lambda": PipesLambdaClient(),
},
)
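The external code launched by Pipes reports logs and materializations back through the dagster-pipes package; a minimal sketch of what the ECS task (or other external process) might run, assuming dagster-pipes is installed in that environment:
from dagster_pipes import open_dagster_pipes

# Entry point of the external script, e.g. the command run by the ECS task definition
with open_dagster_pipes() as pipes:
    pipes.log.info("Starting computation in the external process")
    # Report a materialization with illustrative metadata back to Dagster
    pipes.report_asset_materialization(metadata={"records_processed": 1000})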
IAM Roles and Permissions
Dagster's AWS integrations use boto3, which follows the standard AWS credential chain:
- Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- Shared credentials file (~/.aws/credentials)
- IAM role for EC2 instances
- ECS task role
- IAM role for Lambda
When running in AWS (ECS, EC2, Lambda), use IAM roles instead of hardcoded credentials for better security.
Example IAM policy for S3 and Secrets Manager:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::my-dagster-bucket/*",
"arn:aws:s3:::my-dagster-bucket"
]
},
{
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": "arn:aws:secretsmanager:*:*:secret:dagster/*"
}
]
}
Multi-Region Support
Work with resources across multiple AWS regions:
from dagster import asset, Definitions
from dagster_aws.s3 import S3Resource
@asset
def cross_region_sync(us_s3: S3Resource, eu_s3: S3Resource):
# Read from US bucket
us_client = us_s3.get_client()
obj = us_client.get_object(Bucket="us-bucket", Key="data.json")
data = obj["Body"].read()
# Write to EU bucket
eu_client = eu_s3.get_client()
eu_client.put_object(Bucket="eu-bucket", Key="data.json", Body=data)
return {"synced": True}
defs = Definitions(
assets=[cross_region_sync],
resources={
"us_s3": S3Resource(region_name="us-west-2"),
"eu_s3": S3Resource(region_name="eu-west-1"),
},
)
Best Practices
- Use IAM roles: Prefer IAM roles over access keys when running in AWS
- Leverage S3 IO Manager: Use for automatic asset persistence to S3
- Regional resources: Create region-specific resources to minimize latency
- Secrets management: Store credentials in Secrets Manager, not code
- Cost optimization: Use S3 lifecycle policies for old execution data (see the sketch after this list)
- Monitoring: Send custom metrics to CloudWatch for observability
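For the cost-optimization point, a lifecycle rule on the IO manager prefix keeps old pickled outputs from accumulating; a sketch with boto3 (the bucket, prefix, and retention period are illustrative):
import boto3

s3 = boto3.client("s3")
# Expire objects under the Dagster storage prefix after 90 days
s3.put_bucket_lifecycle_configuration(
    Bucket="my-dagster-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-dagster-storage",
                "Filter": {"Prefix": "dagster/storage/"},
                "Status": "Enabled",
                "Expiration": {"Days": 90},
            }
        ]
    },
)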
Troubleshooting
Credentials not found
Ensure AWS credentials are configured:
aws configure
# or set environment variables
export AWS_ACCESS_KEY_ID=your_key
export AWS_SECRET_ACCESS_KEY=your_secret
export AWS_DEFAULT_REGION=us-west-2
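To confirm which identity boto3 actually resolves, a quick check from the same environment that runs your Dagster code:
import boto3

# Prints the account ID and ARN of the credentials boto3 picked up
print(boto3.client("sts").get_caller_identity())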
S3 permissions errors
Verify IAM policy includes necessary S3 actions:
s3:GetObject
s3:PutObject
s3:ListBucket
ECS task failures
Check:
- Task definition is valid
- Cluster has capacity
- Security groups allow necessary traffic
- IAM task role has required permissions
API Reference
Key resources and clients:
S3Resource: S3 client access
S3PickleIOManager: S3-backed IO manager
EcsRunLauncher: Execute runs on ECS
EmrJobRunner: Submit EMR jobs
AthenaClient: Query with Athena
SecretsManagerResource: Fetch secrets
RDSResource: RDS database connections
RedshiftClientResource: Redshift connections
PipesECSClient: Dagster Pipes on ECS
PipesLambdaClient: Dagster Pipes on Lambda
For complete documentation, see dagster-aws API reference.
Next Steps