March 29, 2026 · 14 min read

Monitor Celery Tasks and Python Scheduled Jobs with CronPeek

Your Celery Beat scheduler is running. The workers are running. But are your periodic tasks actually executing and succeeding? A worker process can be alive and dequeuing tasks while your beat schedule silently stops triggering, your Redis broker quietly drops messages, or your task logic raises an exception that Celery swallows and retries into oblivion. This guide shows you how to add dead man's switch monitoring to every Python scheduling pattern—Celery tasks, Celery Beat, APScheduler, and Django management commands—so you find out immediately when something breaks.

Why Celery Monitoring Is Harder Than It Looks

Celery is powerful, but it is also a distributed system with a lot of moving parts. You have a beat scheduler producing tasks, a broker (usually Redis or RabbitMQ) queuing them, and one or more workers consuming them. Any of these layers can fail independently, and none of them fails in a way that surfaces as an obvious error.

Here are the failure modes that catch Python developers off guard: the beat process dies or hangs while the workers keep looking healthy; the broker quietly drops or backlogs messages; a worker crashes mid-task; or the task logic raises an exception that Celery swallows and retries into oblivion.

Celery Flower shows you task history and worker status, but it does not alert you when a periodic task stops completing. Traditional uptime monitoring does not catch any of this either—your HTTP endpoints return 200 while your data pipelines are dead.

The core problem: Celery tasks fail by not completing successfully. The beat process looks healthy. The workers respond to pings. But the actual job has not run in 48 hours. A dead man's switch is the only pattern that detects the absence of a successful execution.

How Dead Man's Switch Monitoring Works with CronPeek

The pattern is straightforward. At the end of your task—after all the real work is done—you send an HTTP ping to CronPeek. CronPeek expects that ping at the interval you configure. If it stops arriving, CronPeek sends you an alert.

The API base URL is https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi. You create monitors with POST /monitors and send heartbeats with a GET, POST, or HEAD request to /ping/:token. Any HTTP method works, which keeps the integration simple from Python.

First, create a monitor:

Create a monitor via curl
curl -X POST https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/monitors \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "celery-nightly-sync",
    "interval": 86400,
    "grace_period": 900
  }'

The response gives you a ping_url:

{
  "id": "mon_k4p8q2r7n1",
  "name": "celery-nightly-sync",
  "ping_url": "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_k4p8q2r7n1",
  "status": "waiting",
  "interval": 86400,
  "grace_period": 900
}

Set interval to your expected run frequency in seconds. Set grace_period to how long after the deadline CronPeek should wait before alerting—useful for tasks that occasionally run a few minutes late.
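As a rough rule of thumb (a suggestion, not a CronPeek requirement), interval matches the run frequency and grace_period is a small fraction of it. A sketch of starting values for common schedules:

```python
# Suggested interval / grace_period pairs (in seconds) for common schedules.
# These are illustrative starting points, not values mandated by CronPeek.
SUGGESTED = {
    "every 15 min": {"interval": 15 * 60,          "grace_period": 2 * 60},
    "hourly":       {"interval": 60 * 60,          "grace_period": 5 * 60},
    "daily":        {"interval": 24 * 60 * 60,     "grace_period": 15 * 60},
    "weekly":       {"interval": 7 * 24 * 60 * 60, "grace_period": 60 * 60},
}

print(SUGGESTED["daily"])  # matches the monitor created above: 86400 / 900
```

The daily entry is exactly the celery-nightly-sync monitor from the curl example.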

Monitoring a Basic Celery Task

The simplest integration: add a requests.post() call at the end of your task function, inside the happy path. If the task raises an exception before reaching the ping, no heartbeat is sent and CronPeek alerts on the missed run.

tasks.py — basic Celery task with CronPeek ping
import requests
from celery import shared_task

CRONPEEK_PING = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_k4p8q2r7n1"

@shared_task(name="sync_user_data")
def sync_user_data():
    """Sync user records from the external CRM. Runs nightly at 2 AM."""
    users = fetch_users_from_crm()
    update_local_records(users)
    send_sync_summary_email(len(users))

    # All steps succeeded — send heartbeat to CronPeek
    try:
        requests.post(CRONPEEK_PING, timeout=10)
    except requests.RequestException:
        pass  # Never let monitoring failures break the task itself

Two important details here. First, the ping is the last line of the success path. If fetch_users_from_crm() raises, the ping never fires. Second, the ping call is wrapped in its own try/except so that a CronPeek outage never causes your task to fail.

Celery Beat Monitoring—The Full Pattern

Celery Beat is a periodic task scheduler. It reads your beat_schedule configuration (or a database-backed schedule) and publishes tasks to the broker at the configured intervals. Beat itself is a single process, and it does not verify that tasks execute or succeed. Monitoring Beat means monitoring the tasks it triggers.

Configuring Celery Beat with a monitored task

celery.py — Beat schedule with CronPeek integration
from celery import Celery
from celery.schedules import crontab

app = Celery("myapp")

app.config_from_object("django.conf:settings", namespace="CELERY")

app.conf.beat_schedule = {
    "nightly-data-sync": {
        "task": "myapp.tasks.sync_user_data",
        "schedule": crontab(hour=2, minute=0),  # Every day at 2 AM UTC
    },
    "hourly-cache-warmer": {
        "task": "myapp.tasks.warm_cache",
        "schedule": crontab(minute=0),  # Every hour at :00
    },
    "weekly-report": {
        "task": "myapp.tasks.generate_weekly_report",
        "schedule": crontab(hour=8, minute=0, day_of_week="monday"),
    },
}

tasks.py — monitored Beat tasks
import requests
from celery import shared_task

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"

# Each task has its own monitor token
MONITORS = {
    "nightly-data-sync": "mon_k4p8q2r7n1",
    "hourly-cache-warmer": "mon_j3m7h9x2k5",
    "weekly-report":       "mon_r6w1b4v8n0",
}

def ping(token, status="success", msg=""):
    """Send a heartbeat to CronPeek. Never raises."""
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException:
        pass


@shared_task(name="myapp.tasks.sync_user_data")
def sync_user_data():
    token = MONITORS["nightly-data-sync"]
    try:
        users = fetch_users_from_crm()
        update_local_records(users)
        send_sync_summary_email(len(users))
        ping(token, "success", f"synced {len(users)} users")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise  # Re-raise so Celery records the failure and retries if configured


@shared_task(name="myapp.tasks.warm_cache")
def warm_cache():
    token = MONITORS["hourly-cache-warmer"]
    try:
        count = rebuild_query_cache()
        ping(token, "success", f"{count} cache keys rebuilt")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise


@shared_task(name="myapp.tasks.generate_weekly_report")
def generate_weekly_report():
    token = MONITORS["weekly-report"]
    try:
        report_url = build_and_upload_report()
        notify_subscribers(report_url)
        ping(token, "success", f"report: {report_url}")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise

The raise after ping(token, "failure", ...) is intentional. You want Celery to see the failure, trigger its retry logic, and record the exception in your task result backend. CronPeek gives you an immediate alert; Celery still manages the retry lifecycle.

Two failure modes, two alerts: If the task runs and fails, you get an immediate CronPeek failure alert. If Beat stops scheduling entirely (beat process down, broker unreachable), the heartbeat never arrives and CronPeek alerts after your grace period. Both failure modes are covered.

Using a Decorator for Clean Celery Monitoring

If you have many Celery tasks to monitor, a decorator keeps the signal-to-noise ratio high. The task logic stays focused; the monitoring is a single annotation.

utils/cronpeek.py
import functools
import time
import requests

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"


def monitored(token):
    """
    Decorator that wraps a Celery task with CronPeek dead man's switch monitoring.

    Usage:
        @shared_task
        @monitored("mon_k4p8q2r7n1")
        def my_task():
            do_work()
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                result = fn(*args, **kwargs)
                elapsed = int((time.monotonic() - start) * 1000)
                _ping(token, "success", f"completed in {elapsed}ms")
                return result
            except Exception as exc:
                _ping(token, "failure", str(exc))
                raise
        return wrapper
    return decorator


def _ping(token, status="success", msg=""):
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException:
        pass

tasks.py — clean with decorator
from celery import shared_task
from myapp.utils.cronpeek import monitored

@shared_task(name="myapp.tasks.sync_user_data")
@monitored("mon_k4p8q2r7n1")
def sync_user_data():
    users = fetch_users_from_crm()
    update_local_records(users)
    send_sync_summary_email(len(users))

@shared_task(name="myapp.tasks.warm_cache")
@monitored("mon_j3m7h9x2k5")
def warm_cache():
    rebuild_query_cache()

@shared_task(name="myapp.tasks.generate_weekly_report")
@monitored("mon_r6w1b4v8n0")
def generate_weekly_report():
    report_url = build_and_upload_report()
    notify_subscribers(report_url)

Clean. Each task gets monitoring in one line and the task body stays focused on business logic. The decorator re-raises exceptions so Celery's retry and failure recording are unaffected.
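Because the decorator re-raises, its behavior is easy to sanity-check without a broker or network. A standalone sketch with a recording stub in place of the real HTTP ping:

```python
# Verify monitored()-style wrapping: the failure ping fires and the
# exception still re-raises. _ping records locally instead of calling CronPeek.
import functools

pings = []

def _ping(token, status="success", msg=""):
    pings.append((token, status))

def monitored(token):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                result = fn(*args, **kwargs)
                _ping(token, "success")
                return result
            except Exception as exc:
                _ping(token, "failure", str(exc))
                raise
        return wrapper
    return decorator

@monitored("mon_test")
def flaky():
    raise RuntimeError("db unreachable")

try:
    flaky()
except RuntimeError:
    print("re-raised as expected")

print(pings)  # [('mon_test', 'failure')]
```

The same structure means Celery's retry and failure recording see exactly what they would without the decorator.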

APScheduler Integration

APScheduler is a popular in-process Python scheduler that does not require a separate broker. It runs inside your application and supports cron, interval, and date-based triggers. The same dead man's switch pattern applies.

scheduler.py — APScheduler with CronPeek monitoring
import requests
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

logger = logging.getLogger(__name__)

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"

scheduler = BackgroundScheduler(timezone="UTC")


def ping(token, status="success", msg=""):
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException as e:
        logger.warning("CronPeek ping failed: %s", e)


def nightly_invoice_job():
    """Generates and emails monthly invoices. Runs at 3 AM every day."""
    token = "mon_a2s5d8f1g4"
    try:
        count = generate_pending_invoices()
        send_invoice_emails(count)
        logger.info("Invoice job completed: %d invoices", count)
        ping(token, "success", f"{count} invoices generated")
    except Exception as exc:
        logger.exception("Invoice job failed")
        ping(token, "failure", str(exc))


def hourly_metrics_job():
    """Aggregates usage metrics into the reporting database."""
    token = "mon_h7j2k5l8m1"
    try:
        rows = aggregate_hourly_metrics()
        ping(token, "success", f"{rows} rows written")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise


# Register jobs
scheduler.add_job(
    nightly_invoice_job,
    CronTrigger(hour=3, minute=0, timezone="UTC"),
    id="nightly-invoices",
    max_instances=1,          # Prevent overlapping runs
    misfire_grace_time=300,   # Tolerate up to 5 minutes of delay
)

scheduler.add_job(
    hourly_metrics_job,
    CronTrigger(minute=0, timezone="UTC"),
    id="hourly-metrics",
    max_instances=1,
    misfire_grace_time=120,
)

scheduler.start()

Note max_instances=1 on each job. APScheduler will skip a new run if the previous one is still executing, which can cause missed heartbeats that look like task failures. Set your CronPeek grace_period generously for jobs that occasionally run long, and use APScheduler's misfire_grace_time to control how late a delayed run is still allowed to start.
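If you prefer one integration point over per-function try/except blocks, APScheduler's event listener API (scheduler.add_listener with EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) can drive every ping from one place. Here is the dispatch logic, sketched standalone with a stub transport so it runs without APScheduler installed; the JOB_MONITORS mapping and send_ping helper are mine:

```python
# Central ping dispatch for APScheduler job events. Standalone sketch:
# SimpleNamespace stands in for apscheduler's JobExecutionEvent, and
# send_ping records locally instead of POSTing to CronPeek.
from types import SimpleNamespace

JOB_MONITORS = {  # APScheduler job id -> CronPeek monitor token (from this guide)
    "nightly-invoices": "mon_a2s5d8f1g4",
    "hourly-metrics": "mon_h7j2k5l8m1",
}

sent = []

def send_ping(token, status):
    # Real code would do: requests.post(f"{CRONPEEK_BASE}/{token}",
    #                                   params={"status": status}, timeout=10)
    sent.append((token, status))

def cronpeek_listener(event):
    """Wire up: scheduler.add_listener(cronpeek_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)."""
    token = JOB_MONITORS.get(event.job_id)
    if token is None:
        return  # job has no monitor configured
    send_ping(token, "failure" if event.exception else "success")

# Simulate one successful run and one failed run
cronpeek_listener(SimpleNamespace(job_id="hourly-metrics", exception=None))
cronpeek_listener(SimpleNamespace(job_id="nightly-invoices", exception=RuntimeError("boom")))
print(sent)
```

One caveat: a listener pings on success even when a job partially failed internally, so jobs that need richer status messages are still better served by in-function pings.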

Django Management Command with Dead Man's Switch

Django management commands invoked via cron (system cron, Kubernetes CronJob, or Heroku Scheduler) are a common pattern for periodic Python tasks. They are simple, testable, and deploy cleanly. The failure mode: if the cron entry breaks or the Django process exits non-zero, the command silently stops running and nobody notices for days.

management/commands/sync_inventory.py
import requests
from django.core.management.base import BaseCommand, CommandError
from myapp.services import sync_inventory_from_warehouse

CRONPEEK_PING = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_c9v3b6n0m2"


class Command(BaseCommand):
    help = "Sync inventory records from the warehouse API. Run every 30 minutes."

    def handle(self, *args, **options):
        self.stdout.write("Starting inventory sync...")

        try:
            result = sync_inventory_from_warehouse()
            self.stdout.write(
                self.style.SUCCESS(
                    f"Sync complete: {result.updated} updated, {result.created} created, "
                    f"{result.skipped} skipped."
                )
            )
        except Exception as exc:
            # Ping with failure status for immediate alert
            self._ping("failure", str(exc))
            raise CommandError(f"Inventory sync failed: {exc}") from exc

        # All work done — send dead man's switch heartbeat
        self._ping("success", f"{result.updated + result.created} records processed")

    def _ping(self, status, msg=""):
        try:
            params = {"status": status}
            if msg:
                params["msg"] = msg[:256]
            requests.post(CRONPEEK_PING, params=params, timeout=10)
        except requests.RequestException as e:
            self.stderr.write(f"CronPeek ping failed: {e}")

Pair this with a system cron entry:

crontab
# Sync inventory every 30 minutes — monitored by CronPeek (mon_c9v3b6n0m2)
*/30 * * * * cd /srv/myapp && /srv/myapp/.venv/bin/python manage.py sync_inventory >> /var/log/myapp/sync_inventory.log 2>&1

Set the CronPeek monitor interval to 1800 seconds (30 minutes) with a grace_period of 300 seconds. If the cron daemon stops firing, the virtual environment breaks, or the Django command raises an unhandled exception, you get an alert within about five minutes of the missed run, 35 minutes after the last successful heartbeat.
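The alert timing follows from simple arithmetic: the deadline is the last successful ping plus interval plus grace_period.

```python
# Alert deadline arithmetic for interval=1800s, grace_period=300s.
interval, grace_period = 1800, 300
alert_minute = (interval + grace_period) // 60
print(alert_minute)  # 35: the alert fires 35 minutes after the last heartbeat,
                     # i.e. about 5 minutes after the missed 30-minute run
```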

For related patterns, see our guide on how to monitor cron jobs (simple guide) and setting up cron job failure notifications.

Monitoring Celery Tasks in Kubernetes

In Kubernetes, Celery workers typically run as Deployment pods and the Beat scheduler runs as a separate single-replica Deployment. Kubernetes will restart crashed pods, but there is a gap between the crash and the restart. During that window, periodic tasks miss their scheduled runs.

kubernetes/celery-beat.yaml — relevant section
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
spec:
  replicas: 1  # ALWAYS 1 — running multiple Beat instances double-schedules tasks
  template:
    spec:
      containers:
        - name: celery-beat
          image: myapp:latest
          command: ["celery", "-A", "myapp", "beat", "-l", "info", "--scheduler",
                    "django_celery_beat.schedulers:DatabaseScheduler"]
          livenessProbe:
            exec:
              command: ["celery", "-A", "myapp", "inspect", "ping", "-d", "celery@beat"]
            initialDelaySeconds: 30
            periodSeconds: 60
            failureThreshold: 3

The liveness probe lets Kubernetes restart an unhealthy Beat process, but it does not tell you whether tasks are completing successfully. That is what CronPeek is for. Add heartbeat pings inside your task functions exactly as shown above. The CronPeek alert fires if the beat pod is down, if the worker pods are down, or if the task logic itself is failing—regardless of what Kubernetes sees.

For a deeper look at Kubernetes cron monitoring, see our Kubernetes cron job monitoring guide.

Error Handling: Immediate Failure Alerts

Waiting for a missed heartbeat gives you a delayed alert—interval plus grace period. For critical pipelines, you want to know immediately when a task fails, not 30 minutes later. The solution is to ping CronPeek with status=failure in your exception handler.

Immediate failure notification pattern
import requests
import logging
import traceback
from celery import shared_task

logger = logging.getLogger(__name__)

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"


@shared_task(
    name="myapp.tasks.process_payments",
    max_retries=3,
    default_retry_delay=300,  # Retry after 5 minutes
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payments():
    """
    Process pending Stripe charges. Runs every 15 minutes.
    Critical — failure means charges are not being collected.
    """
    token = "mon_p2a5y8m3e6"
    try:
        results = run_payment_processing_batch()
        failed_count = sum(1 for r in results if not r.success)

        if failed_count > 0:
            # Partial failure — ping with warning context
            _ping(token, "failure",
                  f"{failed_count}/{len(results)} charges failed")
        else:
            _ping(token, "success",
                  f"{len(results)} charges processed")

    except Exception as exc:
        # Log full traceback locally
        logger.error("Payment processing failed:\n%s", traceback.format_exc())
        # Alert immediately via CronPeek failure ping
        _ping(token, "failure", f"{type(exc).__name__}: {str(exc)[:200]}")
        raise  # Let Celery handle retries


def _ping(token, status, msg=""):
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException:
        pass

This covers both immediate failures (exception raises, partial batch failures) and silent failures (task stops being scheduled entirely). For more on failure notification strategies, see our dead man's switch monitoring guide.

Using httpx for Async Celery Tasks

Celery does not natively execute async def task functions (as of Celery 5.x), so the common pattern is a regular task that drives a coroutine with asyncio.run(). Inside the coroutine, use httpx rather than requests so the heartbeat does not block the event loop.

Async Celery task with httpx heartbeat
import asyncio

import httpx
from celery import shared_task

CRONPEEK_PING = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_k4p8q2r7n1"

@shared_task(name="myapp.tasks.async_data_export")
def async_data_export():
    """Export data to S3. Uses async I/O for concurrent uploads."""
    asyncio.run(_export())


async def _export():
    records = await fetch_export_records()
    urls = await upload_to_s3_concurrent(records)

    # All work done: send the heartbeat from inside the event loop
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            await client.post(
                CRONPEEK_PING,
                params={"status": "success", "msg": f"{len(urls)} files uploaded"},
            )
    except httpx.RequestError:
        pass  # Never let monitoring failures break the task

Pricing: CronPeek vs Cronitor for Python Teams

A typical Django or Flask application running Celery has 10 to 30 periodic tasks: data syncs, report generators, email queues, cache warmers, subscription billing jobs, cleanup routines. Here is what monitoring all of them costs.

Service             Free Tier      50 Monitors   Unlimited
CronPeek            5 monitors     $9/mo         $29/mo
Cronitor            1 monitor      ~$100/mo      Custom
Dead Man's Snitch   1 snitch       $199/mo       Custom
Healthchecks.io     20 checks      $20/mo        $80/mo
Better Uptime       5 heartbeats   $85/mo        Custom

Save $91/mo vs Cronitor for 50 monitors

Cronitor charges roughly $2 per monitor per month. Fifty Celery Beat tasks cost $100/mo—$1,200/year. CronPeek's Starter plan covers all 50 monitors for $9/mo—$108/year. That is the same dead man's switch functionality, the same API, the same alerting, at 91% less cost.

If you need more than 50, the Pro plan at $29/mo removes the limit entirely. Monitor every Celery task, every management command, every APScheduler job, every AWS Lambda scheduled function. No per-monitor pricing means no surprise bills when you add monitors during a sprint.

The math: Cronitor for 50 monitors: $1,200/year. CronPeek for 50 monitors: $108/year. If your engineering team costs $150/hr, CronPeek saves enough in one year to cover six hours of engineering time. Spend that six hours building features instead.
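The savings figure reduces to simple arithmetic using the prices quoted above:

```python
# Annual cost of 50 monitors at the prices quoted in the comparison table.
cronitor_yearly = 2 * 50 * 12   # ~$2/monitor/mo at 50 monitors
cronpeek_yearly = 9 * 12        # flat Starter plan
print(cronitor_yearly, cronpeek_yearly, cronitor_yearly - cronpeek_yearly)
# 1200 108 1092
```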

Quick Start: Add Celery Monitoring in 5 Minutes

Here is the complete setup from zero to monitored:

Terminal
# 1. Create a monitor for your most critical Celery Beat task
curl -X POST https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/monitors \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "celery-nightly-sync", "interval": 86400, "grace_period": 900}'

# Response: {"ping_url": ".../ping/mon_YOUR_TOKEN", ...}

# 2. Set up an email alert
curl -X POST https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/alerts \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"type": "email", "address": "oncall@yourcompany.com"}'

# 3. Install requests if not already present
pip install requests

# 4. Add the ping to your task (at the end of the success path):
#    import requests
#    requests.post("https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_YOUR_TOKEN", timeout=10)

# Done. Your task is monitored. You'll know within minutes if it stops running.

Start with your most critical Celery Beat task—usually the one that would cause the most damage if it silently stopped. Add the ping, verify you see heartbeats in the CronPeek dashboard, then add monitors for the rest of your schedule. See also: monitoring scheduled tasks via API for a broader overview of the monitoring API.

Start monitoring your Celery tasks

Free tier includes 5 monitors. No credit card required. Works with Celery, APScheduler, Django management commands, and any Python scheduler that runs on a schedule.

Get started free →