Data Engineering Pipeline Scheduled Task Monitoring Tutorial
As a data engineer, you build pipelines that ingest, transform, and load data, often on a schedule. These pipelines are the lifeblood of data-driven organizations, feeding dashboards, machine learning models, and business intelligence reports. But what happens when a critical scheduled task, say, your daily ETL job, silently fails to run or simply stops completing?
The consequences can range from stale data and missed business opportunities to broken downstream systems and a frantic scramble to diagnose the problem. The challenge isn't just knowing when a job fails while running, but more fundamentally, knowing when a job doesn't run at all or doesn't complete successfully within its expected timeframe. Traditional logging and internal application monitoring often miss this crucial "absence of a signal."
This is where heartbeat monitoring comes in. Instead of waiting for an error log or an alert from a failed process, you proactively tell a monitoring service that your job did run and did complete successfully. If the monitoring service doesn't hear from your job within an expected interval, it alerts you.
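At its core, this pattern is a dead man's switch. As a toy illustration of the server-side logic (not Heartfly's actual implementation; the class name and grace period are invented for this sketch):

```python
from datetime import datetime, timedelta
from typing import Optional

class HeartbeatMonitor:
    """Toy dead man's switch: flags a job as overdue when no ping has
    arrived within the expected interval plus a grace period."""

    def __init__(self, expected_interval: timedelta,
                 grace: timedelta = timedelta(minutes=5)):
        self.expected_interval = expected_interval
        self.grace = grace
        self.last_ping: Optional[datetime] = None

    def ping(self, now: datetime) -> None:
        """Record a successful run of the monitored job."""
        self.last_ping = now

    def is_overdue(self, now: datetime) -> bool:
        """True once the silence has outlasted interval + grace."""
        if self.last_ping is None:
            return False  # never pinged yet; a real service tracks a pending state
        return now - self.last_ping > self.expected_interval + self.grace
```

The key inversion: the monitored job never reports failure; it only reports success, and the absence of that report is what triggers the alert.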
Heartfly is a SaaS tool designed specifically for this. It provides unique "heartbeat URLs" for each of your scheduled tasks. Your job simply pings this URL upon successful completion. If Heartfly doesn't receive a ping within the configured timeframe, it triggers an alert via Slack, Discord, email, or webhooks.
Let's dive into how you can integrate this into your data engineering pipelines.
The Core Concept: Ping on Success
The simplest and most robust way to use heartbeat monitoring is to have your scheduled task make a network request (a "ping") to its unique Heartfly URL only after it has successfully completed all its work.
Consider a cron job that runs a Python script to fetch data. If the script finishes without error, it pings Heartfly. If the script fails halfway through, crashes, or simply doesn't start, the ping never happens, and Heartfly alerts you.
Example 1: Monitoring a Simple Cron Job
Let's say you have a Python script daily_data_fetch.py that fetches some data and stores it. This script is scheduled to run daily at 3 AM via cron.
```python
# daily_data_fetch.py
import json
import logging
import os
import sys
from datetime import datetime

import requests

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def fetch_and_store_data():
    logging.info("Starting daily data fetch process...")
    try:
        # Simulate some data fetching and processing logic
        # For a real script, this would involve APIs, databases, etc.
        data_source_url = "https://jsonplaceholder.typicode.com/posts/1"  # Example API
        response = requests.get(data_source_url, timeout=10)
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()

        # Simulate storing data
        output_dir = "/tmp/data_pipeline"
        os.makedirs(output_dir, exist_ok=True)
        filename = f"{output_dir}/fetched_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)
        logging.info(f"Successfully fetched and stored data to {filename}")
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f"Network or API error during data fetch: {e}")
        return False
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return False

if __name__ == "__main__":
    if not fetch_and_store_data():
        logging.error("Daily data fetch failed. Exiting with error code.")
        sys.exit(1)  # Exit with a non-zero code to indicate failure
    else:
        logging.info("Daily data fetch completed successfully.")
        # The heartbeat ping will be handled by the cron job wrapper
        sys.exit(0)  # Exit with zero for success
```
Now, let's configure your cron job. First, you'd create a monitor in Heartfly and get its unique heartbeat URL, which will look something like https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE.
Your cron entry would then look like this:
```
# m h  dom mon dow   command
0 3 * * * /usr/bin/python3 /path/to/your/daily_data_fetch.py && curl -fsS --retry 3 --retry-delay 5 https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE
```
Let's break down that cron line:
- `0 3 * * *`: schedules the job to run daily at 3:00 AM.
- `/usr/bin/python3 /path/to/your/daily_data_fetch.py`: executes your Python script.
- `&&`: this is crucial. The `&&` operator means the `curl` command only executes if the preceding command (your Python script) exits with a zero status code, indicating success. If the script fails (exits with a non-zero status code), the `curl` command is skipped.
- `curl -fsS --retry 3 --retry-delay 5 https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE`:
  - `curl`: the command-line tool for making network requests.
  - `-f`: fail on HTTP error responses with a non-zero exit code instead of printing the server's error page.
  - `-s`: silent mode (no progress meter or error messages).
  - `-S`: show error messages even with `-s`. This is useful for debugging `curl` itself.
  - `--retry 3 --retry-delay 5`: these flags make the ping more resilient. If the network is temporarily flaky, `curl` will retry up to 3 times, waiting 5 seconds between each attempt. This helps prevent false alerts due to transient network issues between your server and Heartfly.
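To see the `&&` gate in isolation, here is a quick shell sketch; `echo` stands in for the `curl` ping, and the subshell's exit code stands in for your script's:

```shell
simulate_job() {
  # $1 stands in for the Python script's exit code; echo stands in for curl
  ( exit "$1" ) && echo "ping sent" || echo "ping skipped"
}

simulate_job 0   # a successful job: the ping fires
simulate_job 1   # a failed job: the ping is skipped, Heartfly stays silent and alerts
```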
With this setup, if your Python script finishes successfully, Heartfly gets a ping. If it fails, crashes, or doesn't start, Heartfly doesn't get a ping, and you're alerted.
Integrating with Orchestration Tools
Many modern data pipelines use orchestration tools like Apache Airflow, Prefect, Dagster, or AWS Step Functions. These tools provide more sophisticated ways to manage dependencies, retries, and error handling. Integrating heartbeat monitoring here is equally important, especially for critical top-level DAGs or workflows.
Example 2: Monitoring an Airflow DAG
In Airflow, you typically want to monitor the overall success of a DAG run, not just individual tasks. You can achieve this by adding a final task that pings Heartfly, or by using Airflow's on_success_callback mechanism.
Let's consider a simple Airflow DAG:
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import pendulum
import requests
import logging

# Get your Heartfly URL (e.g., from Airflow Variables or environment)
HEARTFLY_URL = "https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_AIRFLOW_DAG_ID_HERE"

def ping_heartfly(url):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        logging.info(f"Successfully pinged Heartfly: {url}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to ping Heartfly: {e}")
        # Depending on criticality, you might want to raise an exception here
        # or just log it and let the DAG succeed. For monitoring, it's usually
        # better to let the DAG complete if its core work is done, but log the
        # ping failure.

with DAG(
    dag_id='data_processing_pipeline',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule_interval='@daily',
    catchup=False,
    tags=['data_engineering', 'etl'],
) as dag:
    start_task = BashOperator(
        task_id='start_pipeline',
        bash_command='echo "Starting data processing..."',
    )

    extract_data = BashOperator(
        task_id='extract_data',
        bash_command='echo "Extracting data..."; sleep 5; exit 0',  # Simulate success
        # bash_command='echo "Extracting data..."; sleep 5; exit 1',  # Simulate failure
    )

    transform_data = BashOperator(
        task_id='transform_data',
        bash_command='echo "Transforming data..."; sleep 5',
    )

    load_data = BashOperator(
        task_id='load_data',
        bash_command='echo "Loading data..."; sleep 5',
    )

    # Option 1: Add a final task to ping Heartfly
    # This task will only run if all preceding tasks