diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java new file mode 100644 index 000000000000..cfedfb8cea42 --- /dev/null +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * EDITING INSTRUCTIONS + * This file is referenced in Subscriber's javadoc. Any change to this file should be reflected in + * PubSub's javadoc. + */ + +package com.google.cloud.examples.pubsub.snippets; + +import com.google.cloud.pubsub.spi.v1.AckReply; +import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; +import com.google.cloud.pubsub.spi.v1.MessageReceiver; +import com.google.pubsub.v1.PubsubMessage; +import java.util.concurrent.BlockingQueue; + +public class MessageReceiverSnippets { + private final BlockingQueue blockingQueue; + + public MessageReceiverSnippets(BlockingQueue blockingQueue) { + this.blockingQueue = blockingQueue; + } + + /** + * This {@code MessageReceiver} passes all messages to a {@link BlockingQueue}. + * This method can be called concurrently from multiple threads, + * so it is important that the queue be thread-safe. + * + * This example is for illustration. Implementations may directly process messages + * instead of sending them to queues. + */ + // [TARGET receiveMessage(PubsubMessage, AckReplyConsumer)] + public MessageReceiver messageReceiver() { + // [START receiveMessage] + MessageReceiver receiver = new MessageReceiver() { + public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + if (blockingQueue.offer(message)) { + consumer.accept(AckReply.ACK, null); + } else { + consumer.accept(AckReply.NACK, null); + } + } + }; + // [END receiveMessage] + return receiver; + } +} diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java new file mode 100644 index 000000000000..13bc5a6c6cdd --- /dev/null +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * EDITING INSTRUCTIONS + * This file is referenced in Subscriber's javadoc. Any change to this file should be reflected in + * PubSub's javadoc. + */ + +package com.google.cloud.examples.pubsub.snippets; + +import com.google.api.gax.core.RpcFuture; +import com.google.cloud.pubsub.spi.v1.MessageReceiver; +import com.google.cloud.pubsub.spi.v1.Subscriber; +import com.google.pubsub.v1.SubscriptionName; +import java.util.concurrent.Executor; + +public class SubscriberSnippets { + + private final SubscriptionName subscription; + private final MessageReceiver receiver; + private final RpcFuture done; + private final Executor executor; + + public SubscriberSnippets( + SubscriptionName subscription, + MessageReceiver receiver, + RpcFuture done, + Executor executor) { + this.subscription = subscription; + this.receiver = receiver; + this.done = done; + this.executor = executor; + } + + /** + * Example of receiving a specific number of messages. + */ + // [TARGET startAsync()] + public void startAsync() throws Exception { + // [START startAsync] + Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build(); + subscriber.addListener(new Subscriber.SubscriberListener() { + public void failed(Subscriber.State from, Throwable failure) { + // Handle error. + } + }, executor); + subscriber.startAsync(); + + // Wait for a stop signal. + done.get(); + subscriber.stopAsync().awaitTerminated(); + // [END startAsync] + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java index 52dd6ea8262b..15dfedc34aa1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java @@ -23,6 +23,25 @@ public interface MessageReceiver { /** * Called when a message is received by the subscriber. The implementation must arrange for {@link * AckReplyConsumer#accept} to be called after processing the {@code message}. + * + *

This {@code MessageReceiver} passes all messages to a {@link BlockingQueue}. + * This method can be called concurrently from multiple threads, + * so it is important that the queue be thread-safe. + * + * This example is for illustration. Implementations may directly process messages + * instead of sending them to queues. + *

 {@code
+   * MessageReceiver receiver = new MessageReceiver() {
+   *   public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
+   *     if (blockingQueue.offer(message)) {
+   *       consumer.accept(AckReply.ACK, null);
+   *     } else {
+   *       consumer.accept(AckReply.NACK, null);
+   *     }
+   *   }
+   * };
+   * }
+ * */ void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 946fe8aeef8f..902226db408f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -75,36 +75,6 @@ * *

If no credentials are provided, the {@link Subscriber} will use application default * credentials through {@link GoogleCredentials#getApplicationDefault}. - * - *

For example, a {@link Subscriber} can be constructed and used to receive messages as follows: - * - *


- * MessageReceiver receiver = new MessageReceiver() {
- *   @Override
- *   public void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response) {
- *     // ... process message ...
- *     return response.set(AckReply.ACK);
- *   }
- * }
- *
- * Subscriber subscriber =
- *     Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
- *         .setMaxBundleAcks(100)
- *         .build();
- *
- * subscriber.startAsync();
- *
- * // ... recommended, listen for fatal errors that break the subscriber streaming ...
- * subscriber.addListener(new Listener() {
- *   @Override
- *   public void failed(State from, Throwable failure) {
- *     System.out.println("Subscriber failed with error: " + failure);
- *   }
- * }, Executors.newSingleThreadExecutor());
- *
- * // ... and when done with the subscriber ...
- * subscriber.stopAsync();
- * 
*/ public class Subscriber { private static final int THREADS_PER_CHANNEL = 5; @@ -207,6 +177,25 @@ public boolean isRunning() { return impl.isRunning(); } + /** + * Initiates service startup and returns immediately. + * + *

Example of receiving a specific number of messages. + *

 {@code
+   * Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
+   * subscriber.addListener(new Subscriber.SubscriberListener() {
+   *   public void failed(Subscriber.State from, Throwable failure) {
+   *     // Handle error.
+   *   }
+   * }, executor);
+   * subscriber.startAsync();
+   * 
+   * // Wait for a stop signal.
+   * done.get();
+   * subscriber.stopAsync().awaitTerminated();
+   * }
+ * + */ public Subscriber startAsync() { impl.startAsync(); return this;