From f5bb1033e3f2856aca696f1be6a7183abd66e486 Mon Sep 17 00:00:00 2001 From: Paddy Byers Date: Fri, 13 Nov 2015 08:30:33 +0000 Subject: [PATCH 1/4] Add support for batch rest publish API --- .../main/java/io/ably/lib/rest/AblyRest.java | 66 ++++++-- .../main/java/io/ably/lib/types/Message.java | 38 +++++ .../io/ably/lib/types/MessageSerializer.java | 28 +++- .../io/ably/lib/types/PublishResponse.java | 150 +++++++++++++++++ .../test/rest/RestChannelBulkPublishTest.java | 153 ++++++++++++++++++ .../java/io/ably/lib/test/rest/RestSuite.java | 1 + 6 files changed, 423 insertions(+), 13 deletions(-) create mode 100644 lib/src/main/java/io/ably/lib/types/PublishResponse.java create mode 100644 lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java diff --git a/lib/src/main/java/io/ably/lib/rest/AblyRest.java b/lib/src/main/java/io/ably/lib/rest/AblyRest.java index eb2542928..681e412a9 100644 --- a/lib/src/main/java/io/ably/lib/rest/AblyRest.java +++ b/lib/src/main/java/io/ably/lib/rest/AblyRest.java @@ -12,18 +12,8 @@ import io.ably.lib.http.HttpPaginatedQuery; import io.ably.lib.http.HttpUtils; import io.ably.lib.http.PaginatedQuery; -import io.ably.lib.types.AblyException; -import io.ably.lib.types.AsyncHttpPaginatedResponse; -import io.ably.lib.types.AsyncPaginatedResult; -import io.ably.lib.types.Callback; -import io.ably.lib.types.ChannelOptions; -import io.ably.lib.types.ClientOptions; -import io.ably.lib.types.ErrorInfo; -import io.ably.lib.types.HttpPaginatedResponse; -import io.ably.lib.types.PaginatedResult; -import io.ably.lib.types.Param; -import io.ably.lib.types.Stats; -import io.ably.lib.types.StatsReader; +import io.ably.lib.types.*; +import io.ably.lib.util.Crypto; import io.ably.lib.util.Log; import io.ably.lib.util.Serialisation; @@ -225,4 +215,56 @@ protected void onAuthUpdated(String token, boolean waitForResponse) throws AblyE protected void onAuthError(ErrorInfo errorInfo) { /* Default is to do nothing. Overridden by subclass. */ } + + /** + * Publish a messages on one or more channels. When there are + * messages to be sent on multiple channels simultaneously, + * it is more efficient to use this method to publish them in + * a single request, as compared with publishing via multiple + * independent requests. + * @throws AblyException + */ + public PublishResponse[] publish(Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { + return publishImpl(pubSpecs, channelOptions).sync(); + } + + public void publishAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Callback callback) throws AblyException { + publishImpl(pubSpecs, channelOptions).async(callback); + } + + private Http.Request publishImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { + boolean hasClientSuppliedId = false; + for(Message.Batch spec : pubSpecs) { + for(Message message : spec.messages) { + /* handle message ids */ + /* RSL1k2 */ + hasClientSuppliedId |= (message.id != null); + /* RTL6g3 */ + auth.checkClientId(message, true, false); + message.encode(channelOptions); + } + if(!hasClientSuppliedId && options.idempotentRestPublishing) { + /* RSL1k1: populate the message id with a library-generated id */ + String messageId = Crypto.getRandomMessageId(); + for (int i = 0; i < spec.messages.length; i++) { + spec.messages[i].id = messageId + ':' + i; + } + } + } + return http.request(new Http.Execute() { + @Override + public void execute(HttpScheduler http, final Callback callback) throws AblyException { + HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs); + http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), null, requestBody, new HttpCore.ResponseHandler() { + @Override + public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException { + if(error != null) { + throw AblyException.fromErrorInfo(error); + } + return PublishResponse.getBulkPublishResponseHandler(response.statusCode).handleResponseBody(response.contentType, response.body); + } + }, true, callback); + } + }); + } } diff --git a/lib/src/main/java/io/ably/lib/types/Message.java b/lib/src/main/java/io/ably/lib/types/Message.java index e3c6bec3f..19bee8e5f 100644 --- a/lib/src/main/java/io/ably/lib/types/Message.java +++ b/lib/src/main/java/io/ably/lib/types/Message.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.lang.reflect.Type; +import java.util.Collection; import org.msgpack.core.MessageFormat; import org.msgpack.core.MessagePacker; @@ -92,6 +93,43 @@ Message readMsgpack(MessageUnpacker unpacker) throws IOException { return this; } + /** + * A specification for a collection of messages to be sent using the batch API + * @author paddy + * + */ + public static class Batch { + public String[] channels; + public Message[] messages; + + public Batch(String channel, Message[] messages) { + if(channel == null || channel.isEmpty()) throw new IllegalArgumentException("A Batch spec cannot have an empty set of channels"); + if(messages == null || messages.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of messages"); + this.channels = new String[] { channel }; + this.messages = messages; + } + + public Batch(String[] channels, Message[] messages) { + if(channels == null || channels.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of channels"); + if(messages == null || messages.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of messages"); + this.channels = channels; + this.messages = messages; + } + + public Batch(Collection channels, Collection messages) { + this(channels.toArray(new String[channels.size()]), messages.toArray(new Message[messages.size()])); + } + + public void writeMsgpack(MessagePacker packer) throws IOException { + packer.packMapHeader(2); + packer.packString("channels"); + packer.packArrayHeader(channels.length); + for(String ch : channels) packer.packString(ch); + packer.packString("messages"); + MessageSerializer.writeMsgpackArray(messages, packer); + } + } + static Message fromMsgpack(MessageUnpacker unpacker) throws IOException { return (new Message()).readMsgpack(unpacker); } diff --git a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java index 8c444fc74..dde4a13b2 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java +++ b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java @@ -70,6 +70,29 @@ public static void writeMsgpackArray(Message[] messages, MessagePacker packer) { } catch(IOException e) {} } + public static HttpCore.RequestBody asMsgpackRequest(Message.Batch[] pubSpecs) { + return new HttpUtils.ByteArrayRequestBody(writeMsgpackArray(pubSpecs), "application/x-msgpack"); + } + + static byte[] writeMsgpackArray(Message.Batch[] pubSpecs) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out); + writeMsgpackArray(pubSpecs, packer); + packer.flush(); + return out.toByteArray(); + } catch(IOException e) { return null; } + } + + static void writeMsgpackArray(Message.Batch[] pubSpecs, MessagePacker packer) throws IOException { + try { + int count = pubSpecs.length; + packer.packArrayHeader(count); + for(Message.Batch spec : pubSpecs) + spec.writeMsgpack(packer); + } catch(IOException e) {} + } + /**************************************** * JSON decode ****************************************/ @@ -90,6 +113,10 @@ public static HttpCore.RequestBody asJsonRequest(Message[] messages) { return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(messages)); } + public static HttpCore.RequestBody asJSONRequest(Message.Batch[] pubSpecs) { + return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(pubSpecs)); + } + /**************************************** * BodyHandler ****************************************/ @@ -129,6 +156,5 @@ else if("application/x-msgpack".equals(contentType)) } private static HttpCore.BodyHandler messageResponseHandler = new MessageBodyHandler(null); - private static final String TAG = MessageSerializer.class.getName(); } diff --git a/lib/src/main/java/io/ably/lib/types/PublishResponse.java b/lib/src/main/java/io/ably/lib/types/PublishResponse.java new file mode 100644 index 000000000..072db4b48 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/PublishResponse.java @@ -0,0 +1,150 @@ +package io.ably.lib.types; + +import io.ably.lib.http.HttpCore; +import io.ably.lib.util.Log; +import io.ably.lib.util.Serialisation; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; + +/**************************************** + * PublishResponse + ****************************************/ + +public class PublishResponse { + public ErrorInfo error; + public String channel; + public String id; + + private static PublishResponse[] fromJSONArray(byte[] json) { + return Serialisation.gson.fromJson(new String(json), PublishResponse[].class); + } + + private static PublishResponse fromMsgpack(MessageUnpacker unpacker) throws IOException { + return (new PublishResponse()).readMsgpack(unpacker); + } + + private static PublishResponse[] fromMsgpackArray(byte[] msgpack) throws IOException { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(msgpack); + return fromMsgpackArray(unpacker); + } + + private static PublishResponse[] fromMsgpackArray(MessageUnpacker unpacker) throws IOException { + int count = unpacker.unpackArrayHeader(); + PublishResponse[] result = new PublishResponse[count]; + for(int j = 0; j < count; j++) { + result[j] = PublishResponse.fromMsgpack(unpacker); + } + return result; + } + + private PublishResponse readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for(int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if(fieldFormat.equals(MessageFormat.NIL)) { unpacker.unpackNil(); continue; } + + switch(fieldName) { + case "error": + error = ErrorInfo.fromMsgpack(unpacker); + break; + case "channel": + channel = unpacker.unpackString(); + break; + case "id": + id = unpacker.unpackString(); + break; + default: + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + } + } + return this; + } + + public static HttpCore.BodyHandler getBulkPublishResponseHandler(int statusCode) { + return (statusCode < 300) ? bulkResponseBodyHandler : batchErrorBodyHandler; + } + + private static class BatchErrorResponse { + public ErrorInfo error; + public PublishResponse[] batchResponse; + + static BatchErrorResponse readJSON(byte[] json) { + return Serialisation.gson.fromJson(new String(json), BatchErrorResponse.class); + } + + static BatchErrorResponse readMsgpack(byte[] msgpack) throws IOException { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(msgpack); + return (new BatchErrorResponse()).readMsgpack(unpacker); + } + + BatchErrorResponse readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for(int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if(fieldFormat.equals(MessageFormat.NIL)) { unpacker.unpackNil(); continue; } + + switch(fieldName) { + case "error": + error = ErrorInfo.fromMsgpack(unpacker); + break; + case "batchResponse": + batchResponse = PublishResponse.fromMsgpackArray(unpacker); + break; + default: + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + } + } + return this; + } + } + + private static class BulkResponseBodyHandler implements HttpCore.BodyHandler { + @Override + public PublishResponse[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + if("application/json".equals(contentType)) { + return PublishResponse.fromJSONArray(body); + } else if("application/x-msgpack".equals(contentType)) { + return PublishResponse.fromMsgpackArray(body); + } + return null; + } catch(IOException e) { + throw AblyException.fromThrowable(e); + } + } + } + + private static class BatchErrorBodyHandler implements HttpCore.BodyHandler { + @Override + public PublishResponse[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + BatchErrorResponse response = null; + if("application/json".equals(contentType)) { + response = BatchErrorResponse.readJSON(body); + } else if("application/x-msgpack".equals(contentType)) { + response = BatchErrorResponse.readMsgpack(body); + } + if(response == null) { + return null; + } + if(response.error != null && response.error.code != 40020) { + throw AblyException.fromErrorInfo(response.error); + } + return response.batchResponse; + } catch(IOException e) { + throw AblyException.fromThrowable(e); + } + } + } + + private static HttpCore.BodyHandler batchErrorBodyHandler = new BatchErrorBodyHandler(); + private static HttpCore.BodyHandler bulkResponseBodyHandler = new BulkResponseBodyHandler(); + + private static final String TAG = MessageSerializer.class.getName(); +} diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java new file mode 100644 index 000000000..ebe2bb0cc --- /dev/null +++ b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java @@ -0,0 +1,153 @@ +package io.ably.lib.test.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Random; + +import io.ably.lib.test.common.ParameterizedTest; +import io.ably.lib.types.*; +import org.junit.Before; +import org.junit.Test; + +import io.ably.lib.rest.AblyRest; + +public class RestChannelBulkPublishTest extends ParameterizedTest { + + private AblyRest ably; + + @Before + public void setUpBefore() throws Exception { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + ably = new AblyRest(opts); + } + + /** + * Publish a single message on multiple channels + * + * The payload constructed has the form + * [ + * { + * channel: [ , , ... ], + * message: [{ data: }] + * } + * ] + * + * It publishes the given message on all of the given channels. + */ + @Test + public void bulk_publish_multiple_channels_simple() { + /* first, publish some messages */ + int channelCount = 5; + ArrayList channels = new ArrayList(); + for(int i = 0; i < channelCount; i++) + channels.add("persisted:" + randomString()); + + Message message = new Message(null, "bulk_publish_multiple_channels_simple"); + Message.Batch payload = new Message.Batch(channels, Collections.singleton(message)); + + try { + PublishResponse[] result = ably.publish(new Message.Batch[] { payload }, null); + } catch(AblyException e) { + e.printStackTrace(); + fail("bulkpublish_multiple_channels_simple: Unexpected exception"); + return; + } + + /* get the history for this channel */ + try { + for(String channel : channels) { + PaginatedResult messages = ably.channels.get(channel).history(null); + assertNotNull("Expected non-null messages", messages); + assertEquals("Expected 1 message", messages.items().length, 1); + /* verify message contents */ + assertEquals("Expect message data to be expected String", messages.items()[0].data, message.data); + } + } catch (AblyException e) { + e.printStackTrace(); + fail("bulkpublish_multiple_channels_simple: Unexpected exception"); + return; + } + } + + /** + * Publish a multiple messages on multiple channels + * + * The payload constructed has the form + * [ + * { + * channel: [ ], + * message: [ + * { data: }, + * { data: }, + * { data: }, + * ... + * ] + * }, + * { + * channel: [ ], + * message: [ + * { data: }, + * { data: }, + * { data: }, + * ... + * ] + * }, + * ... + * ] + * + * It publishes the given messages on the associated channels. + */ + @Test + public void bulk_publish_multiple_channels_multiple_messages() { + /* first, publish some messages */ + int channelCount = 5; + int messageCount = 6; + String baseMessageText = "bulk_publish_multiple_channels_multiple_messages"; + ArrayList payload = new ArrayList(); + + ArrayList rndMessageTexts = new ArrayList(); + for(int i = 0; i < messageCount; i++) { + rndMessageTexts.add(randomString()); + } + + ArrayList channels = new ArrayList(); + for(int i = 0; i < channelCount; i++) { + String channel = "persisted:" + randomString(); + channels.add(channel); + ArrayList messages = new ArrayList(); + for(int j = 0; j < messageCount; j++) + messages.add(new Message(null, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(j))); + payload.add(new Message.Batch(Collections.singleton(channel), messages)); + } + + try { + ably.publish(payload.toArray(new Message.Batch[payload.size()]), null); + } catch(AblyException e) { + e.printStackTrace(); + fail("bulk_publish_multiple_channels_multiple_messages: Unexpected exception"); + return; + } + + /* get the history for this channel */ + try { + for(String channel : channels) { + PaginatedResult messages = ably.channels.get(channel).history(new Param[] {new Param("direction", "forwards")}); + assertNotNull("Expected non-null messages", messages); + assertEquals("Expected correct number of messages", messages.items().length, messageCount); + /* verify message contents */ + for(int i = 0; i < messageCount; i++) + assertEquals("Expect message data to be expected String", messages.items()[i].data, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(i)); + } + } catch (AblyException e) { + e.printStackTrace(); + fail("bulk_publish_multiple_channels_multiple_messages: Unexpected exception"); + return; + } + } + + private String randomString() { return String.valueOf(new Random().nextDouble()).substring(2); } +} diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java b/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java index fd22a2613..45d8ab2a6 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java +++ b/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java @@ -27,6 +27,7 @@ RestChannelTest.class, RestChannelHistoryTest.class, RestChannelPublishTest.class, + RestChannelBulkPublishTest.class, RestCryptoTest.class, RestPresenceTest.class, RestProxyTest.class, From c8226394237d358e4bea152e2818acc0a0533ddf Mon Sep 17 00:00:00 2001 From: Paddy Byers Date: Tue, 11 Dec 2018 14:15:33 +0000 Subject: [PATCH 2/4] Rename rest.publish() -> rest.publishBatch(); add Experimental annotation --- .../main/java/io/ably/annotation/Experimental.java | 10 ++++++++++ lib/src/main/java/io/ably/lib/rest/AblyRest.java | 13 ++++++++----- .../lib/test/rest/RestChannelBulkPublishTest.java | 4 ++-- 3 files changed, 20 insertions(+), 7 deletions(-) create mode 100644 lib/src/main/java/io/ably/annotation/Experimental.java diff --git a/lib/src/main/java/io/ably/annotation/Experimental.java b/lib/src/main/java/io/ably/annotation/Experimental.java new file mode 100644 index 000000000..23a7393f4 --- /dev/null +++ b/lib/src/main/java/io/ably/annotation/Experimental.java @@ -0,0 +1,10 @@ +package io.ably.annotation; + +import java.lang.annotation.Documented; + +/** + * An annotation indicating an experimental API. Any or all detail of this + * API are subject to change in the future. Feedback on the API is welcomed. + */ +@Documented +public @interface Experimental { } diff --git a/lib/src/main/java/io/ably/lib/rest/AblyRest.java b/lib/src/main/java/io/ably/lib/rest/AblyRest.java index 681e412a9..e0abfdd10 100644 --- a/lib/src/main/java/io/ably/lib/rest/AblyRest.java +++ b/lib/src/main/java/io/ably/lib/rest/AblyRest.java @@ -2,6 +2,7 @@ import java.util.HashMap; +import io.ably.annotation.Experimental; import io.ably.lib.http.AsyncHttpScheduler; import io.ably.lib.http.Http; import io.ably.lib.http.HttpCore; @@ -224,15 +225,17 @@ protected void onAuthError(ErrorInfo errorInfo) { * independent requests. * @throws AblyException */ - public PublishResponse[] publish(Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { - return publishImpl(pubSpecs, channelOptions).sync(); + @Experimental + public PublishResponse[] publishBatch(Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { + return publishBatchImpl(pubSpecs, channelOptions).sync(); } - public void publishAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Callback callback) throws AblyException { - publishImpl(pubSpecs, channelOptions).async(callback); + @Experimental + public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Callback callback) throws AblyException { + publishBatchImpl(pubSpecs, channelOptions).async(callback); } - private Http.Request publishImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { + private Http.Request publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { boolean hasClientSuppliedId = false; for(Message.Batch spec : pubSpecs) { for(Message message : spec.messages) { diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java index ebe2bb0cc..722dabbdf 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java @@ -50,7 +50,7 @@ public void bulk_publish_multiple_channels_simple() { Message.Batch payload = new Message.Batch(channels, Collections.singleton(message)); try { - PublishResponse[] result = ably.publish(new Message.Batch[] { payload }, null); + PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null); } catch(AblyException e) { e.printStackTrace(); fail("bulkpublish_multiple_channels_simple: Unexpected exception"); @@ -125,7 +125,7 @@ public void bulk_publish_multiple_channels_multiple_messages() { } try { - ably.publish(payload.toArray(new Message.Batch[payload.size()]), null); + ably.publishBatch(payload.toArray(new Message.Batch[payload.size()]), null); } catch(AblyException e) { e.printStackTrace(); fail("bulk_publish_multiple_channels_multiple_messages: Unexpected exception"); From 42c451dac0710453dd47af4a92214478607351e5 Mon Sep 17 00:00:00 2001 From: Paddy Byers Date: Wed, 12 Dec 2018 16:32:30 +0000 Subject: [PATCH 3/4] Bulk publish: fix handling of partial errors, and improve tests --- .../main/java/io/ably/lib/http/HttpCore.java | 104 +++++----- .../main/java/io/ably/lib/rest/AblyRest.java | 46 ++--- .../java/io/ably/lib/types/ErrorInfo.java | 26 +++ .../io/ably/lib/types/PublishResponse.java | 6 +- .../test/rest/RestChannelBulkPublishTest.java | 180 ++++++++++++------ lib/src/test/resources/local/testAppSpec.json | 3 + .../test/resources/local/testAppSpec.json.src | 9 + 7 files changed, 242 insertions(+), 132 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/http/HttpCore.java b/lib/src/main/java/io/ably/lib/http/HttpCore.java index 839e6f77e..e76437962 100644 --- a/lib/src/main/java/io/ably/lib/http/HttpCore.java +++ b/lib/src/main/java/io/ably/lib/http/HttpCore.java @@ -273,10 +273,22 @@ private T handleResponse(HttpURLConnection conn, boolean credentialsIncluded throw AblyException.fromErrorInfo(error); } - if(response.statusCode < 200 || response.statusCode >= 300) { - /* get any in-body error details */ - ErrorInfo error = null; - if(response.body != null && response.body.length > 0) { + if(response.statusCode >= 200 && response.statusCode < 300) { + return (responseHandler != null) ? responseHandler.handleResponse(response, null) : null; + } + + /* get any in-body error details */ + ErrorInfo error = null; + if(response.body != null && response.body.length > 0) { + if(response.contentType != null && response.contentType.contains("msgpack")) { + try { + error = ErrorInfo.fromMsgpackBody(response.body); + } catch (IOException e) { + /* error pages aren't necessarily going to satisfy our Accept criteria ... */ + System.err.println("Unable to parse msgpack error response"); + } + } else { + /* assume json */ String bodyText = new String(response.body); try { ErrorResponse errorResponse = ErrorResponse.fromJSON(bodyText); @@ -288,62 +300,56 @@ private T handleResponse(HttpURLConnection conn, boolean credentialsIncluded System.err.println("Error message in unexpected format: " + bodyText); } } + } - /* handle error details in header instead of body */ - if(error == null) { - String errorCodeHeader = conn.getHeaderField("X-Ably-ErrorCode"); - String errorMessageHeader = conn.getHeaderField("X-Ably-ErrorMessage"); - if(errorCodeHeader != null) { - try { - error = new ErrorInfo(errorMessageHeader, response.statusCode, Integer.parseInt(errorCodeHeader)); - } catch(NumberFormatException e) {} - } + /* handle error details in header */ + if(error == null) { + String errorCodeHeader = conn.getHeaderField("X-Ably-ErrorCode"); + String errorMessageHeader = conn.getHeaderField("X-Ably-ErrorMessage"); + if(errorCodeHeader != null) { + try { + error = new ErrorInfo(errorMessageHeader, response.statusCode, Integer.parseInt(errorCodeHeader)); + } catch(NumberFormatException e) {} } + } - /* handle www-authenticate */ - if(response.statusCode == 401) { - boolean stale = (error != null && error.code == 40140); - List wwwAuthHeaders = response.getHeaderFields(HttpConstants.Headers.WWW_AUTHENTICATE); - if(wwwAuthHeaders != null && wwwAuthHeaders.size() > 0) { - Map headersByType = HttpAuth.sortAuthenticateHeaders(wwwAuthHeaders); - String tokenHeader = headersByType.get(HttpAuth.Type.X_ABLY_TOKEN); - if(tokenHeader != null) { stale |= (tokenHeader.indexOf("stale") > -1); } - AuthRequiredException exception = new AuthRequiredException(null, error); - exception.authChallenge = headersByType; - if(stale) { - exception.expired = true; - throw exception; - } - if(!credentialsIncluded) { - throw exception; - } + /* handle www-authenticate */ + if(response.statusCode == 401) { + boolean stale = (error != null && error.code == 40140); + List wwwAuthHeaders = response.getHeaderFields(HttpConstants.Headers.WWW_AUTHENTICATE); + if(wwwAuthHeaders != null && wwwAuthHeaders.size() > 0) { + Map headersByType = HttpAuth.sortAuthenticateHeaders(wwwAuthHeaders); + String tokenHeader = headersByType.get(HttpAuth.Type.X_ABLY_TOKEN); + if(tokenHeader != null) { stale |= (tokenHeader.indexOf("stale") > -1); } + AuthRequiredException exception = new AuthRequiredException(null, error); + exception.authChallenge = headersByType; + if(stale) { + exception.expired = true; + throw exception; } - } - /* handle proxy-authenticate */ - if(response.statusCode == 407) { - List proxyAuthHeaders = response.getHeaderFields(HttpConstants.Headers.PROXY_AUTHENTICATE); - if(proxyAuthHeaders != null && proxyAuthHeaders.size() > 0) { - AuthRequiredException exception = new AuthRequiredException(null, error); - exception.proxyAuthChallenge = HttpAuth.sortAuthenticateHeaders(proxyAuthHeaders); + if(!credentialsIncluded) { throw exception; } } - if(error == null) { - error = ErrorInfo.fromResponseStatus(response.statusLine, response.statusCode); - } else { - } - Log.e(TAG, "Error response from server: err = " + error.toString()); - if(responseHandler != null) { - return responseHandler.handleResponse(response, error); + } + /* handle proxy-authenticate */ + if(response.statusCode == 407) { + List proxyAuthHeaders = response.getHeaderFields(HttpConstants.Headers.PROXY_AUTHENTICATE); + if(proxyAuthHeaders != null && proxyAuthHeaders.size() > 0) { + AuthRequiredException exception = new AuthRequiredException(null, error); + exception.proxyAuthChallenge = HttpAuth.sortAuthenticateHeaders(proxyAuthHeaders); + throw exception; } - throw AblyException.fromErrorInfo(error); } - + if(error == null) { + error = ErrorInfo.fromResponseStatus(response.statusLine, response.statusCode); + } else { + } + Log.e(TAG, "Error response from server: err = " + error.toString()); if(responseHandler != null) { - return responseHandler.handleResponse(response, null); + return responseHandler.handleResponse(response, error); } - - return null; + throw AblyException.fromErrorInfo(error); } /** diff --git a/lib/src/main/java/io/ably/lib/rest/AblyRest.java b/lib/src/main/java/io/ably/lib/rest/AblyRest.java index e0abfdd10..f855abee6 100644 --- a/lib/src/main/java/io/ably/lib/rest/AblyRest.java +++ b/lib/src/main/java/io/ably/lib/rest/AblyRest.java @@ -195,28 +195,6 @@ public void requestAsync(String method, String path, Param[] params, HttpCore.Re (new AsyncHttpPaginatedQuery(http, method, path, headers, params, body)).exec(callback); } - /** - * Authentication token has changed. waitForResult is true if there is a need to - * wait for server response to auth request - */ - - /** - * Override this method in AblyRealtime and pass updated token to ConnectionManager - * @param token new token - * @param waitForResponse wait for server response before returning from method - * @throws AblyException - */ - protected void onAuthUpdated(String token, boolean waitForResponse) throws AblyException { - /* Default is to do nothing. Overridden by subclass. */ - } - - /** - * Authentication error occurred - */ - protected void onAuthError(ErrorInfo errorInfo) { - /* Default is to do nothing. Overridden by subclass. */ - } - /** * Publish a messages on one or more channels. When there are * messages to be sent on multiple channels simultaneously, @@ -261,7 +239,7 @@ public void execute(HttpScheduler http, final Callback callba http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), null, requestBody, new HttpCore.ResponseHandler() { @Override public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException { - if(error != null) { + if(error != null && error.code != 40020) { throw AblyException.fromErrorInfo(error); } return PublishResponse.getBulkPublishResponseHandler(response.statusCode).handleResponseBody(response.contentType, response.body); @@ -270,4 +248,26 @@ public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo er } }); } + + /** + * Authentication token has changed. waitForResult is true if there is a need to + * wait for server response to auth request + */ + + /** + * Override this method in AblyRealtime and pass updated token to ConnectionManager + * @param token new token + * @param waitForResponse wait for server response before returning from method + * @throws AblyException + */ + protected void onAuthUpdated(String token, boolean waitForResponse) throws AblyException { + /* Default is to do nothing. Overridden by subclass. */ + } + + /** + * Authentication error occurred + */ + protected void onAuthError(ErrorInfo errorInfo) { + /* Default is to do nothing. Overridden by subclass. */ + } } diff --git a/lib/src/main/java/io/ably/lib/types/ErrorInfo.java b/lib/src/main/java/io/ably/lib/types/ErrorInfo.java index b8cae2c1b..b86226ae8 100644 --- a/lib/src/main/java/io/ably/lib/types/ErrorInfo.java +++ b/lib/src/main/java/io/ably/lib/types/ErrorInfo.java @@ -1,5 +1,6 @@ package io.ably.lib.types; +import io.ably.lib.util.Serialisation; import org.msgpack.core.MessageFormat; import org.msgpack.core.MessageUnpacker; @@ -108,6 +109,31 @@ ErrorInfo readMsgpack(MessageUnpacker unpacker) throws IOException { return this; } + public static ErrorInfo fromMsgpackBody(byte[] msgpack) throws IOException { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(msgpack); + return fromMsgpackBody(unpacker); + } + + private static ErrorInfo fromMsgpackBody(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + ErrorInfo error = null; + for(int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if(fieldFormat.equals(MessageFormat.NIL)) { unpacker.unpackNil(); continue; } + + switch(fieldName) { + case "error": + error = ErrorInfo.fromMsgpack(unpacker); + break; + default: + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + } + } + return error; + } + static ErrorInfo fromMsgpack(MessageUnpacker unpacker) throws IOException { return (new ErrorInfo()).readMsgpack(unpacker); } diff --git a/lib/src/main/java/io/ably/lib/types/PublishResponse.java b/lib/src/main/java/io/ably/lib/types/PublishResponse.java index 072db4b48..bbac211fb 100644 --- a/lib/src/main/java/io/ably/lib/types/PublishResponse.java +++ b/lib/src/main/java/io/ably/lib/types/PublishResponse.java @@ -15,7 +15,7 @@ public class PublishResponse { public ErrorInfo error; public String channel; - public String id; + public String messageId; private static PublishResponse[] fromJSONArray(byte[] json) { return Serialisation.gson.fromJson(new String(json), PublishResponse[].class); @@ -53,8 +53,8 @@ private PublishResponse readMsgpack(MessageUnpacker unpacker) throws IOException case "channel": channel = unpacker.unpackString(); break; - case "id": - id = unpacker.unpackString(); + case "messageId": + messageId = unpacker.unpackString(); break; default: Log.v(TAG, "Unexpected field: " + fieldName); diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java index 722dabbdf..2f5b152cf 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java @@ -1,30 +1,21 @@ package io.ably.lib.test.rest; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - import java.util.ArrayList; import java.util.Collections; import java.util.Random; import io.ably.lib.test.common.ParameterizedTest; import io.ably.lib.types.*; + +import io.ably.lib.rest.AblyRest; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; -import io.ably.lib.rest.AblyRest; +import static org.junit.Assert.*; public class RestChannelBulkPublishTest extends ParameterizedTest { - private AblyRest ably; - - @Before - public void setUpBefore() throws Exception { - ClientOptions opts = createOptions(testVars.keys[0].keyStr); - ably = new AblyRest(opts); - } - /** * Publish a single message on multiple channels * @@ -38,27 +29,33 @@ public void setUpBefore() throws Exception { * * It publishes the given message on all of the given channels. */ + @Ignore // until idempotent publishing is enabled @Test public void bulk_publish_multiple_channels_simple() { - /* first, publish some messages */ - int channelCount = 5; - ArrayList channels = new ArrayList(); - for(int i = 0; i < channelCount; i++) - channels.add("persisted:" + randomString()); + try { + /* setup library instance */ + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + AblyRest ably = new AblyRest(opts); + + /* first, publish some messages */ + int channelCount = 5; + ArrayList channels = new ArrayList(); + for(int i = 0; i < channelCount; i++) { + channels.add("persisted:" + randomString()); + } - Message message = new Message(null, "bulk_publish_multiple_channels_simple"); - Message.Batch payload = new Message.Batch(channels, Collections.singleton(message)); + Message message = new Message(null, "bulk_publish_multiple_channels_simple"); + String messageId = message.id = randomString(); + Message.Batch payload = new Message.Batch(channels, Collections.singleton(message)); - try { PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null); - } catch(AblyException e) { - e.printStackTrace(); - fail("bulkpublish_multiple_channels_simple: Unexpected exception"); - return; - } + for(PublishResponse response : result) { + assertEquals("Verify expected response id", response.messageId, messageId); + assertTrue("Verify expected channel name", channels.contains(response.channel)); + assertNull("Verify no publish error", response.error); + } - /* get the history for this channel */ - try { + /* get the history for this channel */ for(String channel : channels) { PaginatedResult messages = ably.channels.get(channel).history(null); assertNotNull("Expected non-null messages", messages); @@ -103,44 +100,50 @@ public void bulk_publish_multiple_channels_simple() { */ @Test public void bulk_publish_multiple_channels_multiple_messages() { - /* first, publish some messages */ - int channelCount = 5; - int messageCount = 6; - String baseMessageText = "bulk_publish_multiple_channels_multiple_messages"; - ArrayList payload = new ArrayList(); - - ArrayList rndMessageTexts = new ArrayList(); - for(int i = 0; i < messageCount; i++) { - rndMessageTexts.add(randomString()); - } + try { + /* setup library instance */ + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + opts.idempotentRestPublishing = true; + AblyRest ably = new AblyRest(opts); + + /* first, publish some messages */ + int channelCount = 5; + int messageCount = 6; + String baseMessageText = "bulk_publish_multiple_channels_multiple_messages"; + ArrayList payload = new ArrayList(); + + ArrayList rndMessageTexts = new ArrayList(); + for(int i = 0; i < messageCount; i++) { + rndMessageTexts.add(randomString()); + } - ArrayList channels = new ArrayList(); - for(int i = 0; i < channelCount; i++) { - String channel = "persisted:" + randomString(); - channels.add(channel); - ArrayList messages = new ArrayList(); - for(int j = 0; j < messageCount; j++) - messages.add(new Message(null, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(j))); - payload.add(new Message.Batch(Collections.singleton(channel), messages)); - } + ArrayList channels = new ArrayList(); + for(int i = 0; i < channelCount; i++) { + String channel = "persisted:" + randomString(); + channels.add(channel); + ArrayList messages = new ArrayList(); + for(int j = 0; j < messageCount; j++) { + messages.add(new Message(null, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(j))); + } + payload.add(new Message.Batch(Collections.singleton(channel), messages)); + } - try { - ably.publishBatch(payload.toArray(new Message.Batch[payload.size()]), null); - } catch(AblyException e) { - e.printStackTrace(); - fail("bulk_publish_multiple_channels_multiple_messages: Unexpected exception"); - return; - } + PublishResponse[] result = ably.publishBatch(payload.toArray(new Message.Batch[payload.size()]), null); + for(PublishResponse response : result) { + assertNotNull("Verify expected response id", response.messageId); + assertTrue("Verify expected channel name", channels.contains(response.channel)); + assertNull("Verify no publish error", response.error); + } - /* get the history for this channel */ - try { + /* get the history for this channel */ for(String channel : channels) { PaginatedResult messages = ably.channels.get(channel).history(new Param[] {new Param("direction", "forwards")}); assertNotNull("Expected non-null messages", messages); assertEquals("Expected correct number of messages", messages.items().length, messageCount); /* verify message contents */ - for(int i = 0; i < messageCount; i++) + for(int i = 0; i < messageCount; i++) { assertEquals("Expect message data to be expected String", messages.items()[i].data, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(i)); + } } } catch (AblyException e) { e.printStackTrace(); @@ -149,5 +152,68 @@ public void bulk_publish_multiple_channels_multiple_messages() { } } + /** + * Publish a single message on multiple channels, using credentials + * that are only able to publish to a subset of the channels + * + * The payload constructed has the form + * [ + * { + * channel: [ , , ... ], + * message: [{ data: }] + * } + * ] + * + * It attempts to publishe the given message on all of the given channels. + */ + @Ignore // until idempotent publishing is enabled + @Test + public void bulk_publish_multiple_channels_partial_error() { + try { + /* setup library instance */ + ClientOptions opts = createOptions(testVars.keys[6].keyStr); + AblyRest ably = new AblyRest(opts); + + /* first, publish some messages */ + String baseChannelName = "persisted:" + testParams.name + ":channel"; + int channelCount = 5; + ArrayList channels = new ArrayList(); + for(int i = 0; i < channelCount; i++) { + channels.add(baseChannelName + i); + } + + Message message = new Message(null, "bulk_publish_multiple_channels_partial_error"); + String messageId = message.id = randomString(); + Message.Batch payload = new Message.Batch(channels, Collections.singleton(message)); + + PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null); + for(PublishResponse response : result) { + if((baseChannelName + "1").compareTo(response.channel) >= 0) { + assertEquals("Verify expected response id", response.messageId, messageId); + assertTrue("Verify expected channel name", channels.contains(response.channel)); + assertNull("Verify no publish error", response.error); + } else { + assertNotNull("Verify expected publish error", response.error); + assertEquals("Verify expected publish error code", response.error.code, 40160); + } + } + + /* get the history for this channel */ + for(String channel : channels) { + if((baseChannelName + "1").compareTo(channel) >= 0) { + PaginatedResult messages = ably.channels.get(channel).history(null); + assertNotNull("Expected non-null messages", messages); + assertEquals("Expected 1 message", messages.items().length, 1); + /* verify message contents */ + assertEquals("Expect message data to be expected String", messages.items()[0].data, message.data); + } + } + } catch (AblyException e) { + e.printStackTrace(); + fail("bulkpublish_multiple_channels_simple: Unexpected exception"); + return; + } + } + private String randomString() { return String.valueOf(new Random().nextDouble()).substring(2); } } diff --git a/lib/src/test/resources/local/testAppSpec.json b/lib/src/test/resources/local/testAppSpec.json index 2f15761ef..1281f69b9 100644 --- a/lib/src/test/resources/local/testAppSpec.json +++ b/lib/src/test/resources/local/testAppSpec.json @@ -16,6 +16,9 @@ { "privileged": true, "capability": "{\"channel0\":[\"publish\"],\"channel1\":[\"publish\"],\"channel2\":[\"publish\",\"subscribe\"],\"channel3\":[\"subscribe\"],\"channel4\":[\"presence\",\"publish\",\"subscribe\"],\"channel5\":[\"presence\"],\"channel6\":[\"*\"]}" + }, + { + "capability": "{\"persisted:text_protocol:channel0\":[\"publish\",\"subscribe\",\"history\"],\"persisted:text_protocol:channel1\":[\"publish\",\"subscribe\",\"history\"],\"persisted:binary_protocol:channel0\":[\"publish\",\"subscribe\",\"history\"],\"persisted:binary_protocol:channel1\":[\"publish\",\"subscribe\",\"history\"],\"persisted:*\":[\"subscribe\",\"history\"]}" } ], "namespaces": [ diff --git a/lib/src/test/resources/local/testAppSpec.json.src b/lib/src/test/resources/local/testAppSpec.json.src index c0f0f94c7..d2bab25a6 100644 --- a/lib/src/test/resources/local/testAppSpec.json.src +++ b/lib/src/test/resources/local/testAppSpec.json.src @@ -40,6 +40,15 @@ channel5:['presence'], channel6:['*'] }) + }, + { /* key 6, has permissions for selected persisted channels */ + capability: JSON.stringify({ + 'persisted:text_protocol:channel0':['publish', 'subscribe','history'], + 'persisted:text_protocol:channel1':['publish', 'subscribe','history'], + 'persisted:binary_protocol:channel0':['publish', 'subscribe','history'], + 'persisted:binary_protocol:channel1':['publish', 'subscribe','history'], + 'persisted:*':['subscribe','history'] + }) } ], namespaces: [ From 67ad67d6948a64d7ea5893957026ef3736c6de93 Mon Sep 17 00:00:00 2001 From: Paddy Byers Date: Wed, 12 Dec 2018 18:36:00 +0000 Subject: [PATCH 4/4] Bulk publish: PublishResponse.channel -> channelId --- .../io/ably/lib/types/PublishResponse.java | 7 +++-- .../test/rest/RestChannelBulkPublishTest.java | 30 +++++++++---------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/types/PublishResponse.java b/lib/src/main/java/io/ably/lib/types/PublishResponse.java index bbac211fb..fe2866bc3 100644 --- a/lib/src/main/java/io/ably/lib/types/PublishResponse.java +++ b/lib/src/main/java/io/ably/lib/types/PublishResponse.java @@ -1,5 +1,6 @@ package io.ably.lib.types; +import com.google.gson.annotations.SerializedName; import io.ably.lib.http.HttpCore; import io.ably.lib.util.Log; import io.ably.lib.util.Serialisation; @@ -14,7 +15,8 @@ public class PublishResponse { public ErrorInfo error; - public String channel; + @SerializedName("channel") + public String channelId; public String messageId; private static PublishResponse[] fromJSONArray(byte[] json) { @@ -51,7 +53,8 @@ private PublishResponse readMsgpack(MessageUnpacker unpacker) throws IOException error = ErrorInfo.fromMsgpack(unpacker); break; case "channel": - channel = unpacker.unpackString(); + case "channelId": + channelId = unpacker.unpackString(); break; case "messageId": messageId = unpacker.unpackString(); diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java index 2f5b152cf..680683f6f 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java @@ -39,24 +39,24 @@ public void bulk_publish_multiple_channels_simple() { /* first, publish some messages */ int channelCount = 5; - ArrayList channels = new ArrayList(); + ArrayList channelIds = new ArrayList(); for(int i = 0; i < channelCount; i++) { - channels.add("persisted:" + randomString()); + channelIds.add("persisted:" + randomString()); } Message message = new Message(null, "bulk_publish_multiple_channels_simple"); String messageId = message.id = randomString(); - Message.Batch payload = new Message.Batch(channels, Collections.singleton(message)); + Message.Batch payload = new Message.Batch(channelIds, Collections.singleton(message)); PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null); for(PublishResponse response : result) { assertEquals("Verify expected response id", response.messageId, messageId); - assertTrue("Verify expected channel name", channels.contains(response.channel)); + assertTrue("Verify expected channel name", channelIds.contains(response.channelId)); assertNull("Verify no publish error", response.error); } /* get the history for this channel */ - for(String channel : channels) { + for(String channel : channelIds) { PaginatedResult messages = ably.channels.get(channel).history(null); assertNotNull("Expected non-null messages", messages); assertEquals("Expected 1 message", messages.items().length, 1); @@ -117,10 +117,10 @@ public void bulk_publish_multiple_channels_multiple_messages() { rndMessageTexts.add(randomString()); } - ArrayList channels = new ArrayList(); + ArrayList channelIds = new ArrayList(); for(int i = 0; i < channelCount; i++) { String channel = "persisted:" + randomString(); - channels.add(channel); + channelIds.add(channel); ArrayList messages = new ArrayList(); for(int j = 0; j < messageCount; j++) { messages.add(new Message(null, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(j))); @@ -131,12 +131,12 @@ public void bulk_publish_multiple_channels_multiple_messages() { PublishResponse[] result = ably.publishBatch(payload.toArray(new Message.Batch[payload.size()]), null); for(PublishResponse response : result) { assertNotNull("Verify expected response id", response.messageId); - assertTrue("Verify expected channel name", channels.contains(response.channel)); + assertTrue("Verify expected channel name", channelIds.contains(response.channelId)); assertNull("Verify no publish error", response.error); } /* get the history for this channel */ - for(String channel : channels) { + for(String channel : channelIds) { PaginatedResult messages = ably.channels.get(channel).history(new Param[] {new Param("direction", "forwards")}); assertNotNull("Expected non-null messages", messages); assertEquals("Expected correct number of messages", messages.items().length, messageCount); @@ -177,20 +177,20 @@ public void bulk_publish_multiple_channels_partial_error() { /* first, publish some messages */ String baseChannelName = "persisted:" + testParams.name + ":channel"; int channelCount = 5; - ArrayList channels = new ArrayList(); + ArrayList channelIds = new ArrayList(); for(int i = 0; i < channelCount; i++) { - channels.add(baseChannelName + i); + channelIds.add(baseChannelName + i); } Message message = new Message(null, "bulk_publish_multiple_channels_partial_error"); String messageId = message.id = randomString(); - Message.Batch payload = new Message.Batch(channels, Collections.singleton(message)); + Message.Batch payload = new Message.Batch(channelIds, Collections.singleton(message)); PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null); for(PublishResponse response : result) { - if((baseChannelName + "1").compareTo(response.channel) >= 0) { + if((baseChannelName + "1").compareTo(response.channelId) >= 0) { assertEquals("Verify expected response id", response.messageId, messageId); - assertTrue("Verify expected channel name", channels.contains(response.channel)); + assertTrue("Verify expected channel name", channelIds.contains(response.channelId)); assertNull("Verify no publish error", response.error); } else { assertNotNull("Verify expected publish error", response.error); @@ -199,7 +199,7 @@ public void bulk_publish_multiple_channels_partial_error() { } /* get the history for this channel */ - for(String channel : channels) { + for(String channel : channelIds) { if((baseChannelName + "1").compareTo(channel) >= 0) { PaginatedResult messages = ably.channels.get(channel).history(null); assertNotNull("Expected non-null messages", messages);