Skip to content

Conversation

@dadodimauro
Copy link
Collaborator

@dadodimauro dadodimauro commented Dec 31, 2025

  • Updated RedisBackend to use aclose() for closing connections and made state, result, heartbeat, and workers set key methods static.
  • Added abstract method purge_dlq to Broker class for deleting all messages from the Dead Letter Queue.
  • Refactored RabbitMQBroker to implement purge_dlq and improved error handling for DLQ messages.
  • Enhanced RedisBroker to use aclose() for closing connections and made stream, delayed, and DLQ key methods static.
  • Modified Task class to improve context parameter detection and input validation handling.
  • Added integration tests for various task execution scenarios, including retries, DLQ operations, and backend interactions.
  • Updated unit tests for task delay and send methods to ensure proper behavior with validation modes.
  • Added pytest-xdist for parallel test execution.

Summary by CodeRabbit

  • New Features

    • Added Dead Letter Queue purge to permanently remove failed messages
    • Enabled synchronous task delay/send flows
  • Tests

    • Added a comprehensive integration test suite for execution, retries, DLQ, scheduling, concurrency, and edge cases
    • Introduced parallel test execution and expanded unit test matrices across Python 3.11–3.14 and multiple OSes
  • Chores

    • Refactored CI workflows for separate PR/unit/full-test/post-merge coverage runs and Codecov reporting
    • Added distinct unit/integration test targets and parallel test runner support

✏️ Tip: You can customize this high-level summary in your review settings.

…and static methods

- Updated RedisBackend to use aclose() for closing connections and made state, result, heartbeat, and workers set key methods static.
- Added abstract method purge_dlq to Broker class for deleting all messages from the Dead Letter Queue.
- Refactored RabbitMQBroker to implement purge_dlq and improved error handling for DLQ messages.
- Enhanced RedisBroker to use aclose() for closing connections and made stream, delayed, and DLQ key methods static.
- Modified Task class to improve context parameter detection and input validation handling.
- Added integration tests for various task execution scenarios, including retries, DLQ operations, and backend interactions.
- Updated unit tests for task delay and send methods to ensure proper behavior with validation modes.
- Added pytest-xdist for parallel test execution.
@dadodimauro dadodimauro added this to the v1.0.0 milestone Dec 31, 2025
@dadodimauro dadodimauro self-assigned this Dec 31, 2025
@dadodimauro dadodimauro added the enhancement New feature or request label Dec 31, 2025
@dadodimauro dadodimauro requested a review from anvouk as a code owner December 31, 2025 00:11
@coderabbitai
Copy link

coderabbitai bot commented Dec 31, 2025

Caution

Review failed

The pull request is closed.

📝 Walkthrough

Walkthrough

Reorganizes CI into three workflows, splits test targets (unit/integration), refactors broker/backend helpers to staticmethods and async close semantics, adds Broker.purge_dlq and RabbitMQ purge/robust DLQ handling, changes Task annotation inspection and non-async delay support, and adds extensive integration tests and fixtures.

Changes

Cohort / File(s) Summary
CI workflows
\.github/workflows/merge-gate.yml, \.github/workflows/post-merge-coverage.yml, \.github/workflows/pr-unit-tests.yml, \.github/workflows/test.yml
Adds three specialized workflows (merge-gate, post-merge-coverage, pr-unit-tests) and removes the legacy test.yml. Matrices and triggers adjusted for PRs, merge groups, and post-merge coverage upload.
Make/test config
Makefile, pyproject.toml, requirements.txt
Adds test-unit and test-integration targets; updates test-fast/test-slow; adds pytest-xdist and an integration pytest marker for parallelized and partitioned test runs.
Redis backend changes
src/chicory/backend/redis.py, src/chicory/broker/redis.py
Convert key-helper instance methods to @staticmethod; remove auto_close_connection_pool arg; switch from close()/disconnect() to async aclose() semantics for client/pool.
Broker interface and RabbitMQ DLQ
src/chicory/broker/base.py, src/chicory/broker/rabbitmq.py
Add abstract purge_dlq() to Broker; RabbitMQBroker implements purge_dlq (async), converts name helpers to @staticmethod, adds DLQ routing/force options, and improves DLQ message parsing fallback for native DLX messages and error handling.
Task signature & validation
src/chicory/task.py, tests/unit/test_task.py
Replace get_type_hints with inspect.signature-based annotation handling; add _is_context_param(); support string annotations; infer return type from signature; remove async-only guard so sync tasks can use delay/send; update unit tests accordingly.
Test scaffolding & integration suite
tests/conftest.py, tests/integration/test_integration.py
Add _setup_app, parameterized chicory_app fixture for broker/backend combinations, a chicory_worker fixture, and a comprehensive integration test suite (~25 tests) covering DLQ, retries, health, backend storage, scheduling, concurrency, and broker-specific behaviors.
Small refactors / helpers
src/chicory/broker/..., src/chicory/backend/...
Multiple private helper methods converted to @staticmethod across broker/backend modules; call sites adjusted to static invocation; async cleanup semantics standardized.

Sequence Diagram(s)

(omitted)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested reviewers

  • anvouk

Poem

🐇 I hopped through code, both near and far,
New gates and tests beneath the star.
Static keys that skip the self,
DLQs swept clean upon the shelf.
Integration blossoms — hop, review, hurrah!

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Title check ⚠️ Warning The title focuses narrowly on 'add integration test' but the PR makes extensive changes across multiple areas: CI/CD workflows, test infrastructure, broker/backend refactoring, task improvements, and Makefile updates. While integration tests are added, they represent only a portion of the overall changeset. Revise the title to reflect the primary scope, such as 'refactor: consolidate CI workflows and add integration tests' or 'feat: add integration test suite with broker/backend improvements' to better represent all significant changes.
Docstring Coverage ⚠️ Warning Docstring coverage is 36.47% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

📜 Recent review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 82d9181 and 0de449d.

📒 Files selected for processing (1)
  • tests/unit/test_task.py

Comment @coderabbitai help to get the list of available commands and usage tips.

@dadodimauro dadodimauro removed the request for review from anvouk December 31, 2025 00:17
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (7)
Makefile (1)

27-31: Consider adding parallelism to test-integration or documenting the design choice.

test-unit uses -n auto for parallel execution, but test-integration does not. This may be intentional (integration tests often require sequential execution due to shared external resources like Redis/RabbitMQ), but it's worth confirming this is the desired behavior.

If integration tests can safely run in parallel:

 test-integration: lint
-	uv run pytest -vv -m "integration"
+	uv run pytest -vv -n auto -m "integration"
src/chicory/task.py (1)

33-46: Consider consolidating context detection with _is_context_param.

The logic for detecting TaskContext is duplicated here and in _is_context_param() (lines 73-78). You could simplify by reusing the helper method.

🔎 Proposed refactor
         # Check if first param is TaskContext using inspect.signature
-        # This works regardless of whether TaskContext is imported in TYPE_CHECKING
         sig = inspect.signature(fn)
         params = list(sig.parameters.values())
-        self.has_context = False
-        if len(params) > 0:
-            first_param = params[0]
-            annotation = first_param.annotation
-            # Check if annotation is TaskContext (runtime)
-            # or string 'TaskContext' (TYPE_CHECKING)
-            if annotation is TaskContext or (
-                isinstance(annotation, str) and annotation == "TaskContext"
-            ):
-                self.has_context = True
+        self.has_context = len(params) > 0 and self._is_context_param(params[0])

Note: This requires moving _is_context_param above or making it a @staticmethod since it's called before full initialization.

.github/workflows/pr-unit-tests.yml (1)

3-7: Consider adding reopened event type.

The workflow triggers on opened and synchronize, but missing reopened means tests won't run if a PR is closed and reopened.

🔎 Proposed fix
 on:
   pull_request:
     types:
       - opened
       - synchronize
+      - reopened
tests/conftest.py (1)

75-81: Consider isolating worker config modification.

Line 77 modifies chicory_app.config.worker.use_dead_letter_queue directly on the shared app config. If tests run in parallel with pytest-xdist, this could cause race conditions since the same chicory_app fixture instance might be shared.

Consider creating a dedicated WorkerConfig instance:

🔎 Suggested approach
 @pytest_asyncio.fixture
 async def chicory_worker(chicory_app: Chicory) -> AsyncGenerator[Worker]:
-    chicory_app.config.worker.use_dead_letter_queue = True
-    worker = Worker(chicory_app)
+    from chicory.config import WorkerConfig
+    worker_config = WorkerConfig(use_dead_letter_queue=True)
+    worker = Worker(chicory_app, config=worker_config)
     await worker.start()
     yield worker
     await worker.stop(timeout=5)
src/chicory/broker/rabbitmq.py (1)

782-793: Consider using _ for intentionally unused variable.

The rabbit_queue variable is assigned purely for its side effect (queue declaration). You could use _ to silence the linter and signal intent:

🔎 Suggested fix
-                rabbit_queue = await self._declare_queue(
+                _ = await self._declare_queue(
                     channel, queue, dlq_routing=True, force=True
                 )
tests/integration/test_integration.py (2)

504-523: Task redefinition in loop is safe but could be clearer.

The flaky function is redefined in each loop iteration. This works because:

  1. Each iteration uses a unique task_name
  2. The delay() and get() complete within the same iteration

However, it's somewhat fragile. Consider using a factory function or ensuring the task variable is not reused:

🔎 Suggested improvement for clarity
for backoff in [RetryBackoff.LINEAR, RetryBackoff.EXPONENTIAL, RetryBackoff.FIXED]:
    task_name = f"test.backoff.{backoff}.{test_id}.{uuid.uuid4().hex}"

    # Use a unique function per iteration to avoid confusion
    async def make_flaky_task(ctx: TaskContext) -> str:
        if ctx.retries < 2:
            raise ValueError("Fail")
        return "Success"

    registered_task = app.task(
        name=task_name,
        retry_policy=RetryPolicy(max_retries=2, retry_delay=0.1, backoff=backoff),
    )(make_flaky_task)

    result = await registered_task.delay()
    assert await result.get(timeout=10) == "Success"

594-613: Consider removing redundant assertion.

Line 613 assert True is a no-op. The test implicitly passes if no exception is raised. Either remove it or add a meaningful assertion (e.g., verify the message was moved to DLQ or queue is empty).

🔎 Suggested fix
     # Consume but nack with requeue=False
     async for envelope in chicory_app.broker.consume():
         await chicory_app.broker.nack(envelope, requeue=False)
         break

-    # Just verify it doesn't crash
-    assert True
+    # Verify message was not requeued
+    size = await chicory_app.broker.get_queue_size()
+    assert size == 0
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b2fa123 and 82d9181.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • .github/workflows/merge-gate.yml
  • .github/workflows/post-merge-coverage.yml
  • .github/workflows/pr-unit-tests.yml
  • .github/workflows/test.yml
  • Makefile
  • pyproject.toml
  • src/chicory/backend/redis.py
  • src/chicory/broker/base.py
  • src/chicory/broker/rabbitmq.py
  • src/chicory/broker/redis.py
  • src/chicory/task.py
  • tests/conftest.py
  • tests/integration/test_integration.py
  • tests/unit/test_task.py
💤 Files with no reviewable changes (1)
  • .github/workflows/test.yml
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-12-29T15:11:37.782Z
Learnt from: dadodimauro
Repo: dadodimauro/chicory PR: 18
File: src/chicory/broker/rabbitmq.py:86-93
Timestamp: 2025-12-29T15:11:37.782Z
Learning: When using aio-pika with a connection pool, acquire a connection from the pool and create a channel from that connection; do not close the connection when releasing it back to the pool. Pool.acquire() returns the connection to the pool for reuse, and RobustConnection objects stay open across acquire/release cycles. This pattern is demonstrated in official aio-pika examples and should be applied to files under src/chicory/broker (e.g., rabbitmq.py).

Applied to files:

  • src/chicory/broker/base.py
  • src/chicory/broker/redis.py
  • src/chicory/broker/rabbitmq.py
🧬 Code graph analysis (4)
src/chicory/broker/base.py (2)
src/chicory/broker/rabbitmq.py (1)
  • purge_dlq (757-766)
src/chicory/broker/redis.py (1)
  • purge_dlq (442-455)
tests/conftest.py (3)
src/chicory/app.py (5)
  • Chicory (26-169)
  • broker (122-123)
  • backend (126-127)
  • connect (159-163)
  • disconnect (165-169)
src/chicory/types.py (2)
  • BackendType (239-244)
  • BrokerType (234-236)
src/chicory/worker.py (2)
  • Worker (37-586)
  • start (87-108)
src/chicory/task.py (2)
tests/unit/test_worker.py (16)
  • fn (118-119)
  • fn (139-140)
  • fn (158-159)
  • fn (213-214)
  • fn (244-245)
  • fn (277-278)
  • fn (310-311)
  • fn (340-341)
  • fn (360-362)
  • fn (379-380)
  • fn (404-405)
  • fn (425-426)
  • fn (445-446)
  • fn (469-470)
  • fn (501-502)
  • fn (543-544)
src/chicory/context.py (1)
  • TaskContext (13-84)
src/chicory/broker/rabbitmq.py (3)
src/chicory/types.py (1)
  • TaskMessage (123-147)
src/chicory/broker/base.py (3)
  • DLQMessage (28-35)
  • purge_dlq (117-119)
  • get_dlq_count (122-124)
src/chicory/broker/redis.py (2)
  • purge_dlq (442-455)
  • get_dlq_count (457-467)
🪛 Ruff (0.14.10)
tests/conftest.py

48-48: Avoid specifying long messages outside the exception class

(TRY003)

tests/integration/test_integration.py

56-56: Unused function argument: clean_queue

(ARG001)


82-82: Unused function argument: clean_queue

(ARG001)


94-94: Avoid specifying long messages outside the exception class

(TRY003)


104-104: Unused function argument: clean_queue

(ARG001)


111-111: Avoid specifying long messages outside the exception class

(TRY003)


146-146: Unused function argument: clean_queue

(ARG001)


264-264: Unused function argument: clean_queue

(ARG001)


292-292: Unused function argument: clean_queue

(ARG001)


308-308: Unused function argument: clean_queue

(ARG001)


355-355: Unused function argument: clean_queue

(ARG001)


404-404: Unused function argument: clean_queue

(ARG001)


458-458: Unused function argument: clean_queue

(ARG001)


471-471: Unused function argument: clean_queue

(ARG001)


506-506: Unused function argument: clean_queue

(ARG001)


577-577: Unused function argument: clean_queue

(ARG001)


596-596: Unused function argument: clean_queue

(ARG001)


618-618: Unused function argument: clean_queue

(ARG001)

src/chicory/broker/rabbitmq.py

567-567: Do not catch blind exception: Exception

(BLE001)


629-629: Do not catch blind exception: Exception

(BLE001)


634-634: Do not catch blind exception: Exception

(BLE001)


660-660: Do not catch blind exception: Exception

(BLE001)


721-721: Do not catch blind exception: Exception

(BLE001)


726-726: Do not catch blind exception: Exception

(BLE001)


763-763: Consider moving this statement to an else block

(TRY300)


764-764: Do not catch blind exception: Exception

(BLE001)


782-782: Local variable rabbit_queue is assigned to but never used

Remove assignment to unused variable rabbit_queue

(F841)

🔇 Additional comments (23)
pyproject.toml (2)

76-76: LGTM!

Adding pytest-xdist enables parallel test execution which aligns well with the new Makefile targets using -n auto.


100-103: LGTM!

The slow and integration markers are well-defined with clear deselect hints, enabling flexible test filtering in CI and local development.

.github/workflows/post-merge-coverage.yml (1)

1-32: LGTM!

The post-merge coverage workflow is well-structured. Running the full test suite on a single Python version (3.14) for coverage upload is sensible since the merge gate already validates across all supported versions.

src/chicory/broker/redis.py (2)

62-72: LGTM! Converting key helpers to static methods.

These methods don't use instance state, so @staticmethod is appropriate. This improves clarity and allows calling them without an instance if needed.


56-60: LGTM! Using aclose() for proper async cleanup.

The switch from close() to aclose() is correct for async Redis client cleanup in redis-py>=7.1.0. This ensures proper async context handling during disconnection, with both the client and connection pool being properly released.

src/chicory/backend/redis.py (2)

22-33: LGTM! Consistent async lifecycle management.

The simplified client initialization and use of aclose() for both client and pool mirrors the broker implementation, maintaining consistency across Redis-based components.


35-50: LGTM! Static method refactoring for key helpers.

Consistent with the broker changes. The _workers_set_key() method correctly takes no parameters since it returns a fixed key.

src/chicory/task.py (3)

73-78: LGTM! Robust context parameter detection.

The helper correctly handles both runtime TaskContext type and string annotation "TaskContext" (from TYPE_CHECKING or from __future__ import annotations).


80-110: LGTM! Improved input model building for string annotations.

Using inspect.signature directly with string annotations avoids issues with typing.get_type_hints() when TYPE_CHECKING imports are used. The arbitrary_types_allowed=True config is necessary to support non-standard types in annotations.


112-130: LGTM! Input validation properly filters context parameters.

The updated _validate_inputs correctly excludes TaskContext parameters before binding args and validating with Pydantic.

.github/workflows/pr-unit-tests.yml (1)

9-29: LGTM!

Running unit tests on Ubuntu-only for PRs provides fast feedback, while the merge-gate workflow ensures full OS/Python matrix coverage before merging.

tests/conftest.py (2)

28-56: Well-structured app setup helper.

The _setup_app function properly configures broker/backend combinations and handles Postgres-specific table creation. A few observations:

  1. Line 47-48: Accessing _engine is acceptable for test infrastructure, though you may want to add a public method to DatabaseBackend for schema initialization.
  2. The function purges both queue and DLQ before returning, ensuring test isolation.

59-72: LGTM!

The parameterized fixture provides good coverage across broker/backend combinations with proper cleanup.

src/chicory/broker/base.py (1)

116-119: LGTM!

The abstract method is well-defined and consistent with the existing interface patterns. The signature matches the concrete implementations in RedisBroker and RabbitMQBroker.

tests/unit/test_task.py (1)

197-216: LGTM!

The test correctly verifies that non-async tasks can be delayed and publish messages to the broker. The setup properly mocks the broker and validates the expected behavior.

src/chicory/broker/rabbitmq.py (4)

94-112: LGTM!

Converting these naming helpers to @staticmethod is appropriate since they don't depend on instance state. This improves testability and makes the pure nature of these functions explicit.


557-575: Robust fallback handling for native DLX messages.

The dual-path parsing logic correctly handles both manually-moved DLQ messages (wrapped in DLQData) and native DLX messages (raw TaskMessage). The fallback defaults are sensible.

The broad except Exception (flagged by static analysis) is acceptable here since you're parsing untrusted message bodies and need to handle any deserialization failure gracefully.


625-635: Consistent fallback pattern.

The nested try-except for parsing DLQData then falling back to TaskMessage is consistent with get_dlq_messages. Setting current_task_id = None on complete parse failure is safe—the message simply won't match.


757-766: LGTM!

The purge_dlq implementation correctly uses RabbitMQ's queue purge API and handles errors gracefully. The pattern matches the Redis implementation.

tests/integration/test_integration.py (4)

32-51: LGTM!

The clean_queue fixture pattern is idiomatic pytest—declaring it as a parameter ensures it runs even though the value isn't used directly. The unique_task_id and unique_task_name fixtures provide good test isolation.


54-78: Good coverage of async and sync task execution.

The test properly verifies both async and sync tasks complete successfully and return expected values. Using uuid.uuid4().hex for task name isolation is a solid pattern.


102-141: Comprehensive DLQ workflow test.

Good coverage of the full DLQ lifecycle: failure → DLQ → replay → delete. The filtering by task_name handles potential parallel test interference well.

Note: The asyncio.sleep(1.0) waits (lines 118, 134) could cause flakiness under load. Consider polling-based waits with timeout for more robust CI runs, but this is acceptable for initial integration tests.


616-629: Good concurrency test.

The test correctly verifies that all 10 tasks complete successfully and return expected values. Results are collected in dispatch order (not completion order), so the assertion is valid even if tasks complete out-of-order internally.

@dadodimauro dadodimauro merged commit 005f805 into main Dec 31, 2025
5 of 6 checks passed
@dadodimauro dadodimauro deleted the integration-tests branch December 31, 2025 00:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants