Skip to content

Conversation

@dadodimauro
Copy link
Collaborator

@dadodimauro dadodimauro commented Dec 29, 2025

  • Add rabbitmq broker
  • Use abc instead of Protocols for Broker and Backend classes
  • Fix ack behaviour for redis broker

Summary by CodeRabbit

  • New Features

    • RabbitMQ backend added with DLQ support, message priority, and delayed/ETA scheduling; new RabbitMQ example demonstrating retries, scheduling, batching, and DLQ workflows.
  • Infrastructure

    • Local docker-compose now includes RabbitMQ with management UI.
  • Packaging

    • Optional RabbitMQ dependency group added to the project package config.
  • Bug Fixes

    • AT_MOST_ONCE delivery now acknowledges after processing.
  • Refactor

    • Task context retry/fail API changed from async to sync.
  • Tests

    • Unit tests expanded/parameterized to cover Redis and RabbitMQ backends.

✏️ Tip: You can customize this high-level summary in your review settings.

@dadodimauro dadodimauro requested a review from anvouk as a code owner December 29, 2025 01:44
@dadodimauro dadodimauro linked an issue Dec 29, 2025 that may be closed by this pull request
@coderabbitai
Copy link

coderabbitai bot commented Dec 29, 2025

📝 Walkthrough

Walkthrough

This PR adds RabbitMQ broker support (new Broker implementation, config, types, and examples), converts Broker/Backend interfaces from Protocol to ABCs, updates app broker creation, adjusts Redis and worker behaviors, parametrizes tests for RabbitMQ, and updates CI/dev tooling (docker-compose, pyproject).

Changes

Cohort / File(s) Summary
RabbitMQ broker & DLQ
src/chicory/broker/rabbitmq.py
New RabbitMQBroker and DLQData with connection/channel pools, delayed-message TTL→DLX handling, publish/consume/ack/nack, DLQ ops (get/replay/delete/count), queue helpers, healthcheck, and consumer lifecycle management.
Broker interface refactor
src/chicory/broker/base.py
Broker changed from ProtocolABC; all methods @abstractmethod; added _raw_message to TaskEnvelope; replay_from_dlq(..., reset_retries: bool = True) signature added.
Backend interface refactor
src/chicory/backend/base.py
Backend changed from ProtocolABC; interface methods marked @abstractmethod with ellipsis bodies; runtime_checkable removed.
Config, types, app wiring
src/chicory/config.py, src/chicory/types.py, src/chicory/app.py, src/chicory/broker/__init__.py
New RabbitMQBrokerConfig (DSN/AMQP fields, pooling, TTLs, DLQ limits); BrokerConfig.rabbitmq added; BrokerType.RABBITMQ enum and TaskMessage.priority (0–255) added; RabbitMQBroker exported and app _create_broker handles RabbitMQ.
Redis broker tweaks
src/chicory/broker/redis.py
disconnect now calls self.stop(); consume no longer auto-acks in AT_MOST_ONCE path (delivers without upfront xack).
Context API change
src/chicory/context.py
TaskContext.retry and TaskContext.fail changed from async to sync methods (signatures updated).
Worker comments
src/chicory/worker.py
Clarifying comments around AT_LEAST_ONCE ack timing and TODO for outputs validation; no behavior change.
Examples & compose
examples/rabbitmq_example.py, examples/redis_example.py, docker-compose.yml
New comprehensive RabbitMQ example script; small Redis example adjustments; docker-compose.yml adds rabbitmq:4.2-management service, healthcheck, ports 5672, 15672, and volumes rabbitmq-lib, rabbitmq-log.
Project metadata & deps
pyproject.toml
Added Python classifiers; new optional rabbitmq extras with aio-pika>=9.5.8; re-enabled TD003 Ruff check.
Tests
tests/unit/test_app.py, tests/unit/test_context.py
Parametrized test_app tests across BrokerType.REDIS and BrokerType.RABBITMQ; test_context tests converted from async to sync to match context API changes; imports updated to public Broker/Backend types.

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant RabbitMQBroker
    participant RabbitMQ
    participant Worker

    rect rgb(200,220,255)
    Note over Client,RabbitMQBroker: Publish immediate or ETA
    Client->>RabbitMQBroker: publish(TaskMessage, queue)
    alt ETA present (delayed)
        RabbitMQBroker->>RabbitMQ: publish to TTL queue (per-message TTL)
        RabbitMQ->>RabbitMQ: message TTL expires -> DLX -> main queue
    else immediate
        RabbitMQBroker->>RabbitMQ: publish to main queue
    end
    RabbitMQ->>RabbitMQBroker: publish ack
    end

    rect rgb(220,240,200)
    Note over Worker,RabbitMQBroker: Consume & process
    Worker->>RabbitMQBroker: consume(queue)
    RabbitMQBroker->>RabbitMQ: get next message
    RabbitMQ->>RabbitMQBroker: deliver raw message
    RabbitMQBroker->>Worker: yield TaskEnvelope (includes _raw_message)
    Worker->>Worker: execute task
    alt success & AT_LEAST_ONCE
        Worker->>RabbitMQBroker: ack(envelope)
        RabbitMQBroker->>RabbitMQ: acknowledge
    else failure
        Worker->>RabbitMQBroker: move_to_dlq(envelope, error)
        RabbitMQBroker->>RabbitMQ: publish DLQ payload
    end
    end
Loading
sequenceDiagram
    actor Operator
    participant RabbitMQBroker
    participant DLQ
    participant RabbitMQ

    rect rgb(240,220,220)
    Note over Operator,RabbitMQBroker: Replay from DLQ
    Operator->>RabbitMQBroker: replay_from_dlq(message_id, reset_retries=true)
    RabbitMQBroker->>DLQ: fetch DLQ message
    DLQ->>RabbitMQBroker: DLQData
    RabbitMQBroker->>RabbitMQ: republish to main queue (optionally reset retries)
    RabbitMQ->>RabbitMQBroker: publish ack
    RabbitMQBroker->>DLQ: ack/delete DLQ message
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Poem

🐰 I hopped into queues both new and old,

Built tunnels where delayed messages hold,
DLQs to catch the hops that fail,
Pools and configs set the trail,
Now tasks can leap — rabbit-approved and bold!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 75.70% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(broker): add rabbitmq broker' accurately describes the primary change—adding RabbitMQ broker support—which is the main feature introduced in this pull request.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch 3-rabbitmq-broker

Comment @coderabbitai help to get the list of available commands and usage tips.

@dadodimauro dadodimauro self-assigned this Dec 29, 2025
@dadodimauro dadodimauro added the enhancement New feature or request label Dec 29, 2025
@dadodimauro dadodimauro added this to the v1.0.0 milestone Dec 29, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
examples/redis_example.py (1)

414-424: Update stale module reference in logging message.

Line 422 still references examples.base:app, but the docstring at line 23 has been updated to reference examples.redis:app. This inconsistency can confuse users running the example.

🔎 Proposed fix
     logger.info("=" * 60)
     logger.info("CHICORY TASK QUEUE - COMPREHENSIVE DEMO")
     logger.info("=" * 60)
     logger.info("\nMake sure you have:")
     logger.info("  1. Redis running on localhost:6379")
     logger.info(
-        "  2. Worker running: chicory worker examples.base:app --use-dead-letter-queue"
+        "  2. Worker running: chicory worker examples.redis:app --dlq"
     )
     logger.info("\n" + "=" * 60)
tests/unit/test_app.py (1)

102-128: Test method missing test_ prefix.

The method task_registration_with_options at line 106 is missing the test_ prefix, so pytest will not discover and run it.

🔎 Proposed fix
     @pytest.mark.parametrize(
         "broker_type",
         [BrokerType.REDIS, BrokerType.RABBITMQ],
     )
-    def task_registration_with_options(self, broker_type: BrokerType) -> None:
+    def test_task_registration_with_options(self, broker_type: BrokerType) -> None:
         app = Chicory(broker=broker_type)
🧹 Nitpick comments (7)
src/chicory/broker/base.py (1)

121-122: Add docstring for consistency.

get_pending_count is missing a docstring, unlike other abstract methods. Consider adding one for consistency.

🔎 Proposed fix
     @abstractmethod
-    async def get_pending_count(self, queue: str = DEFAULT_QUEUE) -> int: ...
+    async def get_pending_count(self, queue: str = DEFAULT_QUEUE) -> int:
+        """Get number of pending (unacknowledged) messages."""
+        ...
examples/rabbitmq_example.py (2)

179-183: Unused scheduled_time parameter.

The scheduled_time parameter is never used in the function body. Either use it in the log message or remove it.

🔎 Proposed fix - use the parameter
 @app.task(name="examples.scheduled_reminder")
 async def scheduled_reminder(message: str, scheduled_time: str) -> str:
     """Task that can be scheduled for future execution."""
-    logger.info(f"Reminder triggered at {datetime.now(UTC)}: {message}")
+    logger.info(f"Reminder triggered at {datetime.now(UTC)} (scheduled for {scheduled_time}): {message}")
     return f"Reminder sent: {message}"

231-240: Unused metadata parameter.

The metadata parameter is declared but never used. Consider logging it for demonstration purposes.

🔎 Proposed fix
 async def log_analytics_event(
     event_type: str, user_id: int, metadata: dict[str, Any]
 ) -> None:
     """
     Fire-and-forget analytics logging.
     Result not stored, best-effort delivery.
     """
-    logger.info(f"Analytics: {event_type} for user {user_id}")
+    logger.info(f"Analytics: {event_type} for user {user_id}, metadata={metadata}")
     # In production: send to analytics service, logging system, etc.
     await asyncio.sleep(0.05)
src/chicory/broker/rabbitmq.py (4)

218-230: Remove unused variable and add exception chaining.

The channel variable in the _ping method is unused. Additionally, the RuntimeError should use raise ... from exc for proper exception chaining.

🔎 Proposed fix
     async def _ping(self) -> None:
         """Verify broker connectivity."""
         if not self._channel_pool:
             raise RuntimeError("Channel pool is not initialized")
 
         try:
             async with asyncio.timeout(5.0):
-                async with self._channel_pool.acquire() as channel:
+                async with self._channel_pool.acquire():
                     pass
         except TimeoutError as exc:
             raise TimeoutError("Timeout while connecting to RabbitMQ") from exc
         except Exception as exc:
             raise RuntimeError(f"Failed to connect to RabbitMQ: {exc}") from exc

296-312: Add exception chaining to RuntimeError.

When raising RuntimeError from a TimeoutError, chain the exception for better debugging.

🔎 Proposed fix
         try:
             async with asyncio.timeout(
                 self.channel_acquire_timeout
             ):  # Prevent deadlock
                 async with self._channel_pool.acquire() as channel:
                     yield channel
-        except TimeoutError:
+        except TimeoutError as exc:
             logger.error("Timeout acquiring channel from pool")
-            raise RuntimeError("Channel pool exhausted or deadlocked")
+            raise RuntimeError("Channel pool exhausted or deadlocked") from exc

374-429: Broad exception handling is acceptable but consider narrowing.

The broad except Exception blocks at lines 408 and 420 are intentional for resilience. However, consider catching more specific exceptions (e.g., aio_pika.AMQPException, pydantic.ValidationError) to avoid masking unexpected errors. The current logging is appropriate.


718-738: Remove unused variable.

The rabbit_queue variable at line 726 is assigned but never used.

🔎 Proposed fix
     async def get_pending_count(self, queue: str = DEFAULT_QUEUE) -> int:
         """
         Get the number of pending (unacknowledged) messages.
 
         Note: In RabbitMQ, this represents messages that are unacked by consumers.
         """
         async with self._acquire_channel() as channel:
             try:
-                rabbit_queue = await self._declare_queue(channel, queue)
-                # TODO @dadodimauro:  # noqa: TD003
+                await self._declare_queue(channel, queue)
                 # RabbitMQ doesn't expose unacked count via AMQP directly.
                 # This returns consumer count as a proxy - real implementation
                 # would need the Management API.
-                # For now, we return 0 and log that Management API is needed.
                 logger.debug(
                     "get_pending_count requires RabbitMQ Management API for accuracy"
                 )
                 return 0
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between acde518 and d7c30a9.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • docker-compose.yml
  • examples/rabbitmq_example.py
  • examples/redis_example.py
  • pyproject.toml
  • src/chicory/app.py
  • src/chicory/backend/base.py
  • src/chicory/broker/__init__.py
  • src/chicory/broker/base.py
  • src/chicory/broker/rabbitmq.py
  • src/chicory/broker/redis.py
  • src/chicory/config.py
  • src/chicory/types.py
  • src/chicory/worker.py
  • tests/unit/test_app.py
🧰 Additional context used
🧬 Code graph analysis (8)
src/chicory/backend/base.py (4)
src/chicory/types.py (3)
  • BackendStatus (202-202)
  • TaskState (15-21)
  • WorkerStats (205-231)
src/chicory/app.py (2)
  • connect (138-142)
  • disconnect (144-148)
src/chicory/broker/base.py (3)
  • connect (42-44)
  • disconnect (47-49)
  • healthcheck (139-141)
tests/unit/test_cli.py (2)
  • connect (36-37)
  • disconnect (39-40)
src/chicory/app.py (2)
src/chicory/broker/rabbitmq.py (1)
  • RabbitMQBroker (41-782)
src/chicory/types.py (1)
  • BrokerType (234-236)
src/chicory/broker/redis.py (3)
src/chicory/broker/rabbitmq.py (1)
  • stop (451-453)
src/chicory/worker.py (1)
  • stop (110-161)
src/chicory/broker/base.py (1)
  • stop (74-76)
src/chicory/broker/rabbitmq.py (3)
src/chicory/types.py (2)
  • DeliveryMode (24-26)
  • TaskMessage (123-147)
src/chicory/broker/base.py (11)
  • Broker (38-141)
  • DLQMessage (28-35)
  • TaskEnvelope (18-24)
  • publish (52-54)
  • connect (42-44)
  • disconnect (47-49)
  • stop (74-76)
  • consume (57-59)
  • ack (62-64)
  • nack (67-71)
  • move_to_dlq (79-86)
src/chicory/config.py (3)
  • dsn (97-110)
  • dsn (220-233)
  • dsn (275-288)
src/chicory/broker/base.py (5)
src/chicory/broker/redis.py (5)
  • connect (44-51)
  • disconnect (53-61)
  • publish (89-110)
  • ack (231-238)
  • healthcheck (526-535)
src/chicory/app.py (2)
  • connect (138-142)
  • disconnect (144-148)
src/chicory/backend/base.py (3)
  • connect (14-16)
  • disconnect (19-21)
  • healthcheck (66-68)
tests/unit/test_cli.py (2)
  • connect (36-37)
  • disconnect (39-40)
src/chicory/types.py (1)
  • TaskMessage (123-147)
src/chicory/broker/__init__.py (1)
src/chicory/broker/rabbitmq.py (1)
  • RabbitMQBroker (41-782)
examples/rabbitmq_example.py (5)
src/chicory/result.py (2)
  • AsyncResult (13-71)
  • get (28-53)
src/chicory/types.py (3)
  • RetryBackoff (36-41)
  • RetryPolicy (44-120)
  • TaskMessage (123-147)
src/chicory/app.py (2)
  • Chicory (26-148)
  • task (108-130)
src/chicory/context.py (4)
  • TaskContext (13-84)
  • remaining_retries (79-84)
  • retry (22-64)
  • fail (66-68)
src/chicory/task.py (2)
  • delay (108-130)
  • send (132-139)
tests/unit/test_app.py (6)
src/chicory/app.py (6)
  • backend (105-106)
  • broker (101-102)
  • Chicory (26-148)
  • get_task (132-136)
  • connect (138-142)
  • disconnect (144-148)
src/chicory/backend/base.py (3)
  • Backend (10-68)
  • connect (14-16)
  • disconnect (19-21)
src/chicory/broker/base.py (3)
  • Broker (38-141)
  • connect (42-44)
  • disconnect (47-49)
src/chicory/types.py (2)
  • BrokerType (234-236)
  • BackendType (239-240)
src/chicory/exceptions.py (1)
  • TaskNotFoundError (10-13)
tests/unit/test_cli.py (2)
  • connect (36-37)
  • disconnect (39-40)
🪛 Ruff (0.14.10)
src/chicory/broker/rabbitmq.py

89-89: Avoid specifying long messages outside the exception class

(TRY003)


221-221: Avoid specifying long messages outside the exception class

(TRY003)


225-225: Local variable channel is assigned to but never used

Remove assignment to unused variable channel

(F841)


228-228: Avoid specifying long messages outside the exception class

(TRY003)


230-230: Avoid specifying long messages outside the exception class

(TRY003)


257-257: Unused noqa directive (non-enabled: TD003)

Remove unused noqa directive

(RUF100)


302-302: Avoid specifying long messages outside the exception class

(TRY003)


312-312: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


312-312: Avoid specifying long messages outside the exception class

(TRY003)


408-408: Do not catch blind exception: Exception

(BLE001)


420-420: Do not catch blind exception: Exception

(BLE001)


431-431: Unused method argument: queue

(ARG002)


436-436: Avoid specifying long messages outside the exception class

(TRY003)


441-441: Unused method argument: queue

(ARG002)


447-447: Avoid specifying long messages outside the exception class

(TRY003)


473-473: Consider moving this statement to an else block

(TRY300)


474-474: Do not catch blind exception: Exception

(BLE001)


570-570: Do not catch blind exception: Exception

(BLE001)


646-646: Consider moving this statement to an else block

(TRY300)


648-648: Do not catch blind exception: Exception

(BLE001)


696-696: Consider moving this statement to an else block

(TRY300)


698-698: Do not catch blind exception: Exception

(BLE001)


715-715: Do not catch blind exception: Exception

(BLE001)


726-726: Local variable rabbit_queue is assigned to but never used

Remove assignment to unused variable rabbit_queue

(F841)


727-727: Unused noqa directive (non-enabled: TD003)

Remove unused noqa directive

(RUF100)


735-735: Consider moving this statement to an else block

(TRY300)


736-736: Do not catch blind exception: Exception

(BLE001)


745-745: Consider moving this statement to an else block

(TRY300)


746-746: Do not catch blind exception: Exception

(BLE001)


755-755: Consider moving this statement to an else block

(TRY300)


756-756: Do not catch blind exception: Exception

(BLE001)


766-766: Consider moving this statement to an else block

(TRY300)


767-767: Do not catch blind exception: Exception

(BLE001)


781-781: Do not catch blind exception: Exception

(BLE001)

examples/rabbitmq_example.py

111-111: Avoid specifying long messages outside the exception class

(TRY003)


167-167: Avoid specifying long messages outside the exception class

(TRY003)


169-169: Avoid specifying long messages outside the exception class

(TRY003)


180-180: Unused function argument: scheduled_time

(ARG001)


232-232: Unused function argument: metadata

(ARG001)


260-260: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (33)
src/chicory/broker/redis.py (1)

53-61: LGTM: Graceful shutdown sequence.

Calling self.stop() before closing connections ensures the consume loop stops gracefully before cleanup. This aligns with the worker shutdown sequence in src/chicory/worker.py (line 127) where broker.stop() is called before waiting for tasks to complete.

pyproject.toml (2)

21-27: LGTM: Improved package metadata.

The additional classifiers improve package discoverability and accurately describe the implementation and typing support.


41-43: No action needed. The aio-pika>=9.5.8 dependency is valid and secure. Version 9.5.8 is the latest stable release with no known security vulnerabilities.

src/chicory/app.py (2)

7-7: LGTM: RabbitMQ broker import.

The import correctly adds RabbitMQBroker to the broker factory.


81-85: LGTM: Consistent broker factory pattern.

The RabbitMQ broker initialization follows the same pattern as the Redis broker, using the appropriate config and delivery mode.

src/chicory/types.py (2)

134-140: LGTM: RabbitMQ priority queue support.

The priority field is correctly constrained to RabbitMQ's supported range (0-255) and appropriately optional. This enables priority queue functionality without breaking existing Redis-based workflows.


234-236: LGTM: Broker type extension.

The RABBITMQ enum member correctly extends broker type options.

docker-compose.yml (2)

17-29: LGTM: Standard RabbitMQ service configuration.

The service configuration follows RabbitMQ best practices:

  • Uses official rabbitmq:4.2-management image
  • Exposes AMQP (5672) and management UI (15672) ports
  • Includes appropriate healthcheck using rabbitmq-diagnostics
  • Persists data via volumes

31-35: LGTM: Data persistence volumes.

The volumes ensure RabbitMQ data and logs persist across container restarts.

src/chicory/broker/__init__.py (1)

1-7: LGTM: Public API extension.

The RabbitMQBroker is correctly imported and exposed in the public API alongside the existing broker implementations.

src/chicory/worker.py (2)

315-317: LGTM: Clarified acknowledgment semantics.

The comment clearly documents that AT_LEAST_ONCE acknowledgment occurs only after successful processing, which is the correct semantic guarantee.


380-382: LGTM: TODO comment with issue reference.

The TODO now includes a proper issue reference, complying with the TD003 linting rule enabled in this PR.

src/chicory/backend/base.py (1)

1-68: LGTM: Protocol to ABC conversion.

The change from Protocol to ABC is architecturally sound and aligns with the broader refactoring mentioned in the PR description. This enforces explicit inheritance and ensures concrete implementations must implement all abstract methods. All backend implementations (RedisBackend) have been properly updated to inherit from Backend and implement all 11 abstract methods. For a 0.1.0 version, this breaking change is acceptable.

src/chicory/config.py (5)

6-13: LGTM!

Import updates are clean and consistent with existing patterns. The addition of AmqpDsn for RabbitMQ URL validation follows the established RedisDsn pattern.


113-134: LGTM!

The RabbitMQBrokerConfig class follows the established RedisBrokerConfig pattern with appropriate environment variable prefix and sensible defaults for host, port, username, password, vhost, and URL.


135-210: Well-structured configuration options.

The pool sizes, prefetch count, queue limits, TTLs, and other settings provide good configurability for production deployments. The field constraints (ge, gt, le) are appropriate.


345-346: LGTM!

The rabbitmq field addition to BrokerConfig mirrors the existing redis field pattern.


212-233: No action required—vhost handling in DSN construction is correct.

When self.vhost is None, AmqpDsn.build() receives path=None, which omits the path from the URL. This is the correct approach: RabbitMQ treats an absent vhost path as the default vhost (/), so allowing the path to be omitted when vhost is None enables the expected default behavior.

src/chicory/broker/base.py (4)

3-5: LGTM!

Clean migration from Protocol to ABC with proper imports.


17-24: LGTM!

The _raw_message field appropriately uses the underscore prefix to indicate it's for internal broker use. The object | None type is sufficiently flexible for different broker implementations.


38-76: LGTM!

The core broker methods (connect, disconnect, publish, consume, ack, nack, stop) are properly declared as abstract with clear docstrings.


78-141: LGTM!

DLQ operations and queue introspection methods are well-defined with appropriate signatures and docstrings.

examples/rabbitmq_example.py (3)

1-51: LGTM!

Clear documentation header explaining prerequisites and usage. The imports and logging setup are appropriate for an example file.


57-61: LGTM!

Clean app initialization with RabbitMQ broker and Redis backend.


269-456: LGTM!

The demo functions provide comprehensive coverage of task queue features including basic tasks, retry policies, scheduled tasks, batch processing, DLQ operations, and monitoring.

tests/unit/test_app.py (4)

10-11: LGTM!

Import updates correctly include the new broker abstractions (Broker, RabbitMQBroker, RedisBroker).


17-31: LGTM!

Good addition of BrokerType.RABBITMQ to the broker creation test parametrization.


131-154: LGTM!

Task retrieval tests are properly parametrized for both broker types.


157-227: LGTM!

Connect/disconnect tests are well-structured with proper mocking and broker type parametrization.

src/chicory/broker/rabbitmq.py (4)

30-38: LGTM!

The DLQData model cleanly captures all necessary metadata for DLQ entries.


44-81: LGTM!

Well-organized initialization with all configuration options properly stored. The dual pool architecture (connection pool for publishing, dedicated connection for consuming) is a sensible design.


771-782: LGTM!

The healthcheck implementation correctly uses _ping and returns appropriate BrokerStatus for different failure modes.


431-449: Queue parameter in ack/nack is required by the base Broker interface.

The queue parameter is unused in RabbitMQ's ack and nack methods, but it must be included because the base class defines this signature. Unlike RabbitMQ (which uses channel/delivery_tag), Redis requires the queue parameter to identify the stream (see redis.py lines 235, 247). The parameter is acceptable here and does not need to be removed.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
examples/rabbitmq_example.py (1)

420-424: Inconsistent worker command in documentation.

Line 423 references examples.base:app but should reference examples.rabbitmq_example:app to match the header documentation at line 24 and point to this RabbitMQ example.

🔎 Proposed fix
     logger.info("\nMake sure you have:")
     logger.info("  1. Redis running on localhost:6379")
+    logger.info("  2. RabbitMQ running on localhost:5672")
     logger.info(
-        "  2. Worker running: chicory worker examples.base:app --use-dead-letter-queue"
+        "  3. Worker running: chicory worker examples.rabbitmq_example:app --dlq"
     )
src/chicory/broker/rabbitmq.py (1)

243-267: Convert expiration to milliseconds for RabbitMQ TTL.

The expiration parameter at line 257 is set to delay_seconds (a float in seconds), but RabbitMQ and aio-pika expect the TTL in milliseconds as an integer. This will cause delayed messages to be processed at incorrect times.

🔎 Proposed fix
     delay_seconds = (message.eta - datetime.now(UTC)).total_seconds()

+    # Convert to milliseconds for RabbitMQ TTL
+    expiration_ms = int(delay_seconds * 1000)
+
     await self._declare_delayed_infrastructure(channel, queue)

     # Publish with per-message TTL
     await channel.default_exchange.publish(
         aio_pika.Message(
             body=data,
             content_type="application/json",
             delivery_mode=aio_pika.DeliveryMode.PERSISTENT
             if self.durable_queues
             else aio_pika.DeliveryMode.NOT_PERSISTENT,
             message_id=message.id,
             timestamp=datetime.now(UTC),
-            expiration=delay_seconds,  # TTL for this specific message
+            expiration=expiration_ms,  # TTL in milliseconds
             priority=message.priority,

Based on learnings, aio-pika will automatically convert the int to the required string format via encode_expiration().

🧹 Nitpick comments (1)
src/chicory/broker/rabbitmq.py (1)

728-728: Remove unused noqa directive.

The # noqa: TD003 directive is unused since the TD003 rule (TODO formatting) is not enabled. The comment can be simplified.

🔎 Proposed fix
-                # TODO @dadodimauro:  # noqa: TD003
+                # TODO @dadodimauro:
                 # RabbitMQ doesn't expose unacked count via AMQP directly.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d7c30a9 and 7c3cc95.

📒 Files selected for processing (6)
  • examples/rabbitmq_example.py
  • examples/redis_example.py
  • src/chicory/broker/rabbitmq.py
  • src/chicory/context.py
  • tests/unit/test_app.py
  • tests/unit/test_context.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-12-29T15:11:37.782Z
Learnt from: dadodimauro
Repo: dadodimauro/chicory PR: 18
File: src/chicory/broker/rabbitmq.py:86-93
Timestamp: 2025-12-29T15:11:37.782Z
Learning: When using aio-pika with a connection pool, acquire a connection from the pool and create a channel from that connection; do not close the connection when releasing it back to the pool. Pool.acquire() returns the connection to the pool for reuse, and RobustConnection objects stay open across acquire/release cycles. This pattern is demonstrated in official aio-pika examples and should be applied to files under src/chicory/broker (e.g., rabbitmq.py).

Applied to files:

  • src/chicory/broker/rabbitmq.py
🧬 Code graph analysis (4)
tests/unit/test_context.py (2)
src/chicory/context.py (3)
  • TaskContext (13-84)
  • retry (22-64)
  • fail (66-68)
src/chicory/exceptions.py (1)
  • RetryError (22-37)
examples/redis_example.py (1)
src/chicory/context.py (2)
  • retry (22-64)
  • fail (66-68)
src/chicory/broker/rabbitmq.py (3)
src/chicory/types.py (2)
  • DeliveryMode (24-26)
  • TaskMessage (123-147)
src/chicory/broker/base.py (5)
  • Broker (38-141)
  • DLQMessage (28-35)
  • TaskEnvelope (18-24)
  • ack (62-64)
  • nack (67-71)
src/chicory/config.py (3)
  • dsn (97-110)
  • dsn (220-233)
  • dsn (275-288)
examples/rabbitmq_example.py (5)
src/chicory/result.py (2)
  • AsyncResult (13-71)
  • get (28-53)
src/chicory/types.py (4)
  • RetryBackoff (36-41)
  • RetryPolicy (44-120)
  • ValidationMode (29-33)
  • TaskMessage (123-147)
src/chicory/context.py (4)
  • TaskContext (13-84)
  • remaining_retries (79-84)
  • retry (22-64)
  • fail (66-68)
src/chicory/task.py (2)
  • delay (108-130)
  • send (132-139)
src/chicory/broker/rabbitmq.py (3)
  • publish (339-371)
  • get_dlq_count (709-717)
  • get_dlq_messages (529-578)
🪛 Ruff (0.14.10)
src/chicory/broker/rabbitmq.py

89-89: Avoid specifying long messages outside the exception class

(TRY003)


221-221: Avoid specifying long messages outside the exception class

(TRY003)


225-225: Local variable channel is assigned to but never used

Remove assignment to unused variable channel

(F841)


228-228: Avoid specifying long messages outside the exception class

(TRY003)


230-230: Avoid specifying long messages outside the exception class

(TRY003)


301-301: Avoid specifying long messages outside the exception class

(TRY003)


311-311: Avoid specifying long messages outside the exception class

(TRY003)


407-407: Do not catch blind exception: Exception

(BLE001)


419-419: Do not catch blind exception: Exception

(BLE001)


430-430: Unused method argument: queue

(ARG002)


435-435: Avoid specifying long messages outside the exception class

(TRY003)


440-440: Unused method argument: queue

(ARG002)


446-446: Avoid specifying long messages outside the exception class

(TRY003)


472-472: Consider moving this statement to an else block

(TRY300)


473-473: Do not catch blind exception: Exception

(BLE001)


569-569: Do not catch blind exception: Exception

(BLE001)


646-646: Consider moving this statement to an else block

(TRY300)


648-648: Do not catch blind exception: Exception

(BLE001)


697-697: Consider moving this statement to an else block

(TRY300)


699-699: Do not catch blind exception: Exception

(BLE001)


716-716: Do not catch blind exception: Exception

(BLE001)


727-727: Local variable rabbit_queue is assigned to but never used

Remove assignment to unused variable rabbit_queue

(F841)


728-728: Unused noqa directive (non-enabled: TD003)

Remove unused noqa directive

(RUF100)


736-736: Consider moving this statement to an else block

(TRY300)


737-737: Do not catch blind exception: Exception

(BLE001)


746-746: Consider moving this statement to an else block

(TRY300)


747-747: Do not catch blind exception: Exception

(BLE001)


756-756: Consider moving this statement to an else block

(TRY300)


757-757: Do not catch blind exception: Exception

(BLE001)


767-767: Consider moving this statement to an else block

(TRY300)


768-768: Do not catch blind exception: Exception

(BLE001)


782-782: Do not catch blind exception: Exception

(BLE001)

examples/rabbitmq_example.py

111-111: Avoid specifying long messages outside the exception class

(TRY003)


167-167: Avoid specifying long messages outside the exception class

(TRY003)


169-169: Avoid specifying long messages outside the exception class

(TRY003)


180-180: Unused function argument: scheduled_time

(ARG001)


232-232: Unused function argument: metadata

(ARG001)


260-260: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (5)
examples/redis_example.py (1)

23-26: LGTM! Correctly updated for synchronous context methods.

The removal of await keywords from ctx.retry() and ctx.fail() correctly reflects the conversion of these methods from async to sync in TaskContext. The usage instructions are also appropriately updated to reference the Redis example paths.

Also applies to: 139-142

src/chicory/context.py (1)

22-68: LGTM! Clean conversion to synchronous methods.

Converting retry() and fail() from async to synchronous methods is logical since both immediately raise exceptions rather than performing I/O operations. This simplifies the API and removes unnecessary await keywords from calling code.

Note: This is a breaking API change for existing code that awaits these methods, but the PR demonstrates that all internal callers have been updated accordingly.

tests/unit/test_context.py (1)

11-137: LGTM! Tests correctly updated for synchronous context methods.

All test methods have been properly converted to synchronous functions, removing await keywords from context.retry() and context.fail() calls. The test scenarios and assertions remain unchanged, maintaining coverage while aligning with the updated API.

tests/unit/test_app.py (1)

10-11: LGTM! Excellent test coverage for RabbitMQ broker.

The test suite has been properly expanded to cover both Redis and RabbitMQ brokers through parameterization. This ensures consistent behavior verification across both broker implementations for task registration, retrieval, and connection lifecycle operations.

Also applies to: 17-31, 87-128, 132-154, 159-227

src/chicory/broker/rabbitmq.py (1)

41-93: Well-structured RabbitMQ broker implementation.

The implementation follows best practices:

  • Correct aio-pika connection/channel pooling pattern (as per learnings)
  • Proper separation of publisher and consumer channels
  • Robust error handling with reconnection logic
  • DLQ operations correctly use task_id for message lookup
  • Delayed message infrastructure using TTL + DLX pattern

The broad exception catches in error handling paths are acceptable for maintaining broker resilience.

Based on learnings, the connection pool pattern where channels are created from pooled connections is the official aio-pika approach, and RobustConnection objects remain open across acquire/release cycles.

Also applies to: 110-217, 269-338, 373-528, 529-783

@dadodimauro dadodimauro merged commit 744a318 into main Dec 29, 2025
15 checks passed
@dadodimauro dadodimauro deleted the 3-rabbitmq-broker branch December 29, 2025 15:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

RabbitMQ broker

2 participants