Dagster Sensor Failure Alerting Setup
Dagster sensors are the eyes and ears of your data platform. They continuously monitor external systems or internal state, triggering jobs when conditions are met. Whether it's watching for new files in S3, polling an external API, or checking for database changes, sensors are critical for keeping your data pipelines reactive and up to date.
But what happens when a sensor fails silently? A sensor could crash due to an unhandled exception, get stuck in an infinite loop, or simply stop being scheduled by the Dagster daemon. If a sensor stops running, your downstream jobs won't trigger, new data won't be processed, and your data assets will become stale. This is a silent killer, often going unnoticed until a stakeholder asks why their dashboard hasn't updated.
As engineers, we know that if something isn't monitored, it's effectively broken. This article will guide you through setting up robust failure alerting for your Dagster sensors using heartbeat monitoring with Heartfly.
Understanding Dagster Sensors and Their Failure Modes
At its core, a Dagster sensor is a function that Dagster evaluates periodically. The function inspects some external state and, if conditions are met, yields `RunRequest` objects to trigger jobs. Sensors are defined with an evaluation interval (the `minimum_interval_seconds` argument to the `@sensor` decorator), which tells the Dagster daemon how often to execute their logic.
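As a quick orientation, here is a minimal, hypothetical sensor definition; the job, op, and item names are placeholders for illustration only, not part of the example developed later in this article:
```python
from dagster import RunRequest, SensorEvaluationContext, job, op, sensor


@op
def process_item():
    ...


@job
def my_example_job():
    process_item()


@sensor(job=my_example_job, minimum_interval_seconds=60)
def my_example_sensor(context: SensorEvaluationContext):
    # In a real sensor this would inspect S3, an external API, or a database.
    new_items = ["item-1"]  # placeholder for whatever the external check finds
    for item in new_items:
        # run_key de-duplicates runs: Dagster skips keys it has already seen.
        yield RunRequest(run_key=item)
```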
Common failure modes for sensors include:
- Process Crash: The Python process running the Dagster daemon (which executes sensors) dies or restarts unexpectedly.
- Unhandled Exceptions: An error within the sensor's logic (e.g., API call failure, file system error, database connection issue) isn't caught, causing the sensor execution to fail and potentially stop future executions.
- Resource Exhaustion: The sensor consumes too much memory or CPU, leading to its termination by the underlying container orchestrator (Kubernetes, ECS) or operating system.
- Scheduling Issues: The Dagster daemon itself encounters problems and stops scheduling sensors, even if the daemon process is technically "alive."
- Silent Logic Failure: The sensor runs, but due to a subtle bug, it never finds new data or yields run requests, giving a false sense of security.
While Dagster provides some built-in event log monitoring, it often focuses on job run failures, not the liveness of the sensors themselves. We need a way to know if a sensor stops performing its regular check.
Traditional Alerting Challenges in Dagster
Dagster provides a robust event log, and you can integrate with tools like Datadog, Splunk, or custom Slack bots to get alerts on specific events, such as a job run failing. However, monitoring sensor liveness presents a unique challenge:
- "No News Is Bad News": A sensor that stops running generates no events. If your monitoring only reacts to events (like a sensor yielding a
RunRequestor an error log), you won't know it's dead until it's too late. - Granularity: You might get alerts if the entire Dagster daemon process goes down, but what if just one specific sensor within the daemon encounters an unhandled exception and stops being scheduled, while others continue?
- Complexity: Setting up custom liveness checks for each sensor within your observability stack can become cumbersome, requiring custom metrics, dashboards, and alert rules for every single sensor.
This is where external heartbeat monitoring becomes invaluable.
Introducing Heartbeat Monitoring for Sensors
Heartbeat monitoring is a simple yet powerful concept: a scheduled task (in our case, a Dagster sensor) periodically "pings" an external service. If the external service doesn't receive a ping within an expected timeframe, it assumes the task has failed and triggers an alert.
For Dagster sensors, this means:
- Every time your sensor successfully completes its execution logic (regardless of whether it yields a run request), it makes a simple HTTP GET request to a unique URL.
- Heartfly, our monitoring service, registers this "heartbeat."
- If Heartfly doesn't receive a heartbeat from that specific URL within a configured grace period, it sends an alert via Slack, Discord, email, or other integrations.
This approach directly addresses the "no news is bad news" problem. You'll be alerted within minutes if a critical sensor stops running for any reason.
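A minimal sketch of that ping, assuming the URL is supplied via a hypothetical HEARTFLY_HEARTBEAT_URL environment variable, might look like this:
```python
import os
from typing import Optional

import requests

# Hypothetical environment variable holding the monitor's unique Heartbeat URL.
HEARTBEAT_URL = os.getenv("HEARTFLY_HEARTBEAT_URL")


def send_heartbeat(url: Optional[str], timeout: float = 5.0) -> None:
    """Ping the heartbeat URL without letting monitoring break the sensor itself."""
    if not url:
        return  # No URL configured (e.g., local development): skip silently.
    try:
        requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Log and move on; a failed ping should never abort the sensor's real work.
        print(f"Heartbeat ping failed: {exc}")
```
The examples below follow the same pattern: the ping happens at the end of each successful sensor evaluation.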
Setting Up Heartfly for a Dagster Sensor
Integrating Heartfly into your Dagster sensors is straightforward. Here's the general workflow:
- Create a Monitor in Heartfly:
  - Log into your Heartfly account.
  - Click "Add Monitor" or similar.
  - Give your monitor a descriptive name (e.g., "Dagster - S3 New File Sensor").
  - Set the "Expected Interval." This should be slightly longer than your sensor's `minimum_interval_seconds` in Dagster, plus a small buffer for network latency or execution variability. For example, if your sensor runs every minute, set the Heartfly interval to 2 minutes.
  - Heartfly will provide you with a unique "Heartbeat URL" for this monitor. It will look something like `https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/your-unique-id`.
  - Configure your desired alert channels (Slack, email, etc.).
- Integrate the Heartbeat URL into Your Sensor Code:
  - Store the Heartbeat URL securely, ideally as an environment variable or in Dagster's `dagster.yaml` config. Never hardcode sensitive URLs directly in your code.
  - Use a simple HTTP client (like `requests`) to make a GET request to this URL at the end of your sensor's logic.
Let's look at some concrete examples.
Example 1: A Simple File Watcher Sensor
Imagine you have a sensor that watches an S3 bucket for new `data.csv` files.
```python
import os

import requests
from dagster import RunRequest, SensorEvaluationContext, sensor
from dagster_aws.s3 import S3Resource

# Get Heartfly URL from environment variable
HEARTFLY_S3_SENSOR_URL = os.getenv("HEARTFLY_S3_SENSOR_URL")


@sensor(job_name="process_s3_data_job", minimum_interval_seconds=60)
def s3_new_file_sensor(context: SensorEvaluationContext, s3: S3Resource):
    """
    Monitors an S3 bucket for new 'data.csv' files and triggers a job.
    Sends a heartbeat to Heartfly after each check.
    """
    bucket_name = "my-data-lake-bucket"
    prefix = "landing_zone/"

    # Placeholder for actual S3 check logic.
    # In a real scenario, you'd store state (e.g., last processed file key)
    # and compare against current bucket contents.
    new_files_found = False
    try:
        s3_client = s3.get_client()
        response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
        # Simplified logic: assume we find a "new_data.csv" file.
        # In reality, you'd compare against the stored cursor/state.
        for obj in response.get("Contents", []):
            if "new_data.csv" in obj["Key"] and obj["Key"] not in (context.cursor or ""):
                context.update_cursor(obj["Key"])  # Store for next run