Skip to content

Implement RPUSH for priority messages#75

Open
hmacr wants to merge 3 commits intomainfrom
ser-1137
Open

Implement RPUSH for priority messages#75
hmacr wants to merge 3 commits intomainfrom
ser-1137

Conversation

@hmacr
Copy link
Contributor

@hmacr hmacr commented Mar 17, 2026

Closes SER-1137

Summary by CodeRabbit

  • New Features

    • Added optional priority parameter to queue enqueue functionality, allowing messages to be prioritized in the queue processing order.
  • Tests

    • Added comprehensive unit and end-to-end test suites covering priority enqueuing behavior and queue consumption order.
  • Chores

    • Updated test infrastructure configuration to support unit test autoloading and execution.

@coderabbitai
Copy link

coderabbitai bot commented Mar 17, 2026

Important

Review skipped

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b7c00f26-f4af-488d-9559-dfaada7c78db

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review

Walkthrough

This change introduces a priority parameter to the Queue system's enqueue API. The modification adds an optional boolean $priority parameter (default false) to the enqueue method across the Publisher interface and its implementations (AMQP, Pool, and Redis brokers). In the Redis implementation, the priority flag conditionally selects push direction: right-push for priority jobs and left-push for normal jobs. Test configurations are updated to support both E2E and unit tests, with new test files covering priority behavior in Redis brokers and existing test base classes expanded with priority test scenarios.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 5.88% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Implement RPUSH for priority messages' directly and specifically describes the main change in the changeset—adding priority parameter support with Redis RPUSH logic for priority-ordered message handling.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch ser-1137
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Tip

You can validate your CodeRabbit configuration file in your editor.

If your editor has YAML language server, you can enable auto-completion and validation by adding # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json at the top of your CodeRabbit configuration file.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/Queue/Broker/AMQP.php (1)

138-149: ⚠️ Potential issue | 🟠 Major

Priority parameter is unused in AMQP enqueue and requires two changes to work.

The $priority parameter is declared but ignored. RabbitMQ message priority requires both queue-side and message-side configuration:

  1. Queue declaration must include x-max-priority (e.g., via broker->setQueueArgument('x-max-priority', '10') before consuming)
  2. Published message must set the priority property based on the parameter
Suggested fix for message publishing
-        $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
+        $properties = [
+            'content_type' => 'application/json',
+            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
+        ];
+        if ($priority) {
+            $properties['priority'] = 9;
+        }
+        $message = new AMQPMessage(json_encode($payload), $properties);

Queue setup is not automatic and must be configured externally (e.g., broker->setQueueArgument('x-max-priority', '10')). Without queue-side configuration, message-side priority is ignored by RabbitMQ.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Broker/AMQP.php` around lines 138 - 149, The enqueue method
currently ignores the $priority parameter; update AMQP::enqueue to pass a
numeric priority into the AMQPMessage properties (e.g., include 'priority' =>
(int)$priority or map true to a default priority) when creating AMQPMessage, and
keep using withChannel(...) and basic_publish as-is; also document or enforce
that the broker queue must be declared with the x-max-priority argument (e.g.,
via broker->setQueueArgument('x-max-priority','10') before consuming) because
RabbitMQ ignores message-side priority without that queue-side configuration.
🧹 Nitpick comments (3)
src/Queue/Publisher.php (1)

14-14: Update the method docblock to include $priority.

The signature changed, but the PHPDoc still only documents $queue and $payload.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Publisher.php` at line 14, Update the PHPDoc for the enqueue method
to document the new $priority parameter: in Publisher::enqueue add a `@param` bool
$priority description (e.g., whether the message should be treated as high
priority) and keep existing `@param` annotations for Queue $queue and array
$payload plus a `@return` bool annotation; ensure the docblock types match the
method signature and briefly describe $priority's behavior.
tests/Queue/E2E/Adapter/Base.php (1)

89-98: This test validates API acceptance, not priority behavior.

Right now it only asserts successful enqueue. Consider either asserting observable ordering behavior here or renaming it to reflect smoke coverage (e.g., “accepts priority flag”).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/Queue/E2E/Adapter/Base.php` around lines 89 - 98, The test
testEnqueuePriority currently only checks that
publisher->enqueue($this->getQueue(), ..., priority: true) returns true and
doesn't verify ordering; update it to either assert actual priority behavior by
enqueuing a low-priority item then a high-priority item and dequeueing (use
getPublisher(), enqueue(), getQueue(), and whatever dequeue/consume helper
exists) to assert the high-priority message is received first, or if you only
intend to cover acceptance, rename the test to something like
testAcceptsPriorityFlag and keep the simple enqueue assertion and sleep; pick
one approach and make the corresponding change to the test name and/or add
dequeue assertions that validate ordering.
tests/Queue/E2E/Adapter/RedisPriorityTest.php (1)

53-61: Consider adding defensive assertions before array access.

Unlike line 50, subsequent pops lack assertNotFalse checks. If rightPopArray returns false unexpectedly, the test will fail with an unclear "cannot access array on bool" error rather than a meaningful assertion failure.

♻️ Suggested improvement
         // The remaining three should be normal jobs (consumed oldest-first).
         $second = $this->connection->rightPopArray($key, 1);
+        $this->assertNotFalse($second, 'Expected normal-1 job but queue was empty');
         $this->assertSame('normal-1', $second['payload']['order']);

         $third = $this->connection->rightPopArray($key, 1);
+        $this->assertNotFalse($third, 'Expected normal-2 job but queue was empty');
         $this->assertSame('normal-2', $third['payload']['order']);

         $fourth = $this->connection->rightPopArray($key, 1);
+        $this->assertNotFalse($fourth, 'Expected normal-3 job but queue was empty');
         $this->assertSame('normal-3', $fourth['payload']['order']);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/Queue/E2E/Adapter/RedisPriorityTest.php` around lines 53 - 61, The test
calls rightPopArray and immediately indexes into the result for $second, $third,
and $fourth; add defensive assertions (e.g., assertNotFalse or assertIsArray)
after each call to rightPopArray($key, 1) to ensure the returned value is not
false before accessing ['payload']['order']. Update the assertions around the
variables $second, $third, and $fourth (and keep consistency with the earlier
check on $first) so failures produce clear assertion messages rather than PHP
type errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/Queue/E2E/Adapter/RedisPriorityTest.php`:
- Around line 63-64: The test uses $this->connection->rightPopArray($key, 0)
which blocks indefinitely on an empty queue; change the timeout argument passed
to rightPopArray in this assertion (and anywhere similar, e.g. setUp) to a
non-zero value (e.g. 1) so it returns false immediately on empty queues instead
of blocking; locate calls to rightPopArray and update the second parameter from
0 to a small positive integer.
- Around line 29-33: The drain loop uses $this->connection->rightPopArray($key,
0) which calls brPop and blocks forever on an empty queue; change the timeout
from 0 to a small non-zero value (e.g., 1) so rightPopArray($key, 1) will return
false when no items remain and the loop can exit—update the loop that references
$key and $this->connection->rightPopArray to use a non-zero timeout and ensure
the loop breaks when false is returned.

In `@tests/Queue/Unit/Broker/RedisBrokerTest.php`:
- Around line 1-97: The file fails Pint/PSR-12 formatting; run the PSR-12
formatter (e.g., vendor/bin/pint or php-cs-fixer) on RedisBrokerTest.php or
apply PSR-12 fixes manually: ensure method declarations (setUp,
testEnqueueNormalUsesLeftPush, testEnqueuePriorityFalseUsesLeftPush,
testEnqueuePriorityUsesRightPush, testEnqueuePriorityPayloadHasRequiredFields)
follow PSR-12 spacing and brace rules, fix spacing around visibility, return
types, type hints and closures, and normalize import order/blank lines so the
linter passes.
- Around line 12-19: The $connection property is typed as Connection but holds a
PHPUnit mock, causing static analysis errors when calling mock methods like
expects(); update the property PHPDoc to indicate an intersection type
(Connection&\PHPUnit\Framework\MockObject\MockObject) so static analyzers know
it is both a Connection and a MockObject—add the PHPDoc above the property
(private Connection $connection) and leave the runtime type as-is; no code logic
changes required, only the docblock in the class (referencing the $connection
property and the setUp() method where createMock(Connection::class) is used).

---

Outside diff comments:
In `@src/Queue/Broker/AMQP.php`:
- Around line 138-149: The enqueue method currently ignores the $priority
parameter; update AMQP::enqueue to pass a numeric priority into the AMQPMessage
properties (e.g., include 'priority' => (int)$priority or map true to a default
priority) when creating AMQPMessage, and keep using withChannel(...) and
basic_publish as-is; also document or enforce that the broker queue must be
declared with the x-max-priority argument (e.g., via
broker->setQueueArgument('x-max-priority','10') before consuming) because
RabbitMQ ignores message-side priority without that queue-side configuration.

---

Nitpick comments:
In `@src/Queue/Publisher.php`:
- Line 14: Update the PHPDoc for the enqueue method to document the new
$priority parameter: in Publisher::enqueue add a `@param` bool $priority
description (e.g., whether the message should be treated as high priority) and
keep existing `@param` annotations for Queue $queue and array $payload plus a
`@return` bool annotation; ensure the docblock types match the method signature
and briefly describe $priority's behavior.

In `@tests/Queue/E2E/Adapter/Base.php`:
- Around line 89-98: The test testEnqueuePriority currently only checks that
publisher->enqueue($this->getQueue(), ..., priority: true) returns true and
doesn't verify ordering; update it to either assert actual priority behavior by
enqueuing a low-priority item then a high-priority item and dequeueing (use
getPublisher(), enqueue(), getQueue(), and whatever dequeue/consume helper
exists) to assert the high-priority message is received first, or if you only
intend to cover acceptance, rename the test to something like
testAcceptsPriorityFlag and keep the simple enqueue assertion and sleep; pick
one approach and make the corresponding change to the test name and/or add
dequeue assertions that validate ordering.

In `@tests/Queue/E2E/Adapter/RedisPriorityTest.php`:
- Around line 53-61: The test calls rightPopArray and immediately indexes into
the result for $second, $third, and $fourth; add defensive assertions (e.g.,
assertNotFalse or assertIsArray) after each call to rightPopArray($key, 1) to
ensure the returned value is not false before accessing ['payload']['order'].
Update the assertions around the variables $second, $third, and $fourth (and
keep consistency with the earlier check on $first) so failures produce clear
assertion messages rather than PHP type errors.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bcd07776-c717-492a-bb12-a4819dfaa92d

📥 Commits

Reviewing files that changed from the base of the PR and between ffdc931 and fd9637f.

📒 Files selected for processing (9)
  • composer.json
  • phpunit.xml
  • src/Queue/Broker/AMQP.php
  • src/Queue/Broker/Pool.php
  • src/Queue/Broker/Redis.php
  • src/Queue/Publisher.php
  • tests/Queue/E2E/Adapter/Base.php
  • tests/Queue/E2E/Adapter/RedisPriorityTest.php
  • tests/Queue/Unit/Broker/RedisBrokerTest.php

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant