From bc66f23cb852674f9fc451dc9a005ad273ff9127 Mon Sep 17 00:00:00 2001 From: Carlos Antonio da Silva Date: Mon, 10 Nov 2025 14:45:50 -0300 Subject: [PATCH] Collect & report task metadata for Celery This additional task metadata will help troubleshoot an issue with some queue time spikes that we're investigating. With each queue we'll add information about the task and the timestamps that were used to calculate queue time, so it can hopefully help us identify the source of the queue time spikes. --- judoscale/celery/collector.py | 15 +++++++++++--- judoscale/core/metric.py | 1 + judoscale/core/reporter.py | 5 +++++ tests/test_collectors.py | 37 +++++++++++++++++++++++++++++++++++ tests/test_reporter.py | 21 +++++++++++++++++++- 5 files changed, 75 insertions(+), 4 deletions(-) diff --git a/judoscale/celery/collector.py b/judoscale/celery/collector.py index a94e010..f835267 100644 --- a/judoscale/celery/collector.py +++ b/judoscale/celery/collector.py @@ -154,10 +154,19 @@ def collect(self) -> List[Metric]: for queue in self.queues: if result := self.oldest_task_and_timestamp(queue): task, timestamp = result + logger.debug(f"Task: {timestamp} | {task}") if timestamp: - metrics.append( - Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp) - ) + metric = Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp) + metric.report_metadata = { + "task": task.get("headers", {}).get("task"), + "id": task.get("headers", {}).get("id"), + "eta": task.get("headers", {}).get("eta"), + "retries": task.get("headers", {}).get("retries"), + "published_at": task.get("properties", {}).get("published_at"), + "timestamp": timestamp, + } + + metrics.append(metric) else: task_id = task.get("id", None) logger.warning( diff --git a/judoscale/core/metric.py b/judoscale/core/metric.py index 6fc5df8..1c1ee72 100644 --- a/judoscale/core/metric.py +++ b/judoscale/core/metric.py @@ -34,6 +34,7 @@ class Metric: value: int queue_name: Optional[str] = None measurement: str = "qt" + 
report_metadata: Optional[dict] = None @property def as_tuple(self) -> Tuple[int, int, str, Optional[str]]: diff --git a/judoscale/core/reporter.py b/judoscale/core/reporter.py index 67ae219..7b55517 100644 --- a/judoscale/core/reporter.py +++ b/judoscale/core/reporter.py @@ -128,6 +128,11 @@ def _build_report(self, metrics: List[Metric]): "config": self.config.for_report, "adapters": dict(adapter.as_tuple for adapter in self.adapters), "metrics": [metric.as_tuple for metric in metrics], + "metadata": dict( + (metric.queue_name, metric.report_metadata) + for metric in metrics + if metric.report_metadata is not None + ), } diff --git a/tests/test_collectors.py b/tests/test_collectors.py index 10195a4..c3c2b32 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -405,6 +405,43 @@ def mock_lindex(queue, _): assert metrics[3].queue_name == "foo" assert metrics[3].value == approx(60000, abs=100) + def test_collect_metadata(self, worker_1, celery): + now = time.time() + celery.connection_for_read().channel().client.scan_iter.return_value = [b"foo"] + celery.connection_for_read().channel().client.lindex.return_value = bytes( + json.dumps( + { + "id": "123abc", + "headers": { + "eta": None, + "retries": 0, + "id": "123abc", + "task": "my.task", + }, + "properties": {"published_at": now - 60}, + } + ), + "utf-8", + ) + + collector = CeleryMetricsCollector(worker_1, celery) + metrics = collector.collect() + metrics = sorted(metrics, key=lambda m: m.queue_name) + + assert len(metrics) == 1 + + assert metrics[0].measurement == "qt" + assert metrics[0].queue_name == "foo" + assert metrics[0].value == approx(60000, abs=100) + assert metrics[0].report_metadata == { + "id": "123abc", + "task": "my.task", + "eta": None, + "retries": 0, + "published_at": now - 60, + "timestamp": now - 60, + } + class TestRQMetricsCollector: def test_adapter_config(self, render_worker): diff --git a/tests/test_reporter.py b/tests/test_reporter.py index 9f2b3fd..d1d42ec 100644 --- 
a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -32,10 +32,29 @@ def test_build_report(self, reporter): report = reporter._build_report([metric]) assert sorted(list(report.keys())) == sorted( - ["adapters", "config", "container", "pid", "metrics"] + ["adapters", "config", "container", "pid", "metrics", "metadata"] ) assert len(report["metrics"]) == 1 assert report["metrics"][0] == (1355314320, 123, "test", None) + assert report["metadata"] == dict() + + def test_build_report_metadata(self, reporter): + ts = datetime.fromisoformat("2012-12-12T12:12:00+00:00").timestamp() + + metric = Metric(measurement="qt", timestamp=ts, value=123, queue_name="q1") + metric.report_metadata = {"task": "my.task1", "id": "1", "published_at": 999} + + metric2 = Metric(measurement="qt", timestamp=ts, value=456, queue_name="q2") + metric2.report_metadata = {"task": "my.task2", "id": "2", "published_at": 987} + + metric3 = Metric(measurement="qt", timestamp=ts, value=789, queue_name="nope") + + report = reporter._build_report([metric, metric2, metric3]) + + assert report["metadata"] == { + "q1": {"task": "my.task1", "id": "1", "published_at": 999}, + "q2": {"task": "my.task2", "id": "2", "published_at": 987}, + } def test_no_explicit_adapter(self, reporter): assert len(reporter.adapters) == 1