diff --git a/judoscale/celery/collector.py b/judoscale/celery/collector.py index a94e010..f835267 100644 --- a/judoscale/celery/collector.py +++ b/judoscale/celery/collector.py @@ -154,10 +154,19 @@ def collect(self) -> List[Metric]: for queue in self.queues: if result := self.oldest_task_and_timestamp(queue): task, timestamp = result + logger.debug(f"Task: {timestamp} | {task}") if timestamp: - metrics.append( - Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp) - ) + metric = Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp) + metric.report_metadata = { + "task": task.get("headers", {}).get("task"), + "id": task.get("headers", {}).get("id"), + "eta": task.get("headers", {}).get("eta"), + "retries": task.get("headers", {}).get("retries"), + "published_at": task.get("properties", {}).get("published_at"), + "timestamp": timestamp, + } + + metrics.append(metric) else: task_id = task.get("id", None) logger.warning( diff --git a/judoscale/core/metric.py b/judoscale/core/metric.py index 6fc5df8..1c1ee72 100644 --- a/judoscale/core/metric.py +++ b/judoscale/core/metric.py @@ -34,6 +34,7 @@ class Metric: value: int queue_name: Optional[str] = None measurement: str = "qt" + report_metadata: Optional[dict] = None @property def as_tuple(self) -> Tuple[int, int, str, Optional[str]]: diff --git a/judoscale/core/reporter.py b/judoscale/core/reporter.py index 67ae219..7b55517 100644 --- a/judoscale/core/reporter.py +++ b/judoscale/core/reporter.py @@ -128,6 +128,11 @@ def _build_report(self, metrics: List[Metric]): "config": self.config.for_report, "adapters": dict(adapter.as_tuple for adapter in self.adapters), "metrics": [metric.as_tuple for metric in metrics], + "metadata": dict( + (metric.queue_name, metric.report_metadata) + for metric in metrics + if metric.report_metadata is not None + ), } diff --git a/tests/test_collectors.py b/tests/test_collectors.py index 10195a4..c3c2b32 100644 --- a/tests/test_collectors.py +++ b/tests/test_collectors.py @@ -405,6 +405,43 @@ def mock_lindex(queue, _): assert metrics[3].queue_name == "foo" assert metrics[3].value == approx(60000, abs=100) + def test_collect_metadata(self, worker_1, celery): + now = time.time() + celery.connection_for_read().channel().client.scan_iter.return_value = [b"foo"] + celery.connection_for_read().channel().client.lindex.return_value = bytes( + json.dumps( + { + "id": "123abc", + "headers": { + "eta": None, + "retries": 0, + "id": "123abc", + "task": "my.task", + }, + "properties": {"published_at": now - 60}, + } + ), + "utf-8", + ) + + collector = CeleryMetricsCollector(worker_1, celery) + metrics = collector.collect() + metrics = sorted(metrics, key=lambda m: m.queue_name) + + assert len(metrics) == 1 + + assert metrics[0].measurement == "qt" + assert metrics[0].queue_name == "foo" + assert metrics[0].value == approx(60000, abs=100) + assert metrics[0].report_metadata == { + "id": "123abc", + "task": "my.task", + "eta": None, + "retries": 0, + "published_at": now - 60, + "timestamp": now - 60, + } + class TestRQMetricsCollector: def test_adapter_config(self, render_worker): diff --git a/tests/test_reporter.py b/tests/test_reporter.py index 9f2b3fd..d1d42ec 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -32,10 +32,29 @@ def test_build_report(self, reporter): report = reporter._build_report([metric]) assert sorted(list(report.keys())) == sorted( - ["adapters", "config", "container", "pid", "metrics"] + ["adapters", "config", "container", "pid", "metrics", "metadata"] ) assert len(report["metrics"]) == 1 assert report["metrics"][0] == (1355314320, 123, "test", None) + assert report["metadata"] == dict() + + def test_build_report_metadata(self, reporter): + ts = datetime.fromisoformat("2012-12-12T12:12:00+00:00").timestamp() + + metric = Metric(measurement="qt", timestamp=ts, value=123, queue_name="q1") + metric.report_metadata = {"task": "my.task1", "id": "1", "published_at": 999} + + metric2 = Metric(measurement="qt", timestamp=ts, value=456, queue_name="q2") + metric2.report_metadata = {"task": "my.task2", "id": "2", "published_at": 987} + + metric3 = Metric(measurement="qt", timestamp=ts, value=789, queue_name="nope") + + report = reporter._build_report([metric, metric2, metric3]) + + assert report["metadata"] == { + "q1": {"task": "my.task1", "id": "1", "published_at": 999}, + "q2": {"task": "my.task2", "id": "2", "published_at": 987}, + } def test_no_explicit_adapter(self, reporter): assert len(reporter.adapters) == 1