pubsub: send message receipts#2580
pubsub: send message receipts#2580pongad merged 6 commits intogoogleapis:masterfrom pongad:pubsub-receipt
Conversation
pongad
left a comment
There was a problem hiding this comment.
load test shows no regression
| subscriber.stopAsync().awaitTerminated(); | ||
| } | ||
|
|
||
| // @Test |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| pendingReceipts.drainTo(receiptsToSend.ackIds); | ||
| logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size()); | ||
| if (!receiptsToSend.ackIds.isEmpty()) { | ||
| modifyAckDeadlinesToSend.add(receiptsToSend); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
@mdietz94 please take a look |
mdietz94
left a comment
There was a problem hiding this comment.
In general, it may be easier to instead just schedule the pending extension job to run immediately the first time instead of in stream ack deadline seconds, and then you won't need to explicitly keep track of pending receipts.
| } | ||
| destination = pendingAcks; | ||
| // Record the latency rounded to the next closest integer. | ||
| ackLatencyDistribution.record( |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| public void run() { | ||
| try { | ||
| processOutstandingAckOperations(); | ||
| } catch (Exception e) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This commit also simplifies acks/nacks logic.
It is accessed from executor threads and must to synced to guarantee property updates are observed.
If the user code throws an exception, we catch this exception and nack the message. This logic cannot be tested correctly. The test uses a MessageReceiver that can notify us that messages have been processed, so that we can "advanceTime". The receiver can only notify us that it's throwing an exception BEFORE the exception is actually thrown out of the method. (Even with finally-clause, the statements in the clause runs before the giving control back to the caller.) Consequently, we're notifying too soon: the message has not been processed yet as the exception might not have been caught.
|
@mdietz94 Thank you for the review, PTAL.
I plan to soon make extension job run on schedule, kind of like what's ack/nack/receipt is doing in this PR. I found a pretty simple way to make it work with periodic stream deadline modification. However, I don't think I can make it work with immediate modack scheduling that receipt would need. If this PR looks "right but sub-optimal" to you, do you think we can submit this and follow up in another PR? I think this will give us better throughput; timezone makes code review latency rather high. I had to add a few changes to address some flakes. Looks like there's still one flakey test. I after analyzing the logs, I believe the problem is in the test itself, not the library, though I can't pinpoint where. I think the best course of action is to finish simplifying the library, then we can simplify the test to have fewer moving parts. I can take this on later as well. |
mdietz94
left a comment
There was a problem hiding this comment.
LGTM, though as we simplify the subscriber I think it would be good to combine some of the logic for mod ack/receipt as discussed.
|
ack. (no pun intended) |
🤖 I have created a release *beep* *boop* --- <details><summary>2.38.1</summary> ## [2.38.1](googleapis/sdk-platform-java@v2.38.0...v2.38.1) (2024-03-15) ### Bug Fixes * **deps:** add detector-resource-support dependencies ([#2559](googleapis/sdk-platform-java#2559)) ([037824b](googleapis/sdk-platform-java@037824b)) * Update shopping and chat common protos. ([#2580](googleapis/sdk-platform-java#2580)) ([692634e](googleapis/sdk-platform-java@692634e)) ### Dependencies * update google api dependencies ([#2582](googleapis/sdk-platform-java#2582)) ([32e242a](googleapis/sdk-platform-java@32e242a)) </details> --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: Blake Li <blakeli@google.com>
This commit also simplifies acks/nacks logic.