diff --git a/lib/src/main/java/io/ably/annotation/Experimental.java b/lib/src/main/java/io/ably/annotation/Experimental.java new file mode 100644 index 000000000..23a7393f4 --- /dev/null +++ b/lib/src/main/java/io/ably/annotation/Experimental.java @@ -0,0 +1,10 @@ +package io.ably.annotation; + +import java.lang.annotation.Documented; + +/** + * An annotation indicating an experimental API. Any or all detail of this + * API are subject to change in the future. Feedback on the API is welcomed. + */ +@Documented +public @interface Experimental { } diff --git a/lib/src/main/java/io/ably/lib/http/HttpCore.java b/lib/src/main/java/io/ably/lib/http/HttpCore.java index 839e6f77e..e76437962 100644 --- a/lib/src/main/java/io/ably/lib/http/HttpCore.java +++ b/lib/src/main/java/io/ably/lib/http/HttpCore.java @@ -273,10 +273,22 @@ private T handleResponse(HttpURLConnection conn, boolean credentialsIncluded throw AblyException.fromErrorInfo(error); } - if(response.statusCode < 200 || response.statusCode >= 300) { - /* get any in-body error details */ - ErrorInfo error = null; - if(response.body != null && response.body.length > 0) { + if(response.statusCode >= 200 && response.statusCode < 300) { + return (responseHandler != null) ? responseHandler.handleResponse(response, null) : null; + } + + /* get any in-body error details */ + ErrorInfo error = null; + if(response.body != null && response.body.length > 0) { + if(response.contentType != null && response.contentType.contains("msgpack")) { + try { + error = ErrorInfo.fromMsgpackBody(response.body); + } catch (IOException e) { + /* error pages aren't necessarily going to satisfy our Accept criteria ... */ + System.err.println("Unable to parse msgpack error response"); + } + } else { + /* assume json */ String bodyText = new String(response.body); try { ErrorResponse errorResponse = ErrorResponse.fromJSON(bodyText); @@ -288,62 +300,56 @@ private T handleResponse(HttpURLConnection conn, boolean credentialsIncluded System.err.println("Error message in unexpected format: " + bodyText); } } + } - /* handle error details in header instead of body */ - if(error == null) { - String errorCodeHeader = conn.getHeaderField("X-Ably-ErrorCode"); - String errorMessageHeader = conn.getHeaderField("X-Ably-ErrorMessage"); - if(errorCodeHeader != null) { - try { - error = new ErrorInfo(errorMessageHeader, response.statusCode, Integer.parseInt(errorCodeHeader)); - } catch(NumberFormatException e) {} - } + /* handle error details in header */ + if(error == null) { + String errorCodeHeader = conn.getHeaderField("X-Ably-ErrorCode"); + String errorMessageHeader = conn.getHeaderField("X-Ably-ErrorMessage"); + if(errorCodeHeader != null) { + try { + error = new ErrorInfo(errorMessageHeader, response.statusCode, Integer.parseInt(errorCodeHeader)); + } catch(NumberFormatException e) {} } + } - /* handle www-authenticate */ - if(response.statusCode == 401) { - boolean stale = (error != null && error.code == 40140); - List wwwAuthHeaders = response.getHeaderFields(HttpConstants.Headers.WWW_AUTHENTICATE); - if(wwwAuthHeaders != null && wwwAuthHeaders.size() > 0) { - Map headersByType = HttpAuth.sortAuthenticateHeaders(wwwAuthHeaders); - String tokenHeader = headersByType.get(HttpAuth.Type.X_ABLY_TOKEN); - if(tokenHeader != null) { stale |= (tokenHeader.indexOf("stale") > -1); } - AuthRequiredException exception = new AuthRequiredException(null, error); - exception.authChallenge = headersByType; - if(stale) { - exception.expired = true; - throw exception; - } - if(!credentialsIncluded) { - throw exception; - } + /* handle www-authenticate */ + if(response.statusCode == 401) { + boolean stale = (error != null && error.code == 40140); + List wwwAuthHeaders = response.getHeaderFields(HttpConstants.Headers.WWW_AUTHENTICATE); + if(wwwAuthHeaders != null && wwwAuthHeaders.size() > 0) { + Map headersByType = HttpAuth.sortAuthenticateHeaders(wwwAuthHeaders); + String tokenHeader = headersByType.get(HttpAuth.Type.X_ABLY_TOKEN); + if(tokenHeader != null) { stale |= (tokenHeader.indexOf("stale") > -1); } + AuthRequiredException exception = new AuthRequiredException(null, error); + exception.authChallenge = headersByType; + if(stale) { + exception.expired = true; + throw exception; } - } - /* handle proxy-authenticate */ - if(response.statusCode == 407) { - List proxyAuthHeaders = response.getHeaderFields(HttpConstants.Headers.PROXY_AUTHENTICATE); - if(proxyAuthHeaders != null && proxyAuthHeaders.size() > 0) { - AuthRequiredException exception = new AuthRequiredException(null, error); - exception.proxyAuthChallenge = HttpAuth.sortAuthenticateHeaders(proxyAuthHeaders); + if(!credentialsIncluded) { throw exception; } } - if(error == null) { - error = ErrorInfo.fromResponseStatus(response.statusLine, response.statusCode); - } else { - } - Log.e(TAG, "Error response from server: err = " + error.toString()); - if(responseHandler != null) { - return responseHandler.handleResponse(response, error); + } + /* handle proxy-authenticate */ + if(response.statusCode == 407) { + List proxyAuthHeaders = response.getHeaderFields(HttpConstants.Headers.PROXY_AUTHENTICATE); + if(proxyAuthHeaders != null && proxyAuthHeaders.size() > 0) { + AuthRequiredException exception = new AuthRequiredException(null, error); + exception.proxyAuthChallenge = HttpAuth.sortAuthenticateHeaders(proxyAuthHeaders); + throw exception; } - throw AblyException.fromErrorInfo(error); } - + if(error == null) { + error = ErrorInfo.fromResponseStatus(response.statusLine, response.statusCode); + } else { + } + Log.e(TAG, "Error response from server: err = " + error.toString()); if(responseHandler != null) { - return responseHandler.handleResponse(response, null); + return responseHandler.handleResponse(response, error); } - - return null; + throw AblyException.fromErrorInfo(error); } /** diff --git a/lib/src/main/java/io/ably/lib/rest/AblyRest.java b/lib/src/main/java/io/ably/lib/rest/AblyRest.java index eb2542928..f855abee6 100644 --- a/lib/src/main/java/io/ably/lib/rest/AblyRest.java +++ b/lib/src/main/java/io/ably/lib/rest/AblyRest.java @@ -2,6 +2,7 @@ import java.util.HashMap; +import io.ably.annotation.Experimental; import io.ably.lib.http.AsyncHttpScheduler; import io.ably.lib.http.Http; import io.ably.lib.http.HttpCore; @@ -12,18 +13,8 @@ import io.ably.lib.http.HttpPaginatedQuery; import io.ably.lib.http.HttpUtils; import io.ably.lib.http.PaginatedQuery; -import io.ably.lib.types.AblyException; -import io.ably.lib.types.AsyncHttpPaginatedResponse; -import io.ably.lib.types.AsyncPaginatedResult; -import io.ably.lib.types.Callback; -import io.ably.lib.types.ChannelOptions; -import io.ably.lib.types.ClientOptions; -import io.ably.lib.types.ErrorInfo; -import io.ably.lib.types.HttpPaginatedResponse; -import io.ably.lib.types.PaginatedResult; -import io.ably.lib.types.Param; -import io.ably.lib.types.Stats; -import io.ably.lib.types.StatsReader; +import io.ably.lib.types.*; +import io.ably.lib.util.Crypto; import io.ably.lib.util.Log; import io.ably.lib.util.Serialisation; @@ -204,6 +195,60 @@ public void requestAsync(String method, String path, Param[] params, HttpCore.Re (new AsyncHttpPaginatedQuery(http, method, path, headers, params, body)).exec(callback); } + /** + * Publish a messages on one or more channels. When there are + * messages to be sent on multiple channels simultaneously, + * it is more efficient to use this method to publish them in + * a single request, as compared with publishing via multiple + * independent requests. + * @throws AblyException + */ + @Experimental + public PublishResponse[] publishBatch(Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { + return publishBatchImpl(pubSpecs, channelOptions).sync(); + } + + @Experimental + public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Callback callback) throws AblyException { + publishBatchImpl(pubSpecs, channelOptions).async(callback); + } + + private Http.Request publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException { + boolean hasClientSuppliedId = false; + for(Message.Batch spec : pubSpecs) { + for(Message message : spec.messages) { + /* handle message ids */ + /* RSL1k2 */ + hasClientSuppliedId |= (message.id != null); + /* RTL6g3 */ + auth.checkClientId(message, true, false); + message.encode(channelOptions); + } + if(!hasClientSuppliedId && options.idempotentRestPublishing) { + /* RSL1k1: populate the message id with a library-generated id */ + String messageId = Crypto.getRandomMessageId(); + for (int i = 0; i < spec.messages.length; i++) { + spec.messages[i].id = messageId + ':' + i; + } + } + } + return http.request(new Http.Execute() { + @Override + public void execute(HttpScheduler http, final Callback callback) throws AblyException { + HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs); + http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), null, requestBody, new HttpCore.ResponseHandler() { + @Override + public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException { + if(error != null && error.code != 40020) { + throw AblyException.fromErrorInfo(error); + } + return PublishResponse.getBulkPublishResponseHandler(response.statusCode).handleResponseBody(response.contentType, response.body); + } + }, true, callback); + } + }); + } + /** * Authentication token has changed. waitForResult is true if there is a need to * wait for server response to auth request diff --git a/lib/src/main/java/io/ably/lib/types/ErrorInfo.java b/lib/src/main/java/io/ably/lib/types/ErrorInfo.java index b8cae2c1b..b86226ae8 100644 --- a/lib/src/main/java/io/ably/lib/types/ErrorInfo.java +++ b/lib/src/main/java/io/ably/lib/types/ErrorInfo.java @@ -1,5 +1,6 @@ package io.ably.lib.types; +import io.ably.lib.util.Serialisation; import org.msgpack.core.MessageFormat; import org.msgpack.core.MessageUnpacker; @@ -108,6 +109,31 @@ ErrorInfo readMsgpack(MessageUnpacker unpacker) throws IOException { return this; } + public static ErrorInfo fromMsgpackBody(byte[] msgpack) throws IOException { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(msgpack); + return fromMsgpackBody(unpacker); + } + + private static ErrorInfo fromMsgpackBody(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + ErrorInfo error = null; + for(int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if(fieldFormat.equals(MessageFormat.NIL)) { unpacker.unpackNil(); continue; } + + switch(fieldName) { + case "error": + error = ErrorInfo.fromMsgpack(unpacker); + break; + default: + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + } + } + return error; + } + static ErrorInfo fromMsgpack(MessageUnpacker unpacker) throws IOException { return (new ErrorInfo()).readMsgpack(unpacker); } diff --git a/lib/src/main/java/io/ably/lib/types/Message.java b/lib/src/main/java/io/ably/lib/types/Message.java index e3c6bec3f..19bee8e5f 100644 --- a/lib/src/main/java/io/ably/lib/types/Message.java +++ b/lib/src/main/java/io/ably/lib/types/Message.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.lang.reflect.Type; +import java.util.Collection; import org.msgpack.core.MessageFormat; import org.msgpack.core.MessagePacker; @@ -92,6 +93,43 @@ Message readMsgpack(MessageUnpacker unpacker) throws IOException { return this; } + /** + * A specification for a collection of messages to be sent using the batch API + * @author paddy + * + */ + public static class Batch { + public String[] channels; + public Message[] messages; + + public Batch(String channel, Message[] messages) { + if(channel == null || channel.isEmpty()) throw new IllegalArgumentException("A Batch spec cannot have an empty set of channels"); + if(messages == null || messages.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of messages"); + this.channels = new String[] { channel }; + this.messages = messages; + } + + public Batch(String[] channels, Message[] messages) { + if(channels == null || channels.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of channels"); + if(messages == null || messages.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of messages"); + this.channels = channels; + this.messages = messages; + } + + public Batch(Collection channels, Collection messages) { + this(channels.toArray(new String[channels.size()]), messages.toArray(new Message[messages.size()])); + } + + public void writeMsgpack(MessagePacker packer) throws IOException { + packer.packMapHeader(2); + packer.packString("channels"); + packer.packArrayHeader(channels.length); + for(String ch : channels) packer.packString(ch); + packer.packString("messages"); + MessageSerializer.writeMsgpackArray(messages, packer); + } + } + static Message fromMsgpack(MessageUnpacker unpacker) throws IOException { return (new Message()).readMsgpack(unpacker); } diff --git a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java index 8c444fc74..dde4a13b2 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java +++ b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java @@ -70,6 +70,29 @@ public static void writeMsgpackArray(Message[] messages, MessagePacker packer) { } catch(IOException e) {} } + public static HttpCore.RequestBody asMsgpackRequest(Message.Batch[] pubSpecs) { + return new HttpUtils.ByteArrayRequestBody(writeMsgpackArray(pubSpecs), "application/x-msgpack"); + } + + static byte[] writeMsgpackArray(Message.Batch[] pubSpecs) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out); + writeMsgpackArray(pubSpecs, packer); + packer.flush(); + return out.toByteArray(); + } catch(IOException e) { return null; } + } + + static void writeMsgpackArray(Message.Batch[] pubSpecs, MessagePacker packer) throws IOException { + try { + int count = pubSpecs.length; + packer.packArrayHeader(count); + for(Message.Batch spec : pubSpecs) + spec.writeMsgpack(packer); + } catch(IOException e) {} + } + /**************************************** * JSON decode ****************************************/ @@ -90,6 +113,10 @@ public static HttpCore.RequestBody asJsonRequest(Message[] messages) { return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(messages)); } + public static HttpCore.RequestBody asJSONRequest(Message.Batch[] pubSpecs) { + return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(pubSpecs)); + } + /**************************************** * BodyHandler ****************************************/ @@ -129,6 +156,5 @@ else if("application/x-msgpack".equals(contentType)) } private static HttpCore.BodyHandler messageResponseHandler = new MessageBodyHandler(null); - private static final String TAG = MessageSerializer.class.getName(); } diff --git a/lib/src/main/java/io/ably/lib/types/PublishResponse.java b/lib/src/main/java/io/ably/lib/types/PublishResponse.java new file mode 100644 index 000000000..fe2866bc3 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/PublishResponse.java @@ -0,0 +1,153 @@ +package io.ably.lib.types; + +import com.google.gson.annotations.SerializedName; +import io.ably.lib.http.HttpCore; +import io.ably.lib.util.Log; +import io.ably.lib.util.Serialisation; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; + +/**************************************** + * PublishResponse + ****************************************/ + +public class PublishResponse { + public ErrorInfo error; + @SerializedName("channel") + public String channelId; + public String messageId; + + private static PublishResponse[] fromJSONArray(byte[] json) { + return Serialisation.gson.fromJson(new String(json), PublishResponse[].class); + } + + private static PublishResponse fromMsgpack(MessageUnpacker unpacker) throws IOException { + return (new PublishResponse()).readMsgpack(unpacker); + } + + private static PublishResponse[] fromMsgpackArray(byte[] msgpack) throws IOException { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(msgpack); + return fromMsgpackArray(unpacker); + } + + private static PublishResponse[] fromMsgpackArray(MessageUnpacker unpacker) throws IOException { + int count = unpacker.unpackArrayHeader(); + PublishResponse[] result = new PublishResponse[count]; + for(int j = 0; j < count; j++) { + result[j] = PublishResponse.fromMsgpack(unpacker); + } + return result; + } + + private PublishResponse readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for(int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if(fieldFormat.equals(MessageFormat.NIL)) { unpacker.unpackNil(); continue; } + + switch(fieldName) { + case "error": + error = ErrorInfo.fromMsgpack(unpacker); + break; + case "channel": + case "channelId": + channelId = unpacker.unpackString(); + break; + case "messageId": + messageId = unpacker.unpackString(); + break; + default: + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + } + } + return this; + } + + public static HttpCore.BodyHandler getBulkPublishResponseHandler(int statusCode) { + return (statusCode < 300) ? bulkResponseBodyHandler : batchErrorBodyHandler; + } + + private static class BatchErrorResponse { + public ErrorInfo error; + public PublishResponse[] batchResponse; + + static BatchErrorResponse readJSON(byte[] json) { + return Serialisation.gson.fromJson(new String(json), BatchErrorResponse.class); + } + + static BatchErrorResponse readMsgpack(byte[] msgpack) throws IOException { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(msgpack); + return (new BatchErrorResponse()).readMsgpack(unpacker); + } + + BatchErrorResponse readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for(int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if(fieldFormat.equals(MessageFormat.NIL)) { unpacker.unpackNil(); continue; } + + switch(fieldName) { + case "error": + error = ErrorInfo.fromMsgpack(unpacker); + break; + case "batchResponse": + batchResponse = PublishResponse.fromMsgpackArray(unpacker); + break; + default: + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + } + } + return this; + } + } + + private static class BulkResponseBodyHandler implements HttpCore.BodyHandler { + @Override + public PublishResponse[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + if("application/json".equals(contentType)) { + return PublishResponse.fromJSONArray(body); + } else if("application/x-msgpack".equals(contentType)) { + return PublishResponse.fromMsgpackArray(body); + } + return null; + } catch(IOException e) { + throw AblyException.fromThrowable(e); + } + } + } + + private static class BatchErrorBodyHandler implements HttpCore.BodyHandler { + @Override + public PublishResponse[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + BatchErrorResponse response = null; + if("application/json".equals(contentType)) { + response = BatchErrorResponse.readJSON(body); + } else if("application/x-msgpack".equals(contentType)) { + response = BatchErrorResponse.readMsgpack(body); + } + if(response == null) { + return null; + } + if(response.error != null && response.error.code != 40020) { + throw AblyException.fromErrorInfo(response.error); + } + return response.batchResponse; + } catch(IOException e) { + throw AblyException.fromThrowable(e); + } + } + } + + private static HttpCore.BodyHandler batchErrorBodyHandler = new BatchErrorBodyHandler(); + private static HttpCore.BodyHandler bulkResponseBodyHandler = new BulkResponseBodyHandler(); + + private static final String TAG = MessageSerializer.class.getName(); +} diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java new file mode 100644 index 000000000..680683f6f --- /dev/null +++ b/lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java @@ -0,0 +1,219 @@ +package io.ably.lib.test.rest; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Random; + +import io.ably.lib.test.common.ParameterizedTest; +import io.ably.lib.types.*; + +import io.ably.lib.rest.AblyRest; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class RestChannelBulkPublishTest extends ParameterizedTest { + + /** + * Publish a single message on multiple channels + * + * The payload constructed has the form + * [ + * { + * channel: [ , , ... ], + * message: [{ data: }] + * } + * ] + * + * It publishes the given message on all of the given channels. + */ + @Ignore // until idempotent publishing is enabled + @Test + public void bulk_publish_multiple_channels_simple() { + try { + /* setup library instance */ + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + AblyRest ably = new AblyRest(opts); + + /* first, publish some messages */ + int channelCount = 5; + ArrayList channelIds = new ArrayList(); + for(int i = 0; i < channelCount; i++) { + channelIds.add("persisted:" + randomString()); + } + + Message message = new Message(null, "bulk_publish_multiple_channels_simple"); + String messageId = message.id = randomString(); + Message.Batch payload = new Message.Batch(channelIds, Collections.singleton(message)); + + PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null); + for(PublishResponse response : result) { + assertEquals("Verify expected response id", response.messageId, messageId); + assertTrue("Verify expected channel name", channelIds.contains(response.channelId)); + assertNull("Verify no publish error", response.error); + } + + /* get the history for this channel */ + for(String channel : channelIds) { + PaginatedResult messages = ably.channels.get(channel).history(null); + assertNotNull("Expected non-null messages", messages); + assertEquals("Expected 1 message", messages.items().length, 1); + /* verify message contents */ + assertEquals("Expect message data to be expected String", messages.items()[0].data, message.data); + } + } catch (AblyException e) { + e.printStackTrace(); + fail("bulkpublish_multiple_channels_simple: Unexpected exception"); + return; + } + } + + /** + * Publish a multiple messages on multiple channels + * + * The payload constructed has the form + * [ + * { + * channel: [ ], + * message: [ + * { data: }, + * { data: }, + * { data: }, + * ... + * ] + * }, + * { + * channel: [ ], + * message: [ + * { data: }, + * { data: }, + * { data: }, + * ... + * ] + * }, + * ... + * ] + * + * It publishes the given messages on the associated channels. + */ + @Test + public void bulk_publish_multiple_channels_multiple_messages() { + try { + /* setup library instance */ + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + opts.idempotentRestPublishing = true; + AblyRest ably = new AblyRest(opts); + + /* first, publish some messages */ + int channelCount = 5; + int messageCount = 6; + String baseMessageText = "bulk_publish_multiple_channels_multiple_messages"; + ArrayList payload = new ArrayList(); + + ArrayList rndMessageTexts = new ArrayList(); + for(int i = 0; i < messageCount; i++) { + rndMessageTexts.add(randomString()); + } + + ArrayList channelIds = new ArrayList(); + for(int i = 0; i < channelCount; i++) { + String channel = "persisted:" + randomString(); + channelIds.add(channel); + ArrayList messages = new ArrayList(); + for(int j = 0; j < messageCount; j++) { + messages.add(new Message(null, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(j))); + } + payload.add(new Message.Batch(Collections.singleton(channel), messages)); + } + + PublishResponse[] result = ably.publishBatch(payload.toArray(new Message.Batch[payload.size()]), null); + for(PublishResponse response : result) { + assertNotNull("Verify expected response id", response.messageId); + assertTrue("Verify expected channel name", channelIds.contains(response.channelId)); + assertNull("Verify no publish error", response.error); + } + + /* get the history for this channel */ + for(String channel : channelIds) { + PaginatedResult messages = ably.channels.get(channel).history(new Param[] {new Param("direction", "forwards")}); + assertNotNull("Expected non-null messages", messages); + assertEquals("Expected correct number of messages", messages.items().length, messageCount); + /* verify message contents */ + for(int i = 0; i < messageCount; i++) { + assertEquals("Expect message data to be expected String", messages.items()[i].data, baseMessageText + '-' + channel + '-' + rndMessageTexts.get(i)); + } + } + } catch (AblyException e) { + e.printStackTrace(); + fail("bulk_publish_multiple_channels_multiple_messages: Unexpected exception"); + return; + } + } + + /** + * Publish a single message on multiple channels, using credentials + * that are only able to publish to a subset of the channels + * + * The payload constructed has the form + * [ + * { + * channel: [ , , ... ], + * message: [{ data: }] + * } + * ] + * + * It attempts to publishe the given message on all of the given channels. + */ + @Ignore // until idempotent publishing is enabled + @Test + public void bulk_publish_multiple_channels_partial_error() { + try { + /* setup library instance */ + ClientOptions opts = createOptions(testVars.keys[6].keyStr); + AblyRest ably = new AblyRest(opts); + + /* first, publish some messages */ + String baseChannelName = "persisted:" + testParams.name + ":channel"; + int channelCount = 5; + ArrayList channelIds = new ArrayList(); + for(int i = 0; i < channelCount; i++) { + channelIds.add(baseChannelName + i); + } + + Message message = new Message(null, "bulk_publish_multiple_channels_partial_error"); + String messageId = message.id = randomString(); + Message.Batch payload = new Message.Batch(channelIds, Collections.singleton(message)); + + PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null); + for(PublishResponse response : result) { + if((baseChannelName + "1").compareTo(response.channelId) >= 0) { + assertEquals("Verify expected response id", response.messageId, messageId); + assertTrue("Verify expected channel name", channelIds.contains(response.channelId)); + assertNull("Verify no publish error", response.error); + } else { + assertNotNull("Verify expected publish error", response.error); + assertEquals("Verify expected publish error code", response.error.code, 40160); + } + } + + /* get the history for this channel */ + for(String channel : channelIds) { + if((baseChannelName + "1").compareTo(channel) >= 0) { + PaginatedResult messages = ably.channels.get(channel).history(null); + assertNotNull("Expected non-null messages", messages); + assertEquals("Expected 1 message", messages.items().length, 1); + /* verify message contents */ + assertEquals("Expect message data to be expected String", messages.items()[0].data, message.data); + } + } + } catch (AblyException e) { + e.printStackTrace(); + fail("bulkpublish_multiple_channels_simple: Unexpected exception"); + return; + } + } + + private String randomString() { return String.valueOf(new Random().nextDouble()).substring(2); } +} diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java b/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java index fd22a2613..45d8ab2a6 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java +++ b/lib/src/test/java/io/ably/lib/test/rest/RestSuite.java @@ -27,6 +27,7 @@ RestChannelTest.class, RestChannelHistoryTest.class, RestChannelPublishTest.class, + RestChannelBulkPublishTest.class, RestCryptoTest.class, RestPresenceTest.class, RestProxyTest.class, diff --git a/lib/src/test/resources/local/testAppSpec.json b/lib/src/test/resources/local/testAppSpec.json index 2f15761ef..1281f69b9 100644 --- a/lib/src/test/resources/local/testAppSpec.json +++ b/lib/src/test/resources/local/testAppSpec.json @@ -16,6 +16,9 @@ { "privileged": true, "capability": "{\"channel0\":[\"publish\"],\"channel1\":[\"publish\"],\"channel2\":[\"publish\",\"subscribe\"],\"channel3\":[\"subscribe\"],\"channel4\":[\"presence\",\"publish\",\"subscribe\"],\"channel5\":[\"presence\"],\"channel6\":[\"*\"]}" + }, + { + "capability": "{\"persisted:text_protocol:channel0\":[\"publish\",\"subscribe\",\"history\"],\"persisted:text_protocol:channel1\":[\"publish\",\"subscribe\",\"history\"],\"persisted:binary_protocol:channel0\":[\"publish\",\"subscribe\",\"history\"],\"persisted:binary_protocol:channel1\":[\"publish\",\"subscribe\",\"history\"],\"persisted:*\":[\"subscribe\",\"history\"]}" } ], "namespaces": [ diff --git a/lib/src/test/resources/local/testAppSpec.json.src b/lib/src/test/resources/local/testAppSpec.json.src index c0f0f94c7..d2bab25a6 100644 --- a/lib/src/test/resources/local/testAppSpec.json.src +++ b/lib/src/test/resources/local/testAppSpec.json.src @@ -40,6 +40,15 @@ channel5:['presence'], channel6:['*'] }) + }, + { /* key 6, has permissions for selected persisted channels */ + capability: JSON.stringify({ + 'persisted:text_protocol:channel0':['publish', 'subscribe','history'], + 'persisted:text_protocol:channel1':['publish', 'subscribe','history'], + 'persisted:binary_protocol:channel0':['publish', 'subscribe','history'], + 'persisted:binary_protocol:channel1':['publish', 'subscribe','history'], + 'persisted:*':['subscribe','history'] + }) } ], namespaces: [