Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions judoscale/celery/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,19 @@ def collect(self) -> List[Metric]:
for queue in self.queues:
if result := self.oldest_task_and_timestamp(queue):
task, timestamp = result
logger.debug(f"Task: {timestamp} | {task}")
if timestamp:
metrics.append(
Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp)
)
metric = Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp)
metric.report_metadata = {
"task": task.get("headers", {}).get("task"),
"id": task.get("headers", {}).get("id"),
"eta": task.get("headers", {}).get("eta"),
"retries": task.get("headers", {}).get("retries"),
"published_at": task.get("properties", {}).get("published_at"),
"timestamp": timestamp,
}

metrics.append(metric)
else:
task_id = task.get("id", None)
logger.warning(
Expand Down
1 change: 1 addition & 0 deletions judoscale/core/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Metric:
value: int
queue_name: Optional[str] = None
measurement: str = "qt"
report_metadata: Optional[dict] = None

@property
def as_tuple(self) -> Tuple[int, int, str, Optional[str]]:
Expand Down
5 changes: 5 additions & 0 deletions judoscale/core/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ def _build_report(self, metrics: List[Metric]):
"config": self.config.for_report,
"adapters": dict(adapter.as_tuple for adapter in self.adapters),
"metrics": [metric.as_tuple for metric in metrics],
"metadata": dict(
(metric.queue_name, metric.report_metadata)
for metric in metrics
if metric.report_metadata is not None
),
}


Expand Down
37 changes: 37 additions & 0 deletions tests/test_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,43 @@ def mock_lindex(queue, _):
assert metrics[3].queue_name == "foo"
assert metrics[3].value == approx(60000, abs=100)

def test_collect_metadata(self, worker_1, celery):
now = time.time()
celery.connection_for_read().channel().client.scan_iter.return_value = [b"foo"]
celery.connection_for_read().channel().client.lindex.return_value = bytes(
json.dumps(
{
"id": "123abc",
"headers": {
"eta": None,
"retries": 0,
"id": "123abc",
"task": "my.task",
},
"properties": {"published_at": now - 60},
}
),
"utf-8",
)

collector = CeleryMetricsCollector(worker_1, celery)
metrics = collector.collect()
metrics = sorted(metrics, key=lambda m: m.queue_name)

assert len(metrics) == 1

assert metrics[0].measurement == "qt"
assert metrics[0].queue_name == "foo"
assert metrics[0].value == approx(60000, abs=100)
assert metrics[0].report_metadata == {
"id": "123abc",
"task": "my.task",
"eta": None,
"retries": 0,
"published_at": now - 60,
"timestamp": now - 60,
}


class TestRQMetricsCollector:
def test_adapter_config(self, render_worker):
Expand Down
21 changes: 20 additions & 1 deletion tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,29 @@ def test_build_report(self, reporter):
report = reporter._build_report([metric])

assert sorted(list(report.keys())) == sorted(
["adapters", "config", "container", "pid", "metrics"]
["adapters", "config", "container", "pid", "metrics", "metadata"]
)
assert len(report["metrics"]) == 1
assert report["metrics"][0] == (1355314320, 123, "test", None)
assert report["metadata"] == dict()

def test_build_report_metadata(self, reporter):
ts = datetime.fromisoformat("2012-12-12T12:12:00+00:00").timestamp()

metric = Metric(measurement="qt", timestamp=ts, value=123, queue_name="q1")
metric.report_metadata = {"task": "my.task1", "id": "1", "published_at": 999}

metric2 = Metric(measurement="qt", timestamp=ts, value=456, queue_name="q2")
metric2.report_metadata = {"task": "my.task2", "id": "2", "published_at": 987}

metric3 = Metric(measurement="qt", timestamp=ts, value=789, queue_name="nope")

report = reporter._build_report([metric, metric2, metric3])

assert report["metadata"] == {
"q1": {"task": "my.task1", "id": "1", "published_at": 999},
"q2": {"task": "my.task2", "id": "2", "published_at": 987},
}

def test_no_explicit_adapter(self, reporter):
assert len(reporter.adapters) == 1
Expand Down