Rework SmackFuture and add async API based on it

This commit is contained in:
Florian Schmaus 2017-08-12 17:35:20 +02:00
parent 0602ae064a
commit f4391c07d7
8 changed files with 349 additions and 66 deletions

View File

@ -49,6 +49,7 @@ import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
import org.jivesoftware.smack.SmackException.SecurityRequiredException;
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
import org.jivesoftware.smack.XMPPException.StreamErrorException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.compress.packet.Compress;
@ -1489,6 +1490,87 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
getReplyTimeout());
}
@Override
public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request) {
return sendIqRequestAsync(request, getReplyTimeout());
}
@Override
public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request, long timeout) {
StanzaFilter replyFilter = new IQReplyFilter(request, this);
return sendAsync(request, replyFilter, timeout);
}
@Override
public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter) {
return sendAsync(stanza, replyFilter, getReplyTimeout());
}
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter, long timeout) {
Objects.requireNonNull(stanza, "stanza must not be null");
// While Smack allows to add PacketListeners with a PacketFilter value of 'null', we
// disallow it here in the async API as it makes no sense
Objects.requireNonNull(replyFilter, "replyFilter must not be null");
final InternalSmackFuture<S, Exception> future = new InternalSmackFuture<>();
final StanzaListener stanzaListener = new StanzaListener() {
@Override
public void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
boolean removed = removeAsyncStanzaListener(this);
if (!removed) {
// We lost a race against the "no response" handling runnable. Avoid calling the callback, as the
// exception callback will be invoked (if any).
return;
}
try {
XMPPErrorException.ifHasErrorThenThrow(stanza);
@SuppressWarnings("unchecked")
S s = (S) stanza;
future.setResult(s);
}
catch (XMPPErrorException exception) {
future.setException(exception);
}
}
};
removeCallbacksService.schedule(new Runnable() {
@Override
public void run() {
boolean removed = removeAsyncStanzaListener(stanzaListener);
if (!removed) {
// We lost a race against the stanza listener, he already removed itself because he received a
// reply. There is nothing more to do here.
return;
}
// If the packetListener got removed, then it was never run and
// we never received a response, inform the exception callback
Exception exception;
if (!isConnected()) {
// If the connection is no longer connected, throw a not connected exception.
exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter);
}
else {
exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter);
}
future.setException(exception);
}
}, timeout, TimeUnit.MILLISECONDS);
addAsyncStanzaListener(stanzaListener, replyFilter);
try {
sendStanza(stanza);
}
catch (NotConnectedException | InterruptedException exception) {
future.setException(exception);
}
return future;
}
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void sendStanzaWithResponseCallback(Stanza stanza, final StanzaFilter replyFilter,

View File

@ -16,25 +16,34 @@
*/
package org.jivesoftware.smack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.util.CallbackRecipient;
import org.jivesoftware.smack.util.ExceptionCallback;
import org.jivesoftware.smack.util.SuccessCallback;
public abstract class SmackFuture<V> implements Future<V> {
public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {
private boolean cancelled;
private V result;
protected V result;
protected Exception exception;
protected E exception;
private SuccessCallback<V> successCallback;
private ExceptionCallback exceptionCallback;
private ExceptionCallback<E> exceptionCallback;
@Override
public synchronized final boolean cancel(boolean mayInterruptIfRunning) {
@ -43,6 +52,11 @@ public abstract class SmackFuture<V> implements Future<V> {
}
cancelled = true;
if (mayInterruptIfRunning) {
notifyAll();
}
return true;
}
@ -56,37 +70,62 @@ public abstract class SmackFuture<V> implements Future<V> {
return result != null;
}
public void onSuccessOrError(SuccessCallback<V> successCallback, ExceptionCallback exceptionCallback) {
@Override
public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) {
this.successCallback = successCallback;
this.exceptionCallback = exceptionCallback;
maybeInvokeCallbacks();
return this;
}
public void onSuccess(SuccessCallback<V> successCallback) {
onSuccessOrError(successCallback, null);
@Override
public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) {
this.exceptionCallback = exceptionCallback;
maybeInvokeCallbacks();
return this;
}
public void onError(ExceptionCallback exceptionCallback) {
onSuccessOrError(null, exceptionCallback);
}
private final V getResultOrThrow() throws ExecutionException {
assert (result != null || exception != null);
private final V getOrThrowExecutionException() throws ExecutionException {
assert (result != null || exception != null || cancelled);
if (result != null) {
return result;
}
if (exception != null) {
throw new ExecutionException(exception);
}
throw new ExecutionException(exception);
assert (cancelled);
throw new CancellationException();
}
@Override
public synchronized final V get() throws InterruptedException, ExecutionException {
while (result == null && exception == null) {
while (result == null && exception == null && !cancelled) {
wait();
}
return getResultOrThrow();
return getOrThrowExecutionException();
}
public synchronized final V getOrThrow() throws E {
while (result == null && exception == null && !cancelled) {
try {
wait();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (exception != null) {
throw exception;
}
if (cancelled) {
throw new CancellationException();
}
assert result != null;
return result;
}
@Override
@ -100,45 +139,101 @@ public abstract class SmackFuture<V> implements Future<V> {
}
}
if (cancelled) {
throw new CancellationException();
}
if (result == null || exception == null) {
throw new TimeoutException();
}
return getResultOrThrow();
return getOrThrowExecutionException();
}
private static final ExecutorService EXECUTOR_SERVICE;
static {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("SmackFuture Thread");
return thread;
}
};
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(128);
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
}
};
int cores = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = cores <= 4 ? 2 : cores;
ExecutorService executorService = new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
blockingQueue, threadFactory, rejectedExecutionHandler);
EXECUTOR_SERVICE = executorService;
}
@SuppressWarnings("FutureReturnValueIgnored")
protected final synchronized void maybeInvokeCallbacks() {
if (cancelled) {
return;
}
if (result != null && successCallback != null) {
successCallback.onSuccess(result);
} else if (exception != null && exceptionCallback != null) {
exceptionCallback.processException(exception);
EXECUTOR_SERVICE.submit(new Runnable() {
@Override
public void run() {
successCallback.onSuccess(result);
}
});
}
else if (exception != null && exceptionCallback != null) {
EXECUTOR_SERVICE.submit(new Runnable() {
@Override
public void run() {
exceptionCallback.processException(exception);
}
});
}
}
/**
* This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>, then
* the future will automatically set the given exception as failure reason and notify potential waiting threads.
*
* @param exception the exception to check.
* @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise.
*/
protected abstract boolean isNonFatalException(Exception exception);
public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> {
public final synchronized void setResult(V result) {
this.result = result;
this.notifyAll();
protected abstract void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException;
maybeInvokeCallbacks();
}
protected final void setResult(V result) {
assert (Thread.holdsLock(this));
public final synchronized void setException(E exception) {
this.exception = exception;
this.notifyAll();
this.result = result;
this.notifyAll();
maybeInvokeCallbacks();
maybeInvokeCallbacks();
}
}
public static abstract class InternalSmackFuture<V> extends SmackFuture<V> implements StanzaListener, ExceptionCallback {
public static abstract class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E>
implements StanzaListener, ExceptionCallback<E> {
/**
* This method checks if the given exception is <b>not</b> fatal. If this method returns <code>false</code>,
* then the future will automatically set the given exception as failure reason and notify potential waiting
* threads.
*
* @param exception the exception to check.
* @return <code>true</code> if the exception is not fatal, <code>false</code> otherwise.
*/
protected abstract boolean isNonFatalException(E exception);
protected abstract void handleStanza(Stanza stanza);
@Override
public synchronized final void processException(Exception exception) {
public synchronized final void processException(E exception) {
if (!isNonFatalException(exception)) {
this.exception = exception;
this.notifyAll();
@ -151,20 +246,29 @@ public abstract class SmackFuture<V> implements Future<V> {
* Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is <code>synchronized</code>.
*/
@Override
public synchronized final void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
public synchronized final void processStanza(Stanza stanza) {
handleStanza(stanza);
}
}
/**
* A simple version of InternalSmackFuture which implements {@link #isNonFatalException(Exception)} as always returning <code>false</code> method.
* A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning
* <code>false</code> method.
*
* @param <V>
*/
public static abstract class SimpleInternalSmackFuture<V> extends InternalSmackFuture<V> {
public static abstract class SimpleInternalProcessStanzaSmackFuture<V, E extends Exception>
extends InternalProcessStanzaSmackFuture<V, E> {
@Override
protected boolean isNonFatalException(Exception exception) {
protected boolean isNonFatalException(E exception) {
return false;
}
}
public static <V, E extends Exception> SmackFuture<V, E> from(V result) {
InternalSmackFuture<V, E> future = new InternalSmackFuture<>();
future.setResult(result);
return future;
}
}

View File

@ -428,6 +428,43 @@ public interface XMPPConnection {
*/
public boolean hasFeature(String element, String namespace);
/**
* Send an IQ request asynchronously. The connection's default reply timeout will be used.
*
* @param request the IQ request to send.
* @return a SmackFuture for the response.
*/
public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request);
/**
* Send an IQ request asynchronously.
*
* @param request the IQ request to send.
* @param timeout the reply timeout in milliseconds.
* @return a SmackFuture for the response.
*/
public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request, long timeout);
/**
* Send a stanza asynchronously, waiting for exactly one response stanza using the given reply filter. The connection's default reply timeout will be used.
*
* @param stanza the stanza to send.
* @param replyFilter the filter used for the response stanza.
* @return a SmackFuture for the response.
*/
public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, StanzaFilter replyFilter);
/**
* Send a stanza asynchronously, waiting for exactly one response stanza using the given reply filter.
*
* @param stanza the stanza to send.
* @param replyFilter the filter used for the response stanza.
* @param timeout the reply timeout in milliseconds.
* @return a SmackFuture for the response.
*/
public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, StanzaFilter replyFilter, long timeout);
/**
* Send a stanza and wait asynchronously for a response by using <code>replyFilter</code>.
* <p>
@ -442,6 +479,7 @@ public interface XMPPConnection {
* @throws NotConnectedException
* @throws InterruptedException
*/
// TODO: Mark deprecated in favor of the new SmackFuture based async API.
public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter,
StanzaListener callback) throws NotConnectedException, InterruptedException;
@ -460,6 +498,7 @@ public interface XMPPConnection {
* @throws NotConnectedException
* @throws InterruptedException
*/
// TODO: Mark deprecated in favor of the new SmackFuture based async API. And do not forget to mark smack.ExceptionCallback deprecated too.
public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, StanzaListener callback,
ExceptionCallback exceptionCallback) throws NotConnectedException, InterruptedException;
@ -479,6 +518,7 @@ public interface XMPPConnection {
* @throws NotConnectedException
* @throws InterruptedException
*/
// TODO: Mark deprecated in favor of the new SmackFuture based async API.
public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter,
final StanzaListener callback, final ExceptionCallback exceptionCallback,
long timeout) throws NotConnectedException, InterruptedException;
@ -493,6 +533,7 @@ public interface XMPPConnection {
* @throws NotConnectedException
* @throws InterruptedException
*/
// TODO: Mark deprecated in favor of the new SmackFuture based async API.
public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback) throws NotConnectedException, InterruptedException;
/**
@ -509,6 +550,7 @@ public interface XMPPConnection {
* @throws NotConnectedException
* @throws InterruptedException
*/
// TODO: Mark deprecated in favor of the new SmackFuture based async API.
public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback,
ExceptionCallback exceptionCallback) throws NotConnectedException, InterruptedException;
@ -527,6 +569,7 @@ public interface XMPPConnection {
* @throws NotConnectedException
* @throws InterruptedException
*/
// TODO: Mark deprecated in favor of the new SmackFuture based async API.
public void sendIqWithResponseCallback(IQ iqRequest, final StanzaListener callback,
final ExceptionCallback exceptionCallback, long timeout)
throws NotConnectedException, InterruptedException;

View File

@ -0,0 +1,25 @@
/**
*
* Copyright 2017 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.util;
public interface CallbackRecipient<V,E> {
CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback);
CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback);
}

View File

@ -0,0 +1,23 @@
/**
*
* Copyright 2017 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.util;
public interface ExceptionCallback<E> {
public void processException(E exception);
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
package org.jivesoftware.smack.util;
public interface SuccessCallback<T> {

View File

@ -22,9 +22,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
import org.jivesoftware.smack.SmackFuture.SimpleInternalSmackFuture;
import org.jivesoftware.smack.SmackFuture.InternalProcessStanzaSmackFuture;
import org.jivesoftware.smack.SmackFuture.SimpleInternalProcessStanzaSmackFuture;
import org.jivesoftware.smack.packet.Stanza;
import org.junit.Test;
@ -32,10 +31,10 @@ import org.junit.Test;
public class SmackFutureTest {
@Test
public void simpleSmackFutureSuccessTest() throws NotConnectedException, InterruptedException, ExecutionException {
InternalSmackFuture<Boolean> future = new SimpleInternalSmackFuture<Boolean>() {
public void simpleSmackFutureSuccessTest() throws InterruptedException, ExecutionException {
InternalProcessStanzaSmackFuture<Boolean, Exception> future = new SimpleInternalProcessStanzaSmackFuture<Boolean, Exception>() {
@Override
protected void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
protected void handleStanza(Stanza stanza) {
setResult(true);
}
};
@ -47,9 +46,9 @@ public class SmackFutureTest {
@Test(expected = TimeoutException.class)
public void simpleSmackFutureTimeoutTest() throws InterruptedException, ExecutionException, TimeoutException {
InternalSmackFuture<Boolean> future = new SimpleInternalSmackFuture<Boolean>() {
InternalProcessStanzaSmackFuture<Boolean, Exception> future = new SimpleInternalProcessStanzaSmackFuture<Boolean, Exception>() {
@Override
protected void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
protected void handleStanza(Stanza stanza) {
}
};

View File

@ -33,9 +33,8 @@ import org.jivesoftware.smack.Manager;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.NotLoggedInException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
import org.jivesoftware.smack.SmackFuture.InternalProcessStanzaSmackFuture;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPConnectionRegistry;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
@ -45,7 +44,9 @@ import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.IQ.Type;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.util.ExceptionCallback;
import org.jivesoftware.smack.util.SmackExecutorThreadFactory;
import org.jivesoftware.smack.util.SuccessCallback;
import org.jivesoftware.smackx.disco.ServiceDiscoveryManager;
import org.jivesoftware.smackx.ping.packet.Ping;
@ -178,14 +179,14 @@ public final class PingManager extends Manager {
return type == XMPPError.Type.CANCEL && condition == XMPPError.Condition.feature_not_implemented;
}
public SmackFuture<Boolean> pingAsync(Jid jid) {
public SmackFuture<Boolean, Exception> pingAsync(Jid jid) {
return pingAsync(jid, connection().getReplyTimeout());
}
public SmackFuture<Boolean> pingAsync(final Jid jid, long pongTimeout) {
final InternalSmackFuture<Boolean> future = new InternalSmackFuture<Boolean>() {
public SmackFuture<Boolean, Exception> pingAsync(final Jid jid, long pongTimeout) {
final InternalProcessStanzaSmackFuture<Boolean, Exception> future = new InternalProcessStanzaSmackFuture<Boolean, Exception>() {
@Override
public void handleStanza(Stanza packet) throws NotConnectedException, InterruptedException {
public void handleStanza(Stanza packet) {
setResult(true);
}
@Override
@ -202,13 +203,19 @@ public final class PingManager extends Manager {
};
Ping ping = new Ping(jid);
try {
XMPPConnection connection = getAuthenticatedConnectionOrThrow();
connection.sendIqWithResponseCallback(ping, future, future, pongTimeout);
}
catch (NotLoggedInException | NotConnectedException | InterruptedException e) {
future.processException(e);
}
connection().sendIqRequestAsync(ping, pongTimeout)
.onSuccess(new SuccessCallback<IQ>() {
@Override
public void onSuccess(IQ result) {
future.processStanza(result);
}
})
.onError(new ExceptionCallback<Exception>() {
@Override
public void processException(Exception exception) {
future.processException(exception);
}
});
return future;
}