From 5ee8bd39882b5747319c7e6bdf6f6901b08eb528 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 13 Feb 2017 17:56:42 +1100 Subject: [PATCH 1/4] add Subscriber snippet The snippet is very long, but the shortest I can make it without loosing details. --- .../pubsub/snippets/SubscriberSnippets.java | 100 ++++++++++++++++++ .../cloud/pubsub/spi/v1/Subscriber.java | 90 ++++++++++------ 2 files changed, 160 insertions(+), 30 deletions(-) create mode 100644 google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java new file mode 100644 index 000000000000..6464308b194d --- /dev/null +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -0,0 +1,100 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * EDITING INSTRUCTIONS + * This file is referenced in Subscriber's javadoc. Any change to this file should be reflected in + * PubSub's javadoc. + */ + +package com.google.cloud.examples.pubsub.snippets; + +import com.google.cloud.pubsub.spi.v1.AckReply; +import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; +import com.google.cloud.pubsub.spi.v1.MessageReceiver; +import com.google.cloud.pubsub.spi.v1.Subscriber; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class SubscriberSnippets { + /** + * Example of receiving a specific number of messages. + */ + // [TARGET startAsync()] + // [VARIABLE "my_project_name"] + // [VARIABLE "my_subscription_name"] + // [VARIABLE 3] + public void startAsync(String projectName, String subscriptionName, int receiveNum) throws Exception { + // [START startAsync] + SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName); + final Lock lock = new ReentrantLock(); + final Condition doneCondition = lock.newCondition(); + final AtomicInteger pendingReceives = new AtomicInteger(receiveNum); + final AtomicBoolean done = new AtomicBoolean(); + + MessageReceiver receiver = new MessageReceiver() { + public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + System.out.println("got message: " + message); + consumer.accept(AckReply.ACK, null); + if (pendingReceives.decrementAndGet() != 0) { + return; + } + lock.lock(); + try { + done.set(true); + doneCondition.signal(); + } finally { + lock.unlock(); + } + } + }; + + Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build(); + subscriber.addListener(new Subscriber.SubscriberListener() { + public void failed(Subscriber.State from, Throwable failure) { + System.err.println(failure); + lock.lock(); + try { + done.set(true); + doneCondition.signal(); + } finally { + lock.unlock(); + } + } + }, new Executor() { + public void execute(Runnable command) { + command.run(); + } + }); + subscriber.startAsync(); + lock.lock(); + try { + while (!done.get()) { + doneCondition.await(); + } + } finally { + lock.unlock(); + } + subscriber.stopAsync().awaitTerminated(); + // [END startAsync] + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 81239fa9a8fe..5890efe198a3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -75,36 +75,6 @@ * *

If no credentials are provided, the {@link Subscriber} will use application default * credentials through {@link GoogleCredentials#getApplicationDefault}. - * - *

For example, a {@link Subscriber} can be constructed and used to receive messages as follows: - * - *


- * MessageReceiver receiver = new MessageReceiver() {
- *   @Override
- *   public void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response) {
- *     // ... process message ...
- *     return response.set(AckReply.ACK);
- *   }
- * }
- *
- * Subscriber subscriber =
- *     Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
- *         .setMaxBundleAcks(100)
- *         .build();
- *
- * subscriber.startAsync();
- *
- * // ... recommended, listen for fatal errors that break the subscriber streaming ...
- * subscriber.addListener(new Listener() {
- *   @Override
- *   public void failed(State from, Throwable failure) {
- *     System.out.println("Subscriber failed with error: " + failure);
- *   }
- * }, Executors.newSingleThreadExecutor());
- *
- * // ... and when done with the subscriber ...
- * subscriber.stopAsync();
- * 
*/ public class Subscriber { private static final int THREADS_PER_CHANNEL = 5; @@ -207,6 +177,66 @@ public boolean isRunning() { return impl.isRunning(); } + /** + * Initiates service startup and returns immediately. + * + *

Example of receiving a specific number of messages. + *

 {@code
+   * String projectName = "my_project_name";
+   * String subscriptionName = "my_subscription_name";
+   * int receiveNum = 3;
+   * SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName);
+   * final Lock lock = new ReentrantLock();
+   * final Condition doneCondition = lock.newCondition();
+   * final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
+   * final AtomicBoolean done = new AtomicBoolean();
+   *
+   * MessageReceiver receiver = new MessageReceiver() {
+   *   public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
+   *     System.out.println("got message: " + message);
+   *     consumer.accept(AckReply.ACK, null);
+   *     if (pendingReceives.decrementAndGet() != 0) {
+   *       return;
+   *     }
+   *     lock.lock();
+   *     try {
+   *       done.set(true);
+   *       doneCondition.signal();
+   *     } finally {
+   *       lock.unlock();
+   *     }
+   *   }
+   * };
+   *
+   * Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
+   * subscriber.addListener(new Subscriber.SubscriberListener() {
+   *   public void failed(Subscriber.State from, Throwable failure) {
+   *     System.err.println(failure);
+   *     lock.lock();
+   *     try {
+   *       done.set(true);
+   *       doneCondition.signal();
+   *     } finally {
+   *       lock.unlock();
+   *     }
+   *   }
+   * }, new Executor() {
+   *   public void execute(Runnable command) {
+   *     command.run();
+   *   }
+   * });
+   * subscriber.startAsync();
+   * lock.lock();
+   * try {
+   *   while (!done.get()) {
+   *     doneCondition.await();
+   *   }
+   * } finally {
+   *   lock.unlock();
+   * }
+   * subscriber.stopAsync().awaitTerminated();
+   * }
+ */ public Subscriber startAsync() { impl.startAsync(); return this; From 4bd5d48058bf916573c4a916e7e2f6b1fecebda2 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 20 Feb 2017 14:46:14 +1100 Subject: [PATCH 2/4] pr comment --- .../pubsub/snippets/SubscriberSnippets.java | 50 ++++++------------- pom.xml | 2 +- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java index 6464308b194d..50c712945ad0 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -22,20 +22,25 @@ package com.google.cloud.examples.pubsub.snippets; +import com.google.api.gax.core.SettableRpcFuture; import com.google.cloud.pubsub.spi.v1.AckReply; import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.cloud.pubsub.spi.v1.Subscriber; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.SubscriptionName; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; public class SubscriberSnippets { + + private final SubscriptionName subscription; + + public SubscriberSnippets(SubscriptionName subscription) { + this.subscription = subscription; + } + /** * Example of receiving a specific number of messages. */ @@ -43,27 +48,17 @@ public class SubscriberSnippets { // [VARIABLE "my_project_name"] // [VARIABLE "my_subscription_name"] // [VARIABLE 3] - public void startAsync(String projectName, String subscriptionName, int receiveNum) throws Exception { + public void startAsync(int receiveNum) throws Exception { // [START startAsync] - SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName); - final Lock lock = new ReentrantLock(); - final Condition doneCondition = lock.newCondition(); final AtomicInteger pendingReceives = new AtomicInteger(receiveNum); - final AtomicBoolean done = new AtomicBoolean(); + final SettableRpcFuture done = new SettableRpcFuture<>(); MessageReceiver receiver = new MessageReceiver() { public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { System.out.println("got message: " + message); consumer.accept(AckReply.ACK, null); - if (pendingReceives.decrementAndGet() != 0) { - return; - } - lock.lock(); - try { - done.set(true); - doneCondition.signal(); - } finally { - lock.unlock(); + if (pendingReceives.decrementAndGet() == 0) { + done.set(null); } } }; @@ -71,14 +66,7 @@ public void receiveMessage(final PubsubMessage message, final AckReplyConsumer c Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build(); subscriber.addListener(new Subscriber.SubscriberListener() { public void failed(Subscriber.State from, Throwable failure) { - System.err.println(failure); - lock.lock(); - try { - done.set(true); - doneCondition.signal(); - } finally { - lock.unlock(); - } + done.setException(failure); } }, new Executor() { public void execute(Runnable command) { @@ -86,14 +74,8 @@ public void execute(Runnable command) { } }); subscriber.startAsync(); - lock.lock(); - try { - while (!done.get()) { - doneCondition.await(); - } - } finally { - lock.unlock(); - } + + done.get(10, TimeUnit.MINUTES); subscriber.stopAsync().awaitTerminated(); // [END startAsync] } diff --git a/pom.xml b/pom.xml index 1f10ef53ea21..8b117f9ffe01 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.1.0 + 0.1.1 0.1.5 0.9.3-alpha-SNAPSHOT 0.9.3-beta-SNAPSHOT From 4f95f595684ed43a808301ab06f10ea436e32a10 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Tue, 21 Feb 2017 15:31:06 +1100 Subject: [PATCH 3/4] update snippet --- .../pubsub/snippets/SubscriberSnippets.java | 2 - .../cloud/pubsub/spi/v1/Subscriber.java | 42 ++++--------------- 2 files changed, 9 insertions(+), 35 deletions(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java index 50c712945ad0..3ec29d362e07 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -45,8 +45,6 @@ public SubscriberSnippets(SubscriptionName subscription) { * Example of receiving a specific number of messages. */ // [TARGET startAsync()] - // [VARIABLE "my_project_name"] - // [VARIABLE "my_subscription_name"] // [VARIABLE 3] public void startAsync(int receiveNum) throws Exception { // [START startAsync] diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index e1366e2c0397..0eac4950be01 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -182,43 +182,24 @@ public boolean isRunning() { * *

Example of receiving a specific number of messages. *

 {@code
-   * String projectName = "my_project_name";
-   * String subscriptionName = "my_subscription_name";
    * int receiveNum = 3;
-   * SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName);
-   * final Lock lock = new ReentrantLock();
-   * final Condition doneCondition = lock.newCondition();
    * final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
-   * final AtomicBoolean done = new AtomicBoolean();
-   *
+   * final SettableRpcFuture done = new SettableRpcFuture<>();
+   * 
    * MessageReceiver receiver = new MessageReceiver() {
    *   public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
    *     System.out.println("got message: " + message);
    *     consumer.accept(AckReply.ACK, null);
-   *     if (pendingReceives.decrementAndGet() != 0) {
-   *       return;
-   *     }
-   *     lock.lock();
-   *     try {
-   *       done.set(true);
-   *       doneCondition.signal();
-   *     } finally {
-   *       lock.unlock();
+   *     if (pendingReceives.decrementAndGet() == 0) {
+   *       done.set(null);
    *     }
    *   }
    * };
-   *
+   * 
    * Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
    * subscriber.addListener(new Subscriber.SubscriberListener() {
    *   public void failed(Subscriber.State from, Throwable failure) {
-   *     System.err.println(failure);
-   *     lock.lock();
-   *     try {
-   *       done.set(true);
-   *       doneCondition.signal();
-   *     } finally {
-   *       lock.unlock();
-   *     }
+   *     done.setException(failure);
    *   }
    * }, new Executor() {
    *   public void execute(Runnable command) {
@@ -226,16 +207,11 @@ public boolean isRunning() {
    *   }
    * });
    * subscriber.startAsync();
-   * lock.lock();
-   * try {
-   *   while (!done.get()) {
-   *     doneCondition.await();
-   *   }
-   * } finally {
-   *   lock.unlock();
-   * }
+   * 
+   * done.get(10, TimeUnit.MINUTES);
    * subscriber.stopAsync().awaitTerminated();
    * }
+ * */ public Subscriber startAsync() { impl.startAsync(); From 3717f023e478c1c51b94d85a35b29c5d80be4e65 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 22 Feb 2017 15:23:19 +1100 Subject: [PATCH 4/4] pr comment --- .../snippets/MessageReceiverSnippets.java | 61 +++++++++++++++++++ .../pubsub/snippets/SubscriberSnippets.java | 46 ++++++-------- .../cloud/pubsub/spi/v1/MessageReceiver.java | 19 ++++++ .../cloud/pubsub/spi/v1/Subscriber.java | 25 ++------ 4 files changed, 101 insertions(+), 50 deletions(-) create mode 100644 google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java new file mode 100644 index 000000000000..cfedfb8cea42 --- /dev/null +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * EDITING INSTRUCTIONS + * This file is referenced in Subscriber's javadoc. Any change to this file should be reflected in + * PubSub's javadoc. + */ + +package com.google.cloud.examples.pubsub.snippets; + +import com.google.cloud.pubsub.spi.v1.AckReply; +import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; +import com.google.cloud.pubsub.spi.v1.MessageReceiver; +import com.google.pubsub.v1.PubsubMessage; +import java.util.concurrent.BlockingQueue; + +public class MessageReceiverSnippets { + private final BlockingQueue blockingQueue; + + public MessageReceiverSnippets(BlockingQueue blockingQueue) { + this.blockingQueue = blockingQueue; + } + + /** + * This {@code MessageReceiver} passes all messages to a {@link BlockingQueue}. + * This method can be called concurrently from multiple threads, + * so it is important that the queue be thread-safe. + * + * This example is for illustration. Implementations may directly process messages + * instead of sending them to queues. + */ + // [TARGET receiveMessage(PubsubMessage, AckReplyConsumer)] + public MessageReceiver messageReceiver() { + // [START receiveMessage] + MessageReceiver receiver = new MessageReceiver() { + public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + if (blockingQueue.offer(message)) { + consumer.accept(AckReply.ACK, null); + } else { + consumer.accept(AckReply.NACK, null); + } + } + }; + // [END receiveMessage] + return receiver; + } +} diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java index 3ec29d362e07..13bc5a6c6cdd 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -22,58 +22,46 @@ package com.google.cloud.examples.pubsub.snippets; -import com.google.api.gax.core.SettableRpcFuture; -import com.google.cloud.pubsub.spi.v1.AckReply; -import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; +import com.google.api.gax.core.RpcFuture; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.cloud.pubsub.spi.v1.Subscriber; -import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.SubscriptionName; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; public class SubscriberSnippets { private final SubscriptionName subscription; + private final MessageReceiver receiver; + private final RpcFuture done; + private final Executor executor; - public SubscriberSnippets(SubscriptionName subscription) { + public SubscriberSnippets( + SubscriptionName subscription, + MessageReceiver receiver, + RpcFuture done, + Executor executor) { this.subscription = subscription; + this.receiver = receiver; + this.done = done; + this.executor = executor; } /** * Example of receiving a specific number of messages. */ // [TARGET startAsync()] - // [VARIABLE 3] - public void startAsync(int receiveNum) throws Exception { + public void startAsync() throws Exception { // [START startAsync] - final AtomicInteger pendingReceives = new AtomicInteger(receiveNum); - final SettableRpcFuture done = new SettableRpcFuture<>(); - - MessageReceiver receiver = new MessageReceiver() { - public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { - System.out.println("got message: " + message); - consumer.accept(AckReply.ACK, null); - if (pendingReceives.decrementAndGet() == 0) { - done.set(null); - } - } - }; - Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build(); subscriber.addListener(new Subscriber.SubscriberListener() { public void failed(Subscriber.State from, Throwable failure) { - done.setException(failure); - } - }, new Executor() { - public void execute(Runnable command) { - command.run(); + // Handle error. } - }); + }, executor); subscriber.startAsync(); - done.get(10, TimeUnit.MINUTES); + // Wait for a stop signal. + done.get(); subscriber.stopAsync().awaitTerminated(); // [END startAsync] } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java index 52dd6ea8262b..15dfedc34aa1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java @@ -23,6 +23,25 @@ public interface MessageReceiver { /** * Called when a message is received by the subscriber. The implementation must arrange for {@link * AckReplyConsumer#accept} to be called after processing the {@code message}. + * + *

This {@code MessageReceiver} passes all messages to a {@link BlockingQueue}. + * This method can be called concurrently from multiple threads, + * so it is important that the queue be thread-safe. + * + * This example is for illustration. Implementations may directly process messages + * instead of sending them to queues. + *

 {@code
+   * MessageReceiver receiver = new MessageReceiver() {
+   *   public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
+   *     if (blockingQueue.offer(message)) {
+   *       consumer.accept(AckReply.ACK, null);
+   *     } else {
+   *       consumer.accept(AckReply.NACK, null);
+   *     }
+   *   }
+   * };
+   * }
+ * */ void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 0eac4950be01..902226db408f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -182,33 +182,16 @@ public boolean isRunning() { * *

Example of receiving a specific number of messages. *

 {@code
-   * int receiveNum = 3;
-   * final AtomicInteger pendingReceives = new AtomicInteger(receiveNum);
-   * final SettableRpcFuture done = new SettableRpcFuture<>();
-   * 
-   * MessageReceiver receiver = new MessageReceiver() {
-   *   public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
-   *     System.out.println("got message: " + message);
-   *     consumer.accept(AckReply.ACK, null);
-   *     if (pendingReceives.decrementAndGet() == 0) {
-   *       done.set(null);
-   *     }
-   *   }
-   * };
-   * 
    * Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
    * subscriber.addListener(new Subscriber.SubscriberListener() {
    *   public void failed(Subscriber.State from, Throwable failure) {
-   *     done.setException(failure);
-   *   }
-   * }, new Executor() {
-   *   public void execute(Runnable command) {
-   *     command.run();
+   *     // Handle error.
    *   }
-   * });
+   * }, executor);
    * subscriber.startAsync();
    * 
-   * done.get(10, TimeUnit.MINUTES);
+   * // Wait for a stop signal.
+   * done.get();
    * subscriber.stopAsync().awaitTerminated();
    * }
*