pubsub: create experimental message dispatcher#2572
Closed
pongad wants to merge 3 commits intogoogleapis:masterfrom
pongad:pubsub-dup
Closed
pubsub: create experimental message dispatcher#2572pongad wants to merge 3 commits intogoogleapis:masterfrom pongad:pubsub-dup
pongad wants to merge 3 commits intogoogleapis:masterfrom
pongad:pubsub-dup
Conversation
pongad
commented
Nov 2, 2017
| || builder.getModifyDeadlineAckIdsCount() == MAX_CHANGE_PER_REQUEST; | ||
| } | ||
|
|
||
| void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| workMessages(); | ||
| } | ||
|
|
||
| private synchronized void workMessages() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
We emphasize simplicity, not optimal performance. Of course, performance is still important, hence the many concurrent data structures used here. Major differences from the original and some justifications: - It modacks immediately after receiving messages to take advantage of server-side duplicate-mitigation feature that should soon be available. - In the original impl, we create many dispatchers. They share some data structures and keep others separate. I find this too confusing. The new impl creates only one dispatcher, to be shared among all connections. This obviously increases contention but should be at least partially alleviated by some lock-free algorithms used here. - It makes deadline a constant of 1 minute. With the dup-mitigation feature, the server might need to wait on the order of minutes before it could redeliver messages. I opine that setting the deadline to 1 minute shouldn't drastically worsen the redelivery latency. Also unlike the original, it does not periodically adjust deadline. I have some ideas on how this could be simply implemented; we can add this feature back if necessary. - Modack time is also set to 1 minute and doesn't exponentially back off. Since the deadline is already 1 minute, it seems silly to bicker over a few extra seconds. [1] - Modacks run on fixed schedule, giving 15 seconds padding, and modacks all pending messages, not just the ones about to expire. While clearly suboptimal, it's not very expensive since it only happens once every 45 seconds. [1] This caused a bug. If the padding is set too large, we'd schedule modacks to occur in the past, creating a modack storm. I believe the benefits of reduced complexity outweighs the cost. Load test shows the current implementation still has no trouble catching up with the publisher.
pongad
commented
Nov 2, 2017
| } | ||
|
|
||
| for (ReceivedMessage message : messages) { | ||
| modAcks.add(ModAckItem.create(message.getAckId(), DEADLINE_EXTENSION_SEC)); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
mdietz94
reviewed
Nov 2, 2017
| builder.addModifyDeadlineSeconds(modAck.seconds()); | ||
| } | ||
|
|
||
| return builder.getAckIdsCount() == MAX_CHANGE_PER_REQUEST |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
mdietz94
reviewed
Nov 2, 2017
| private void sendRequest(StreamingPullRequest request) { | ||
| Connection connection = null; | ||
| try { | ||
| connection = connections.take(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
It's unfortunately much more expensive server side to process messages from
a different stream. We do support it so users can continue to process
messages after a stream dies, but we won't be able to scale if every stream
is doing this.
…On Nov 2, 2017 9:36 PM, "Michael Darakananda" ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/
v1/MessageDispatcher2.java
<#2572 (comment)>
:
> + @OverRide
+ public void run() {
+ try {
+ receiver.receiveMessage(item.message.getMessage(), consumer);
+ } catch (Exception e) {
+ consumer.throwException(e);
+ }
+ }
+ });
+ }
+ }
+
+ private void sendRequest(StreamingPullRequest request) {
+ Connection connection = null;
+ try {
+ connection = connections.take();
I thought I was being smart round-robin-ing the connections :(
Could you provide some context here? The current logic seems to work fine
on both desktop and loadtest. Might it not work in the future?
—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
<#2572 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/ABvXakDzdWgDcEiKQLPS_-_XbJD2sSxNks5sym4MgaJpZM4QPKmh>
.
|
Contributor
Author
|
superseded by #2580 |
chingor13
pushed a commit
that referenced
this pull request
Feb 20, 2026
#2572) * chore: Update generation configuration at Mon Jul 22 18:44:15 UTC 2024 * chore: Update generation configuration at Wed Jul 24 02:15:53 UTC 2024 * chore: Update generation configuration at Thu Jul 25 02:16:04 UTC 2024 * chore: Update generation configuration at Fri Jul 26 02:15:55 UTC 2024
chingor13
pushed a commit
that referenced
this pull request
Mar 24, 2026
In this PR we fix the `deep-remove` and `deep-preserve` regexes in `.OwlBot.yaml` files in the monorepo by creating a temporary `.OwlBot.hermetic.yaml` with the corrected paths. ### Why? Because the paths specified in `deep-remove-regex` and `deep-preserve-regex` [start from the library name](https://github.com/googleapis/google-cloud-java/blob/14f7146c98fdf03de2f113215e03347a21e83f9a/java-alloydb/.OwlBot.yaml#L17), so if we run `copy-code` from inside a library, these regexes won't match any files. ### What about deep-copy? We don't need to correct the library path because `copy-code` sends the files from `googleapis-gen` (or our built temp folder in our case) to the `owl-bot-staging` folder. These regexes don't deal with the monorepo, they deal with the source repo, which for our case is the temp folder we build after calling generate_library. This is why `deep-copy` was always being "respected". ### Proof After running `python generate_repo.py generate --generation-config-yaml google-cloud-java/generation_config.yaml --repository-path google-cloud-java --target-library-api-shortname merchantapi &> out`, we get  --------- Co-authored-by: Joe Wang <106995533+JoeWang1127@users.noreply.github.com>
chingor13
pushed a commit
that referenced
this pull request
Mar 24, 2026
🤖 I have created a release *beep* *boop* --- <details><summary>2.39.0</summary> ## [2.39.0](googleapis/sdk-platform-java@v2.38.1...v2.39.0) (2024-04-18) ### Features * add `libraries_bom_version` to generation configuration ([#2639](googleapis/sdk-platform-java#2639)) ([76eb62b](googleapis/sdk-platform-java@76eb62b)) * Add ChannelPoolSettings Getter for gRPC's ChannelProvider ([#2612](googleapis/sdk-platform-java#2612)) ([c8dae8f](googleapis/sdk-platform-java@c8dae8f)) * add config change ([#2604](googleapis/sdk-platform-java#2604)) ([a8c93b5](googleapis/sdk-platform-java@a8c93b5)) * add entry point ([#2616](googleapis/sdk-platform-java#2616)) ([f5492bb](googleapis/sdk-platform-java@f5492bb)) * add generation config comparator ([#2587](googleapis/sdk-platform-java#2587)) ([3461166](googleapis/sdk-platform-java@3461166)) * Add JavadocJar Task to build.gradle for self service libraries ([#2593](googleapis/sdk-platform-java#2593)) ([a66df18](googleapis/sdk-platform-java@a66df18)) * Client/StubSettings' getEndpoint() returns the resolved endpoint ([#2440](googleapis/sdk-platform-java#2440)) ([7cf0d8f](googleapis/sdk-platform-java@7cf0d8f)) * generate selected libraries ([#2598](googleapis/sdk-platform-java#2598)) ([e4572d1](googleapis/sdk-platform-java@e4572d1)) * Validate the Universe Domain inside Java-Core ([#2592](googleapis/sdk-platform-java#2592)) ([a5e1141](googleapis/sdk-platform-java@a5e1141)) ### Bug Fixes * add main to `generate_repo.py` ([#2607](googleapis/sdk-platform-java#2607)) ([df39521](googleapis/sdk-platform-java@df39521)) * correct deep-remove and deep-preserve regexes ([#2572](googleapis/sdk-platform-java#2572)) ([7d59fa1](googleapis/sdk-platform-java@7d59fa1)) * first attempt should use the min of RPC timeout and total timeout ([#2641](googleapis/sdk-platform-java#2641)) ([806a52e](googleapis/sdk-platform-java@806a52e)) * remove duplicated calls to AutoValue builders ([#2636](googleapis/sdk-platform-java#2636)) ([c883b8f](googleapis/sdk-platform-java@c883b8f)) * remove unnecessary slf4j and AbstractGoogleClientRequest native image configs ([9cb7c09](googleapis/sdk-platform-java@9cb7c09)) * remove unnecessary slf4j and AbstractGoogleClientRequest native image configs ([#2628](googleapis/sdk-platform-java#2628)) ([9cb7c09](googleapis/sdk-platform-java@9cb7c09)) ### Dependencies * update arrow.version to v15.0.2 ([#2589](googleapis/sdk-platform-java#2589)) ([0947407](googleapis/sdk-platform-java@0947407)) * update dependency com.google.cloud.opentelemetry:detector-resources-support to v0.28.0 ([#2649](googleapis/sdk-platform-java#2649)) ([079bcff](googleapis/sdk-platform-java@079bcff)) * update dependency gitpython to v3.1.41 [security] ([#2625](googleapis/sdk-platform-java#2625)) ([018e9a9](googleapis/sdk-platform-java@018e9a9)) * update dependency net.bytebuddy:byte-buddy to v1.14.13 ([#2646](googleapis/sdk-platform-java#2646)) ([5a79cd2](googleapis/sdk-platform-java@5a79cd2)) * update dependency org.threeten:threeten-extra to v1.8.0 ([#2650](googleapis/sdk-platform-java#2650)) ([5666adb](googleapis/sdk-platform-java@5666adb)) * update dependency org.threeten:threetenbp to v1.6.9 ([#2602](googleapis/sdk-platform-java#2602)) ([767eec1](googleapis/sdk-platform-java@767eec1)) * update dependency org.threeten:threetenbp to v1.6.9 ([#2665](googleapis/sdk-platform-java#2665)) ([563d8ae](googleapis/sdk-platform-java@563d8ae)) * update google api dependencies ([#2584](googleapis/sdk-platform-java#2584)) ([22733b1](googleapis/sdk-platform-java@22733b1)) * update googleapis/java-cloud-bom digest to 7071341 ([#2608](googleapis/sdk-platform-java#2608)) ([8f34fe7](googleapis/sdk-platform-java@8f34fe7)) * update netty dependencies to v4.1.109.final ([#2597](googleapis/sdk-platform-java#2597)) ([6b6939e](googleapis/sdk-platform-java@6b6939e)) * update opentelemetry-java monorepo to v1.37.0 ([#2652](googleapis/sdk-platform-java#2652)) ([708cf7f](googleapis/sdk-platform-java@708cf7f)) * update protobuf dependencies to v3.25.3 ([#2491](googleapis/sdk-platform-java#2491)) ([e2235a8](googleapis/sdk-platform-java@e2235a8)) * update slf4j monorepo to v2.0.13 ([#2647](googleapis/sdk-platform-java#2647)) ([2e3d813](googleapis/sdk-platform-java@2e3d813)) </details> --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
EXPERIMENTAL
I pulled it into a separate class so it may coexist with the original.
We can submit this, but it needs more testing before production use.
We emphasize simplicity, not optimal performance.
Of course, performance is still important,
hence the many concurrent data structures used here.
Major differences from the original and some justifications:
to take advantage of server-side duplicate-mitigation
feature that should soon be available.
They share some data structures and keep others separate.
I find this too confusing.
The new impl creates only one dispatcher, to be shared
among all connections. This obviously increases contention
but should be at least partially alleviated by some lock-free
algorithms used here.
With the dup-mitigation feature, the server might need to
wait on the order of minutes before it could redeliver messages.
I opine that setting the deadline to 1 minute shouldn't drastically
worsen the redelivery latency.
Also unlike the original, it does not periodically adjust deadline.
I have some ideas on how this could be simply implemented;
we can add this feature back if necessary.
off. Since the deadline is already 1 minute, it seems silly to
bicker over a few extra seconds. [1]
and modacks all pending messages, not just the ones about to expire.
While clearly suboptimal, it's not very expensive since it only
happens once every 45 seconds.
[1] This caused a bug. If the padding is set too large, we'd
schedule modacks to occur in the past, creating a modack storm.
I believe the benefits of reduced complexity outweighs the cost.
Load test shows the current implementation still has no trouble
catching up with the publisher.