Data Engineering Pipeline Scheduled Task Monitoring Tutorial

As a data engineer, you build pipelines that ingest, transform, and load data, often on a schedule. These pipelines are the lifeblood of data-driven organizations, feeding dashboards, machine learning models, and business intelligence reports. But what happens when a critical scheduled task, say, your daily ETL job, silently fails to run or simply stops completing?

The consequences can range from stale data and missed business opportunities to broken downstream systems and a frantic scramble to diagnose the problem. The challenge isn't just knowing when a job fails while running, but more fundamentally, knowing when a job doesn't run at all or doesn't complete successfully within its expected timeframe. Traditional logging and internal application monitoring often miss this crucial "absence of a signal."

This is where heartbeat monitoring comes in. Instead of waiting for an error log or an alert from a failed process, you proactively tell a monitoring service that your job did run and did complete successfully. If the monitoring service doesn't hear from your job within an expected interval, it alerts you.

Heartfly is a SaaS tool designed specifically for this. It provides unique "heartbeat URLs" for each of your scheduled tasks. Your job simply pings this URL upon successful completion. If Heartfly doesn't receive a ping within the configured timeframe, it triggers an alert via Slack, Discord, email, or webhooks.

Let's dive into how you can integrate this into your data engineering pipelines.

The Core Concept: Ping on Success

The simplest and most robust way to use heartbeat monitoring is to have your scheduled task make a network request (a "ping") to its unique Heartfly URL only after it has successfully completed all its work.

Consider a cron job that runs a Python script to fetch data. If the script finishes without error, it pings Heartfly. If the script fails halfway through, crashes, or simply doesn't start, the ping never happens, and Heartfly alerts you.
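
At its core, the integration is nothing more than a single HTTP request made once all the work is done. As a minimal sketch in Python (the heartbeat URL below uses the Heartfly format shown later in this tutorial; substitute your own monitor's ID):

```python
import requests

# ...all of your pipeline's fetch/transform/load work happens above this point...

# Reached only if nothing above raised an exception:
requests.get(
    "https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE",
    timeout=10,
)
```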

Example 1: Monitoring a Simple Cron Job

Let's say you have a Python script daily_data_fetch.py that fetches some data and stores it. This script is scheduled to run daily at 3 AM via cron.

# daily_data_fetch.py
import requests
import sys
import logging
import os
import json
from datetime import datetime

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def fetch_and_store_data():
    logging.info("Starting daily data fetch process...")
    try:
        # Simulate some data fetching and processing logic
        # For a real script, this would involve APIs, databases, etc.
        data_source_url = "https://jsonplaceholder.typicode.com/posts/1" # Example API
        response = requests.get(data_source_url, timeout=10)
        response.raise_for_status() # Raise an exception for bad status codes
        data = response.json()

        # Simulate storing data
        output_dir = "/tmp/data_pipeline"
        os.makedirs(output_dir, exist_ok=True)
        filename = f"{output_dir}/fetched_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)

        logging.info(f"Successfully fetched and stored data to {filename}")
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f"Network or API error during data fetch: {e}")
        return False
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return False

if __name__ == "__main__":
    if not fetch_and_store_data():
        logging.error("Daily data fetch failed. Exiting with error code.")
        sys.exit(1) # Exit with a non-zero code to indicate failure
    else:
        logging.info("Daily data fetch completed successfully.")
        # The heartbeat ping will be handled by the cron job wrapper
        sys.exit(0) # Exit with zero for success

Now, let's configure your cron job. First, you'd create a monitor in Heartfly and get its unique heartbeat URL, which will look something like https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE.

Your cron entry would then look like this:

# m h dom mon dow command
0 3 * * * /usr/bin/python3 /path/to/your/daily_data_fetch.py && curl -fsS --retry 3 --retry-delay 5 https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE

Let's break down that cron line:

  • 0 3 * * *: This schedules the job to run daily at 3:00 AM.
  • /usr/bin/python3 /path/to/your/daily_data_fetch.py: This executes your Python script.
  • &&: This is crucial. The && operator means that the curl command will only execute if the preceding command (your Python script) exits with a zero status code (indicating success). If the Python script fails (exits with a non-zero status code), the curl command is skipped.
  • curl -fsS --retry 3 --retry-delay 5 https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE:
    • curl: The command-line tool for making network requests.
    • -f: Fail on HTTP error responses. curl produces no output for the error page and exits with a non-zero status instead of treating a 4xx/5xx response as success.
    • -s: Silent mode (don't show progress meter or error messages).
    • -S: Show error messages even with -s. This is useful for debugging curl itself.
    • --retry 3 --retry-delay 5: These flags make the curl command more resilient. If the network is temporarily flaky, curl will retry up to 3 times, waiting 5 seconds between each attempt. This helps prevent false alerts due to transient network issues between your server and Heartfly.

With this setup, if your Python script finishes successfully, Heartfly gets a ping. If it fails, crashes, or doesn't start, Heartfly doesn't get a ping, and you're alerted.
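
An alternative to chaining curl in the crontab is to send the ping from inside the Python script itself, right after the work succeeds. Here's a hedged sketch of that variant: it reuses fetch_and_store_data() from the script above, the retry loop stands in for curl's --retry behaviour, and the helper name ping_heartbeat is purely illustrative:

```python
# Variant of daily_data_fetch.py's __main__ block that pings Heartfly itself.
import logging
import sys
import time

import requests

from daily_data_fetch import fetch_and_store_data  # the function defined earlier

HEARTFLY_URL = "https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_UNIQUE_ID_HERE"


def ping_heartbeat(url, retries=3, delay=5):
    """Best-effort heartbeat ping with a few retries for transient network blips."""
    for attempt in range(1, retries + 1):
        try:
            requests.get(url, timeout=10).raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            logging.warning(f"Heartbeat ping attempt {attempt} failed: {e}")
            if attempt < retries:
                time.sleep(delay)
    return False


if __name__ == "__main__":
    if fetch_and_store_data():
        # Only ping after the pipeline work has completed successfully.
        ping_heartbeat(HEARTFLY_URL)  # a failed ping is logged, not treated as a job failure
        sys.exit(0)
    sys.exit(1)
```

The cron entry then shrinks to just running the script, since the success check and the ping live in one place; the trade-off is that the ping logic now lives inside your job code rather than in the crontab.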

Integrating with Orchestration Tools

Many modern data pipelines use orchestration tools like Apache Airflow, Prefect, Dagster, or AWS Step Functions. These tools provide more sophisticated ways to manage dependencies, retries, and error handling. Integrating heartbeat monitoring here is equally important, especially for critical top-level DAGs or workflows.

Example 2: Monitoring an Airflow DAG

In Airflow, you typically want to monitor the overall success of a DAG run, not just individual tasks. You can achieve this by adding a final task that pings Heartfly, or by using Airflow's on_success_callback mechanism.
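
The callback route keeps the ping out of your task graph entirely: you attach a function to the DAG, and Airflow invokes it after a successful DAG run. A minimal sketch of that option, assuming the same Heartfly URL format (the function name notify_heartfly is just illustrative):

```python
import logging

import requests

HEARTFLY_URL = "https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_AIRFLOW_DAG_ID_HERE"


def notify_heartfly(context):
    """DAG-level on_success_callback: fires once after a successful DAG run."""
    try:
        requests.get(HEARTFLY_URL, timeout=5).raise_for_status()
    except requests.exceptions.RequestException as e:
        # The DAG run itself already succeeded; log the monitoring failure instead of raising.
        logging.error(f"Failed to ping Heartfly: {e}")


# Attached when the DAG is defined, for example:
# with DAG(dag_id='data_processing_pipeline', ..., on_success_callback=notify_heartfly) as dag:
#     ...
```

The rest of this example uses the first option: a dedicated final task.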

Let's consider a simple Airflow DAG:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pendulum
import requests
import logging

# Get your Heartfly URL (e.g., from Airflow Variables or the environment)
HEARTFLY_URL = "https://cron2.91-99-176-101.nip.io/api/v1/heartbeat/YOUR_AIRFLOW_DAG_ID_HERE"


def ping_heartfly(url):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        logging.info(f"Successfully pinged Heartfly: {url}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to ping Heartfly: {e}")
        # Depending on criticality, you might want to raise an exception here
        # or just log it and let the DAG succeed. For monitoring, it's usually
        # better to let the DAG complete if its core work is done, but log the ping failure.


with DAG(
    dag_id='data_processing_pipeline',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule_interval='@daily',
    catchup=False,
    tags=['data_engineering', 'etl'],
) as dag:
    start_task = BashOperator(
        task_id='start_pipeline',
        bash_command='echo "Starting data processing..."',
    )

    extract_data = BashOperator(
        task_id='extract_data',
        bash_command='echo "Extracting data..."; sleep 5; exit 0',  # Simulate success
        # bash_command='echo "Extracting data..."; sleep 5; exit 1',  # Simulate failure
    )

    transform_data = BashOperator(
        task_id='transform_data',
        bash_command='echo "Transforming data..."; sleep 5',
    )

    load_data = BashOperator(
        task_id='load_data',
        bash_command='echo "Loading data..."; sleep 5',
    )

    # Option 1: Add a final task to ping Heartfly
    # This task will only run if all preceding tasks