Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/annotation/Experimental.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.ably.annotation;

import java.lang.annotation.Documented;

/**
* An annotation indicating an experimental API. Any or all detail of this
* API are subject to change in the future. Feedback on the API is welcomed.
*/
@Documented
public @interface Experimental { }
104 changes: 55 additions & 49 deletions lib/src/main/java/io/ably/lib/http/HttpCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,22 @@ private <T> T handleResponse(HttpURLConnection conn, boolean credentialsIncluded
throw AblyException.fromErrorInfo(error);
}

if(response.statusCode < 200 || response.statusCode >= 300) {
/* get any in-body error details */
ErrorInfo error = null;
if(response.body != null && response.body.length > 0) {
if(response.statusCode >= 200 && response.statusCode < 300) {
return (responseHandler != null) ? responseHandler.handleResponse(response, null) : null;
}

/* get any in-body error details */
ErrorInfo error = null;
if(response.body != null && response.body.length > 0) {
if(response.contentType != null && response.contentType.contains("msgpack")) {
try {
error = ErrorInfo.fromMsgpackBody(response.body);
} catch (IOException e) {
/* error pages aren't necessarily going to satisfy our Accept criteria ... */
System.err.println("Unable to parse msgpack error response");
}
} else {
/* assume json */
String bodyText = new String(response.body);
try {
ErrorResponse errorResponse = ErrorResponse.fromJSON(bodyText);
Expand All @@ -288,62 +300,56 @@ private <T> T handleResponse(HttpURLConnection conn, boolean credentialsIncluded
System.err.println("Error message in unexpected format: " + bodyText);
}
}
}

/* handle error details in header instead of body */
if(error == null) {
String errorCodeHeader = conn.getHeaderField("X-Ably-ErrorCode");
String errorMessageHeader = conn.getHeaderField("X-Ably-ErrorMessage");
if(errorCodeHeader != null) {
try {
error = new ErrorInfo(errorMessageHeader, response.statusCode, Integer.parseInt(errorCodeHeader));
} catch(NumberFormatException e) {}
}
/* handle error details in header */
if(error == null) {
String errorCodeHeader = conn.getHeaderField("X-Ably-ErrorCode");
String errorMessageHeader = conn.getHeaderField("X-Ably-ErrorMessage");
if(errorCodeHeader != null) {
try {
error = new ErrorInfo(errorMessageHeader, response.statusCode, Integer.parseInt(errorCodeHeader));
} catch(NumberFormatException e) {}
}
}

/* handle www-authenticate */
if(response.statusCode == 401) {
boolean stale = (error != null && error.code == 40140);
List<String> wwwAuthHeaders = response.getHeaderFields(HttpConstants.Headers.WWW_AUTHENTICATE);
if(wwwAuthHeaders != null && wwwAuthHeaders.size() > 0) {
Map<HttpAuth.Type, String> headersByType = HttpAuth.sortAuthenticateHeaders(wwwAuthHeaders);
String tokenHeader = headersByType.get(HttpAuth.Type.X_ABLY_TOKEN);
if(tokenHeader != null) { stale |= (tokenHeader.indexOf("stale") > -1); }
AuthRequiredException exception = new AuthRequiredException(null, error);
exception.authChallenge = headersByType;
if(stale) {
exception.expired = true;
throw exception;
}
if(!credentialsIncluded) {
throw exception;
}
/* handle www-authenticate */
if(response.statusCode == 401) {
boolean stale = (error != null && error.code == 40140);
List<String> wwwAuthHeaders = response.getHeaderFields(HttpConstants.Headers.WWW_AUTHENTICATE);
if(wwwAuthHeaders != null && wwwAuthHeaders.size() > 0) {
Map<HttpAuth.Type, String> headersByType = HttpAuth.sortAuthenticateHeaders(wwwAuthHeaders);
String tokenHeader = headersByType.get(HttpAuth.Type.X_ABLY_TOKEN);
if(tokenHeader != null) { stale |= (tokenHeader.indexOf("stale") > -1); }
AuthRequiredException exception = new AuthRequiredException(null, error);
exception.authChallenge = headersByType;
if(stale) {
exception.expired = true;
throw exception;
}
}
/* handle proxy-authenticate */
if(response.statusCode == 407) {
List<String> proxyAuthHeaders = response.getHeaderFields(HttpConstants.Headers.PROXY_AUTHENTICATE);
if(proxyAuthHeaders != null && proxyAuthHeaders.size() > 0) {
AuthRequiredException exception = new AuthRequiredException(null, error);
exception.proxyAuthChallenge = HttpAuth.sortAuthenticateHeaders(proxyAuthHeaders);
if(!credentialsIncluded) {
throw exception;
}
}
if(error == null) {
error = ErrorInfo.fromResponseStatus(response.statusLine, response.statusCode);
} else {
}
Log.e(TAG, "Error response from server: err = " + error.toString());
if(responseHandler != null) {
return responseHandler.handleResponse(response, error);
}
/* handle proxy-authenticate */
if(response.statusCode == 407) {
List<String> proxyAuthHeaders = response.getHeaderFields(HttpConstants.Headers.PROXY_AUTHENTICATE);
if(proxyAuthHeaders != null && proxyAuthHeaders.size() > 0) {
AuthRequiredException exception = new AuthRequiredException(null, error);
exception.proxyAuthChallenge = HttpAuth.sortAuthenticateHeaders(proxyAuthHeaders);
throw exception;
}
throw AblyException.fromErrorInfo(error);
}

if(error == null) {
error = ErrorInfo.fromResponseStatus(response.statusLine, response.statusCode);
} else {
}
Log.e(TAG, "Error response from server: err = " + error.toString());
if(responseHandler != null) {
return responseHandler.handleResponse(response, null);
return responseHandler.handleResponse(response, error);
}

return null;
throw AblyException.fromErrorInfo(error);
}

/**
Expand Down
69 changes: 57 additions & 12 deletions lib/src/main/java/io/ably/lib/rest/AblyRest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.HashMap;

import io.ably.annotation.Experimental;
import io.ably.lib.http.AsyncHttpScheduler;
import io.ably.lib.http.Http;
import io.ably.lib.http.HttpCore;
Expand All @@ -12,18 +13,8 @@
import io.ably.lib.http.HttpPaginatedQuery;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.http.PaginatedQuery;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.AsyncHttpPaginatedResponse;
import io.ably.lib.types.AsyncPaginatedResult;
import io.ably.lib.types.Callback;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.HttpPaginatedResponse;
import io.ably.lib.types.PaginatedResult;
import io.ably.lib.types.Param;
import io.ably.lib.types.Stats;
import io.ably.lib.types.StatsReader;
import io.ably.lib.types.*;
import io.ably.lib.util.Crypto;
import io.ably.lib.util.Log;
import io.ably.lib.util.Serialisation;

Expand Down Expand Up @@ -204,6 +195,60 @@ public void requestAsync(String method, String path, Param[] params, HttpCore.Re
(new AsyncHttpPaginatedQuery(http, method, path, headers, params, body)).exec(callback);
}

/**
* Publish a messages on one or more channels. When there are
* messages to be sent on multiple channels simultaneously,
* it is more efficient to use this method to publish them in
* a single request, as compared with publishing via multiple
* independent requests.
* @throws AblyException
*/
@Experimental
public PublishResponse[] publishBatch(Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException {
return publishBatchImpl(pubSpecs, channelOptions).sync();
}

@Experimental
public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Callback<PublishResponse[]> callback) throws AblyException {
publishBatchImpl(pubSpecs, channelOptions).async(callback);
}

private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException {
boolean hasClientSuppliedId = false;
for(Message.Batch spec : pubSpecs) {
for(Message message : spec.messages) {
/* handle message ids */
/* RSL1k2 */
hasClientSuppliedId |= (message.id != null);
/* RTL6g3 */
auth.checkClientId(message, true, false);
message.encode(channelOptions);
}
if(!hasClientSuppliedId && options.idempotentRestPublishing) {
/* RSL1k1: populate the message id with a library-generated id */
String messageId = Crypto.getRandomMessageId();
for (int i = 0; i < spec.messages.length; i++) {
spec.messages[i].id = messageId + ':' + i;
}
}
}
return http.request(new Http.Execute<PublishResponse[]>() {
@Override
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), null, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
@Override
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
if(error != null && error.code != 40020) {
throw AblyException.fromErrorInfo(error);
}
return PublishResponse.getBulkPublishResponseHandler(response.statusCode).handleResponseBody(response.contentType, response.body);
}
}, true, callback);
}
});
}

/**
* Authentication token has changed. waitForResult is true if there is a need to
* wait for server response to auth request
Expand Down
26 changes: 26 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ErrorInfo.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.types;

import io.ably.lib.util.Serialisation;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessageUnpacker;

Expand Down Expand Up @@ -108,6 +109,31 @@ ErrorInfo readMsgpack(MessageUnpacker unpacker) throws IOException {
return this;
}

public static ErrorInfo fromMsgpackBody(byte[] msgpack) throws IOException {
MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(msgpack);
return fromMsgpackBody(unpacker);
}

private static ErrorInfo fromMsgpackBody(MessageUnpacker unpacker) throws IOException {
int fieldCount = unpacker.unpackMapHeader();
ErrorInfo error = null;
for(int i = 0; i < fieldCount; i++) {
String fieldName = unpacker.unpackString().intern();
MessageFormat fieldFormat = unpacker.getNextFormat();
if(fieldFormat.equals(MessageFormat.NIL)) { unpacker.unpackNil(); continue; }

switch(fieldName) {
case "error":
error = ErrorInfo.fromMsgpack(unpacker);
break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
}
}
return error;
}

static ErrorInfo fromMsgpack(MessageUnpacker unpacker) throws IOException {
return (new ErrorInfo()).readMsgpack(unpacker);
}
Expand Down
38 changes: 38 additions & 0 deletions lib/src/main/java/io/ably/lib/types/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collection;

import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePacker;
Expand Down Expand Up @@ -92,6 +93,43 @@ Message readMsgpack(MessageUnpacker unpacker) throws IOException {
return this;
}

/**
* A specification for a collection of messages to be sent using the batch API
* @author paddy
*
*/
public static class Batch {
public String[] channels;
public Message[] messages;

public Batch(String channel, Message[] messages) {
if(channel == null || channel.isEmpty()) throw new IllegalArgumentException("A Batch spec cannot have an empty set of channels");
if(messages == null || messages.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of messages");
this.channels = new String[] { channel };
this.messages = messages;
}

public Batch(String[] channels, Message[] messages) {
if(channels == null || channels.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of channels");
if(messages == null || messages.length == 0) throw new IllegalArgumentException("A Batch spec cannot have an empty set of messages");
this.channels = channels;
this.messages = messages;
}

public Batch(Collection<String> channels, Collection<Message> messages) {
this(channels.toArray(new String[channels.size()]), messages.toArray(new Message[messages.size()]));
}

public void writeMsgpack(MessagePacker packer) throws IOException {
packer.packMapHeader(2);
packer.packString("channels");
packer.packArrayHeader(channels.length);
for(String ch : channels) packer.packString(ch);
packer.packString("messages");
MessageSerializer.writeMsgpackArray(messages, packer);
}
}

static Message fromMsgpack(MessageUnpacker unpacker) throws IOException {
return (new Message()).readMsgpack(unpacker);
}
Expand Down
28 changes: 27 additions & 1 deletion lib/src/main/java/io/ably/lib/types/MessageSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,29 @@ public static void writeMsgpackArray(Message[] messages, MessagePacker packer) {
} catch(IOException e) {}
}

public static HttpCore.RequestBody asMsgpackRequest(Message.Batch[] pubSpecs) {
return new HttpUtils.ByteArrayRequestBody(writeMsgpackArray(pubSpecs), "application/x-msgpack");
}

static byte[] writeMsgpackArray(Message.Batch[] pubSpecs) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out);
writeMsgpackArray(pubSpecs, packer);
packer.flush();
return out.toByteArray();
} catch(IOException e) { return null; }
}

static void writeMsgpackArray(Message.Batch[] pubSpecs, MessagePacker packer) throws IOException {
try {
int count = pubSpecs.length;
packer.packArrayHeader(count);
for(Message.Batch spec : pubSpecs)
spec.writeMsgpack(packer);
} catch(IOException e) {}
}

/****************************************
* JSON decode
****************************************/
Expand All @@ -90,6 +113,10 @@ public static HttpCore.RequestBody asJsonRequest(Message[] messages) {
return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(messages));
}

public static HttpCore.RequestBody asJSONRequest(Message.Batch[] pubSpecs) {
return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(pubSpecs));
}

/****************************************
* BodyHandler
****************************************/
Expand Down Expand Up @@ -129,6 +156,5 @@ else if("application/x-msgpack".equals(contentType))
}

private static HttpCore.BodyHandler<Message> messageResponseHandler = new MessageBodyHandler(null);

private static final String TAG = MessageSerializer.class.getName();
}
Loading