Monitor Celery Tasks and Python Scheduled Jobs with CronPeek
Your Celery Beat scheduler is running. The workers are running. But are your periodic tasks actually executing and succeeding? A worker process can be alive and dequeuing tasks while your beat schedule silently stops triggering, your Redis broker quietly drops messages, or your task logic raises an exception that Celery swallows and retries into oblivion. This guide shows you how to add dead man's switch monitoring to every Python scheduling pattern—Celery tasks, Celery Beat, APScheduler, and Django management commands—so you find out immediately when something breaks.
Why Celery Monitoring Is Harder Than It Looks
Celery is powerful, but it is also a distributed system with a lot of moving parts. You have a beat scheduler producing tasks, a broker (usually Redis or RabbitMQ) queuing them, and one or more workers consuming them. Any of these layers can fail independently, and none of them will surface the failure as an obvious error.
Here are the failure modes that catch Python developers off guard:
- Beat process crash. The Celery Beat scheduler is a single process. If it crashes or is not restarted after a deploy, your entire periodic task schedule stops silently. Workers stay up. The broker is healthy. The app looks fine. Nothing runs.
- Worker starvation. Your tasks pile up in the queue faster than workers can process them. Beat keeps scheduling. Workers keep consuming. But the effective execution rate drops, and time-sensitive tasks run hours late. No exception is raised anywhere.
- Silent task exceptions. Celery retries failed tasks by default. If your task raises an exception, Celery retries it up to the configured max. Once retries are exhausted, the task moves to a failure state—but unless you have explicit failure handling, this produces no alert. Your nightly data sync just stopped working three days ago.
- Broker connection loss. Redis goes down for 30 seconds during a failover. Beat tries to publish tasks, fails, and depending on your configuration either drops them or retries. Workers reconnect but the tasks that were missed during the outage are gone.
- Timezone and DST bugs. Celery Beat has a known issue with daylight saving time transitions when `CELERY_TIMEZONE` is set. Tasks can run twice or skip entirely around DST boundaries. You only find out when stakeholders ask why last quarter's rollup is missing.
- Task timeouts. Your task acquires a database lock, the query hangs, and the task eventually times out. The soft timeout exception is raised, the task is marked failed, and the next run is scheduled. If this pattern repeats, your task never completes successfully—it just keeps timing out.
Celery Flower shows you task history and worker status, but it does not alert you when a periodic task stops completing. Traditional uptime monitoring does not catch any of this either—your HTTP endpoints return 200 while your data pipelines are dead.
The core problem: Celery tasks fail by not completing successfully. The beat process looks healthy. The workers respond to pings. But the actual job has not run in 48 hours. A dead man's switch is the only pattern that detects the absence of a successful execution.
How Dead Man's Switch Monitoring Works with CronPeek
The pattern is straightforward. At the end of your task—after all the real work is done—you send an HTTP ping to CronPeek. CronPeek expects that ping at the interval you configure. If it stops arriving, CronPeek sends you an alert.
The API base URL is https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi. You create monitors with POST /monitors and send heartbeats with a GET, POST, or HEAD request to /ping/:token. Any HTTP method works, which keeps the integration simple from Python.
First, create a monitor:
Create a monitor via curl:

```bash
curl -X POST https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/monitors \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "celery-nightly-sync",
    "interval": 86400,
    "grace_period": 900
  }'
```
The response gives you a ping_url:
```json
{
  "id": "mon_k4p8q2r7n1",
  "name": "celery-nightly-sync",
  "ping_url": "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_k4p8q2r7n1",
  "status": "waiting",
  "interval": 86400,
  "grace_period": 900
}
```
Set interval to your expected run frequency in seconds. Set grace_period to how long after the deadline CronPeek should wait before alerting—useful for tasks that occasionally run a few minutes late.
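If you prefer to create monitors from Python rather than curl, here is a minimal sketch using `requests`, assuming the `POST /monitors` request and response shape shown above (the `build_monitor_payload` and `create_monitor` helper names are ours, not part of the CronPeek client):

```python
import requests

CRONPEEK_API = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi"

def build_monitor_payload(name, interval, grace_period):
    """Build the JSON body for POST /monitors (interval and grace in seconds)."""
    return {"name": name, "interval": interval, "grace_period": grace_period}

def create_monitor(api_key, name, interval, grace_period):
    """Create a monitor and return its ping_url, per the API shape shown above."""
    resp = requests.post(
        f"{CRONPEEK_API}/monitors",
        headers={"Authorization": f"Bearer {api_key}"},
        json=build_monitor_payload(name, interval, grace_period),
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["ping_url"]
```

A one-time setup script like this keeps monitor definitions in version control next to the tasks they watch.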
Monitoring a Basic Celery Task
The simplest integration: add a requests.post() call at the end of your task function, inside the happy path. If the task raises an exception before reaching the ping, no heartbeat is sent and CronPeek alerts on the missed run.
```python
import requests
from celery import shared_task

CRONPEEK_PING = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_k4p8q2r7n1"

@shared_task(name="sync_user_data")
def sync_user_data():
    """Sync user records from the external CRM. Runs nightly at 2 AM."""
    users = fetch_users_from_crm()
    update_local_records(users)
    send_sync_summary_email(len(users))

    # All steps succeeded — send heartbeat to CronPeek
    try:
        requests.post(CRONPEEK_PING, timeout=10)
    except requests.RequestException:
        pass  # Never let monitoring failures break the task itself
```
Two important details here. First, the ping is the last line of the success path. If fetch_users_from_crm() raises, the ping never fires. Second, the ping call is wrapped in its own try/except so that a CronPeek outage never causes your task to fail.
Celery Beat Monitoring—The Full Pattern
Celery Beat is a periodic task scheduler. It reads your CELERYBEAT_SCHEDULE (or database-backed schedule) and publishes tasks to the broker at the configured intervals. Beat itself is a single process, and it does not verify that tasks execute or succeed. Monitoring Beat means monitoring the tasks it triggers.
Configuring Celery Beat with a monitored task
celery.py — Beat schedule with CronPeek integration

```python
from celery import Celery
from celery.schedules import crontab

app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")

app.conf.beat_schedule = {
    "nightly-data-sync": {
        "task": "myapp.tasks.sync_user_data",
        "schedule": crontab(hour=2, minute=0),  # Every day at 2 AM UTC
    },
    "hourly-cache-warmer": {
        "task": "myapp.tasks.warm_cache",
        "schedule": crontab(minute=0),  # Every hour at :00
    },
    "weekly-report": {
        "task": "myapp.tasks.generate_weekly_report",
        "schedule": crontab(hour=8, minute=0, day_of_week="monday"),
    },
}
```
tasks.py — monitored Beat tasks

```python
import requests
from celery import shared_task

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"

# Each task has its own monitor token
MONITORS = {
    "nightly-data-sync": "mon_k4p8q2r7n1",
    "hourly-cache-warmer": "mon_j3m7h9x2k5",
    "weekly-report": "mon_r6w1b4v8n0",
}

def ping(token, status="success", msg=""):
    """Send a heartbeat to CronPeek. Never raises."""
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException:
        pass

@shared_task(name="myapp.tasks.sync_user_data")
def sync_user_data():
    token = MONITORS["nightly-data-sync"]
    try:
        users = fetch_users_from_crm()
        update_local_records(users)
        send_sync_summary_email(len(users))
        ping(token, "success", f"synced {len(users)} users")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise  # Re-raise so Celery records the failure and retries if configured

@shared_task(name="myapp.tasks.warm_cache")
def warm_cache():
    token = MONITORS["hourly-cache-warmer"]
    try:
        count = rebuild_query_cache()
        ping(token, "success", f"{count} cache keys rebuilt")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise

@shared_task(name="myapp.tasks.generate_weekly_report")
def generate_weekly_report():
    token = MONITORS["weekly-report"]
    try:
        report_url = build_and_upload_report()
        notify_subscribers(report_url)
        ping(token, "success", f"report: {report_url}")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise
```
The raise after ping(token, "failure", ...) is intentional. You want Celery to see the failure, trigger its retry logic, and record the exception in your task result backend. CronPeek gives you an immediate alert; Celery still manages the retry lifecycle.
Two failure modes, two alerts: If the task runs and fails, you get an immediate CronPeek failure alert. If Beat stops scheduling entirely (beat process down, broker unreachable), the heartbeat never arrives and CronPeek alerts after your grace period. Both failure modes are covered.
Using a Decorator for Clean Celery Monitoring
If you have many Celery tasks to monitor, a decorator keeps the signal-to-noise ratio high. The task logic stays focused; the monitoring is a single annotation.
utils/cronpeek.py

```python
import functools
import time

import requests

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"

def monitored(token):
    """
    Decorator that wraps a Celery task with CronPeek dead man's switch monitoring.

    Usage:
        @shared_task
        @monitored("mon_k4p8q2r7n1")
        def my_task():
            do_work()
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                result = fn(*args, **kwargs)
                elapsed = int((time.monotonic() - start) * 1000)
                _ping(token, "success", f"completed in {elapsed}ms")
                return result
            except Exception as exc:
                _ping(token, "failure", str(exc))
                raise
        return wrapper
    return decorator

def _ping(token, status="success", msg=""):
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException:
        pass
```
tasks.py — clean with decorator

```python
from celery import shared_task
from myapp.utils.cronpeek import monitored

@shared_task(name="myapp.tasks.sync_user_data")
@monitored("mon_k4p8q2r7n1")
def sync_user_data():
    users = fetch_users_from_crm()
    update_local_records(users)
    send_sync_summary_email(len(users))

@shared_task(name="myapp.tasks.warm_cache")
@monitored("mon_j3m7h9x2k5")
def warm_cache():
    rebuild_query_cache()

@shared_task(name="myapp.tasks.generate_weekly_report")
@monitored("mon_r6w1b4v8n0")
def generate_weekly_report():
    report_url = build_and_upload_report()
    notify_subscribers(report_url)
```
Clean. Each task gets monitoring in one line and the task body stays focused on business logic. The decorator re-raises exceptions so Celery's retry and failure recording are unaffected.
APScheduler Integration
APScheduler is a popular in-process Python scheduler that does not require a separate broker. It runs inside your application and supports cron, interval, and date-based triggers. The same dead man's switch pattern applies.
scheduler.py — APScheduler with CronPeek monitoring

```python
import requests
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

logger = logging.getLogger(__name__)

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"

scheduler = BackgroundScheduler(timezone="UTC")

def ping(token, status="success", msg=""):
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException as e:
        logger.warning("CronPeek ping failed: %s", e)

def nightly_invoice_job():
    """Generates and emails monthly invoices. Runs at 3 AM every day."""
    token = "mon_a2s5d8f1g4"
    try:
        count = generate_pending_invoices()
        send_invoice_emails(count)
        logger.info("Invoice job completed: %d invoices", count)
        ping(token, "success", f"{count} invoices generated")
    except Exception as exc:
        logger.exception("Invoice job failed")
        ping(token, "failure", str(exc))

def hourly_metrics_job():
    """Aggregates usage metrics into the reporting database."""
    token = "mon_h7j2k5l8m1"
    try:
        rows = aggregate_hourly_metrics()
        ping(token, "success", f"{rows} rows written")
    except Exception as exc:
        ping(token, "failure", str(exc))
        raise

# Register jobs
scheduler.add_job(
    nightly_invoice_job,
    CronTrigger(hour=3, minute=0, timezone="UTC"),
    id="nightly-invoices",
    max_instances=1,  # Prevent overlapping runs
    misfire_grace_time=300,  # Tolerate up to 5 minutes of delay
)
scheduler.add_job(
    hourly_metrics_job,
    CronTrigger(minute=0, timezone="UTC"),
    id="hourly-metrics",
    max_instances=1,
    misfire_grace_time=120,
)
scheduler.start()
```
Note max_instances=1 on each job. APScheduler will skip a run if the previous one is still executing, which can cause missed heartbeats that look like task failures. Set your CronPeek grace_period generously for jobs that occasionally run long, or use APScheduler's misfire_grace_time to control overlap behavior.
Django Management Command with Dead Man's Switch
Django management commands invoked via cron (system cron, Kubernetes CronJob, or Heroku Scheduler) are a common pattern for periodic Python tasks. They are simple, testable, and deploy cleanly. The failure mode: if the cron entry breaks or the Django process exits non-zero, the command silently stops running and nobody notices for days.
management/commands/sync_inventory.py

```python
import requests
from django.core.management.base import BaseCommand, CommandError

from myapp.services import sync_inventory_from_warehouse

CRONPEEK_PING = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_c9v3b6n0m2"

class Command(BaseCommand):
    help = "Sync inventory records from the warehouse API. Run every 30 minutes."

    def handle(self, *args, **options):
        self.stdout.write("Starting inventory sync...")
        try:
            result = sync_inventory_from_warehouse()
            self.stdout.write(
                self.style.SUCCESS(
                    f"Sync complete: {result.updated} updated, {result.created} created, "
                    f"{result.skipped} skipped."
                )
            )
        except Exception as exc:
            # Ping with failure status for immediate alert
            self._ping("failure", str(exc))
            raise CommandError(f"Inventory sync failed: {exc}") from exc

        # All work done — send dead man's switch heartbeat
        self._ping("success", f"{result.updated + result.created} records processed")

    def _ping(self, status, msg=""):
        try:
            params = {"status": status}
            if msg:
                params["msg"] = msg[:256]
            requests.post(CRONPEEK_PING, params=params, timeout=10)
        except requests.RequestException as e:
            self.stderr.write(f"CronPeek ping failed: {e}")
```
Pair this with a system cron entry:
crontab

```
# Sync inventory every 30 minutes — monitored by CronPeek (mon_c9v3b6n0m2)
*/30 * * * * cd /srv/myapp && /srv/myapp/.venv/bin/python manage.py sync_inventory >> /var/log/myapp/sync_inventory.log 2>&1
```
Set the CronPeek monitor interval to 1800 (30 minutes) with a grace_period of 300 seconds. If the cron daemon stops firing, the virtual environment breaks, or the Django command raises an unhandled exception, you get an alert within 35 minutes of the missed run.
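The 35-minute figure falls straight out of the monitor settings: an alert fires once `interval + grace_period` seconds pass without a heartbeat. A quick sketch of the arithmetic:

```python
def worst_case_alert_delay(interval_s, grace_s):
    """Seconds from the last successful ping until CronPeek alerts."""
    return interval_s + grace_s

# 30-minute interval + 5-minute grace period
delay = worst_case_alert_delay(1800, 300)
print(delay // 60)  # → 35 minutes
```

The same formula tells you what grace period to pick: subtract the interval from the longest delay you are willing to tolerate before being paged.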
For related patterns, see our guide on how to monitor cron jobs (simple guide) and setting up cron job failure notifications.
Monitoring Celery Tasks in Kubernetes
In Kubernetes, Celery workers typically run as Deployment pods and the Beat scheduler runs as a separate single-replica Deployment. Kubernetes will restart crashed pods, but there is a gap between the crash and the restart. During that window, periodic tasks miss their scheduled runs.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
spec:
  replicas: 1  # ALWAYS 1 — running multiple Beat instances double-schedules tasks
  template:
    spec:
      containers:
        - name: celery-beat
          image: myapp:latest
          command: ["celery", "-A", "myapp", "beat", "-l", "info", "--scheduler",
                    "django_celery_beat.schedulers:DatabaseScheduler"]
          livenessProbe:
            exec:
              # Note: `celery inspect ping` targets workers, not Beat, so a
              # process check is used here instead
              command: ["sh", "-c", "pgrep -f 'celery.*beat' > /dev/null"]
            initialDelaySeconds: 30
            periodSeconds: 60
            failureThreshold: 3
```
The liveness probe helps Kubernetes restart a hung Beat process, but it does not tell you whether tasks are completing successfully. That is what CronPeek is for. Add heartbeat pings inside your task functions exactly as shown above. The CronPeek alert fires if the beat pod is down, if the worker pods are down, or if the task logic itself is failing—regardless of what Kubernetes sees.
For a deeper look at Kubernetes cron monitoring, see our Kubernetes cron job monitoring guide.
Error Handling: Immediate Failure Alerts
Waiting for a missed heartbeat gives you a delayed alert—interval plus grace period. For critical pipelines, you want to know immediately when a task fails, not 30 minutes later. The solution is to ping CronPeek with status=failure in your exception handler.
```python
import requests
import logging
import traceback
from celery import shared_task

logger = logging.getLogger(__name__)

CRONPEEK_BASE = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping"

@shared_task(
    name="myapp.tasks.process_payments",
    max_retries=3,
    default_retry_delay=300,  # Retry after 5 minutes
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payments():
    """
    Process pending Stripe charges. Runs every 15 minutes.
    Critical — failure means charges are not being collected.
    """
    token = "mon_p2a5y8m3e6"
    try:
        results = run_payment_processing_batch()
        failed_count = sum(1 for r in results if not r.success)
        if failed_count > 0:
            # Partial failure — ping with warning context
            _ping(token, "failure",
                  f"{failed_count}/{len(results)} charges failed")
        else:
            _ping(token, "success",
                  f"{len(results)} charges processed")
    except Exception as exc:
        # Log full traceback locally
        logger.error("Payment processing failed:\n%s", traceback.format_exc())
        # Alert immediately via CronPeek failure ping
        _ping(token, "failure", f"{type(exc).__name__}: {str(exc)[:200]}")
        raise  # Let Celery handle retries

def _ping(token, status, msg=""):
    try:
        params = {"status": status}
        if msg:
            params["msg"] = msg[:256]
        requests.post(f"{CRONPEEK_BASE}/{token}", params=params, timeout=10)
    except requests.RequestException:
        pass
```
This covers both immediate failures (exception raises, partial batch failures) and silent failures (task stops being scheduled entirely). For more on failure notification strategies, see our dead man's switch monitoring guide.
Using httpx for Async Celery Tasks
If your Celery tasks run async code, use httpx instead of requests so the heartbeat call does not block the event loop. (Note that Celery's native support for `async def` task functions is still limited as of Celery 5; many setups drive the coroutine from a synchronous task via `asyncio.run` or `asgiref.sync.async_to_sync`.)
```python
import httpx
from celery import shared_task

CRONPEEK_PING = "https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_k4p8q2r7n1"

@shared_task(name="myapp.tasks.async_data_export")
async def async_data_export():
    """Export data to S3. Uses async I/O for concurrent uploads."""
    records = await fetch_export_records()
    urls = await upload_to_s3_concurrent(records)

    async with httpx.AsyncClient(timeout=10) as client:
        try:
            await client.post(
                CRONPEEK_PING,
                params={"status": "success", "msg": f"{len(urls)} files uploaded"},
            )
        except httpx.RequestError:
            pass
```
Pricing: CronPeek vs Cronitor for Python Teams
A typical Django or Flask application running Celery has 10 to 30 periodic tasks: data syncs, report generators, email queues, cache warmers, subscription billing jobs, cleanup routines. Here is what monitoring all of them costs.
| Service | Free Tier | 50 Monitors | Unlimited |
|---|---|---|---|
| CronPeek | 5 monitors | $9/mo | $29/mo |
| Cronitor | 1 monitor | ~$100/mo | Custom |
| Dead Man's Snitch | 1 snitch | $199/mo | Custom |
| Healthchecks.io | 20 checks | $20/mo | $80/mo |
| Better Uptime | 5 heartbeats | $85/mo | Custom |
Cronitor charges roughly $2 per monitor per month. Fifty Celery Beat tasks cost $100/mo—$1,200/year. CronPeek's Starter plan covers all 50 monitors for $9/mo—$108/year. That is the same dead man's switch functionality, the same API, the same alerting, at 91% less cost.
If you need more than 50, the Pro plan at $29/mo removes the limit entirely. Monitor every Celery task, every management command, every APScheduler job, every AWS Lambda scheduled function. No per-monitor pricing means no surprise bills when you add monitors during a sprint.
The math: Cronitor for 50 monitors: $1,200/year. CronPeek for 50 monitors: $108/year. If your engineering team costs $150/hr, CronPeek saves enough in one year to cover six hours of engineering time. Spend that six hours building features instead.
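For your own monitor count, the comparison above reduces to a few lines of arithmetic (using the per-monitor and flat rates from the table; your actual quote may differ):

```python
def annual_cost(monthly_usd):
    """Annualize a monthly price."""
    return monthly_usd * 12

monitors = 50
cronitor_annual = annual_cost(monitors * 2)  # ~$2/monitor/mo per the table above
cronpeek_annual = annual_cost(9)             # Starter plan flat rate
savings_pct = round((1 - cronpeek_annual / cronitor_annual) * 100)
print(cronitor_annual, cronpeek_annual, savings_pct)  # → 1200 108 91
```

Swap in your own monitor count to see where the flat-rate plan starts paying for itself.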
Quick Start: Add Celery Monitoring in 5 Minutes
Here is the complete setup from zero to monitored:
Terminal

```bash
# 1. Create a monitor for your most critical Celery Beat task
curl -X POST https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/monitors \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "celery-nightly-sync", "interval": 86400, "grace_period": 900}'
# Response: {"ping_url": ".../ping/mon_YOUR_TOKEN", ...}

# 2. Set up an email alert
curl -X POST https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/alerts \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"type": "email", "address": "oncall@yourcompany.com"}'

# 3. Install requests if not already present
pip install requests

# 4. Add the ping to your task (at the end of the success path):
#    import requests
#    requests.post("https://us-central1-todd-agent-prod.cloudfunctions.net/cronpeekApi/ping/mon_YOUR_TOKEN", timeout=10)

# Done. Your task is monitored. You'll know within minutes if it stops running.
```
Start with your most critical Celery Beat task—usually the one that would cause the most damage if it silently stopped. Add the ping, verify you see heartbeats in the CronPeek dashboard, then add monitors for the rest of your schedule. See also: monitoring scheduled tasks via API for a broader overview of the monitoring API.
Start monitoring your Celery tasks
Free tier includes 5 monitors. No credit card required. Works with Celery, APScheduler, Django management commands, and any Python scheduler that runs on a schedule.
Get started free →