diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 4b6b73e5c..484e07873 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -49,6 +49,7 @@ import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredException; +import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; import org.jivesoftware.smack.XMPPException.StreamErrorException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; import org.jivesoftware.smack.compress.packet.Compress; @@ -1489,6 +1490,87 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { getReplyTimeout()); } + @Override + public SmackFuture sendIqRequestAsync(IQ request) { + return sendIqRequestAsync(request, getReplyTimeout()); + } + + @Override + public SmackFuture sendIqRequestAsync(IQ request, long timeout) { + StanzaFilter replyFilter = new IQReplyFilter(request, this); + return sendAsync(request, replyFilter, timeout); + } + + @Override + public SmackFuture sendAsync(S stanza, final StanzaFilter replyFilter) { + return sendAsync(stanza, replyFilter, getReplyTimeout()); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public SmackFuture sendAsync(S stanza, final StanzaFilter replyFilter, long timeout) { + Objects.requireNonNull(stanza, "stanza must not be null"); + // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we + // disallow it here in the async API as it makes no sense + Objects.requireNonNull(replyFilter, "replyFilter must not be null"); + + final InternalSmackFuture future = new InternalSmackFuture<>(); + + final StanzaListener stanzaListener = new StanzaListener() { + @Override + public void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + boolean removed = removeAsyncStanzaListener(this); + if (!removed) { + // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the + // exception callback will be invoked (if any). + return; + } + try { + XMPPErrorException.ifHasErrorThenThrow(stanza); + @SuppressWarnings("unchecked") + S s = (S) stanza; + future.setResult(s); + } + catch (XMPPErrorException exception) { + future.setException(exception); + } + } + }; + removeCallbacksService.schedule(new Runnable() { + @Override + public void run() { + boolean removed = removeAsyncStanzaListener(stanzaListener); + if (!removed) { + // We lost a race against the stanza listener, he already removed itself because he received a + // reply. There is nothing more to do here. + return; + } + + // If the packetListener got removed, then it was never run and + // we never received a response, inform the exception callback + Exception exception; + if (!isConnected()) { + // If the connection is no longer connected, throw a not connected exception. + exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter); + } + else { + exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter); + } + future.setException(exception); + } + }, timeout, TimeUnit.MILLISECONDS); + + addAsyncStanzaListener(stanzaListener, replyFilter); + try { + sendStanza(stanza); + } + catch (NotConnectedException | InterruptedException exception) { + future.setException(exception); + } + + return future; + } + @SuppressWarnings("FutureReturnValueIgnored") @Override public void sendStanzaWithResponseCallback(Stanza stanza, final StanzaFilter replyFilter, diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java index 36dbda2cb..076a16d4c 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java @@ -16,25 +16,34 @@ */ package org.jivesoftware.smack; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.packet.Stanza; +import org.jivesoftware.smack.util.CallbackRecipient; +import org.jivesoftware.smack.util.ExceptionCallback; +import org.jivesoftware.smack.util.SuccessCallback; -public abstract class SmackFuture implements Future { +public abstract class SmackFuture implements Future, CallbackRecipient { private boolean cancelled; - private V result; + protected V result; - protected Exception exception; + protected E exception; private SuccessCallback successCallback; - private ExceptionCallback exceptionCallback; + private ExceptionCallback exceptionCallback; @Override public synchronized final boolean cancel(boolean mayInterruptIfRunning) { @@ -43,6 +52,11 @@ public abstract class SmackFuture implements Future { } cancelled = true; + + if (mayInterruptIfRunning) { + notifyAll(); + } + return true; } @@ -56,37 +70,62 @@ public abstract class SmackFuture implements Future { return result != null; } - public void onSuccessOrError(SuccessCallback successCallback, ExceptionCallback exceptionCallback) { + @Override + public CallbackRecipient onSuccess(SuccessCallback successCallback) { this.successCallback = successCallback; - this.exceptionCallback = exceptionCallback; - maybeInvokeCallbacks(); + return this; } - public void onSuccess(SuccessCallback successCallback) { - onSuccessOrError(successCallback, null); + @Override + public CallbackRecipient onError(ExceptionCallback exceptionCallback) { + this.exceptionCallback = exceptionCallback; + maybeInvokeCallbacks(); + return this; } - public void onError(ExceptionCallback exceptionCallback) { - onSuccessOrError(null, exceptionCallback); - } - - private final V getResultOrThrow() throws ExecutionException { - assert (result != null || exception != null); + private final V getOrThrowExecutionException() throws ExecutionException { + assert (result != null || exception != null || cancelled); if (result != null) { return result; } + if (exception != null) { + throw new ExecutionException(exception); + } - throw new ExecutionException(exception); + assert (cancelled); + throw new CancellationException(); } @Override public synchronized final V get() throws InterruptedException, ExecutionException { - while (result == null && exception == null) { + while (result == null && exception == null && !cancelled) { wait(); } - return getResultOrThrow(); + return getOrThrowExecutionException(); + } + + public synchronized final V getOrThrow() throws E { + while (result == null && exception == null && !cancelled) { + try { + wait(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + if (exception != null) { + throw exception; + } + + if (cancelled) { + throw new CancellationException(); + } + + assert result != null; + return result; } @Override @@ -100,45 +139,101 @@ public abstract class SmackFuture implements Future { } } + if (cancelled) { + throw new CancellationException(); + } + if (result == null || exception == null) { throw new TimeoutException(); } - return getResultOrThrow(); + return getOrThrowExecutionException(); } + private static final ExecutorService EXECUTOR_SERVICE; + + static { + ThreadFactory threadFactory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("SmackFuture Thread"); + return thread; + } + }; + BlockingQueue blockingQueue = new ArrayBlockingQueue<>(128); + RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + r.run(); + } + }; + int cores = Runtime.getRuntime().availableProcessors(); + int maximumPoolSize = cores <= 4 ? 2 : cores; + ExecutorService executorService = new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS, + blockingQueue, threadFactory, rejectedExecutionHandler); + + EXECUTOR_SERVICE = executorService; + } + + @SuppressWarnings("FutureReturnValueIgnored") protected final synchronized void maybeInvokeCallbacks() { + if (cancelled) { + return; + } + if (result != null && successCallback != null) { - successCallback.onSuccess(result); - } else if (exception != null && exceptionCallback != null) { - exceptionCallback.processException(exception); + EXECUTOR_SERVICE.submit(new Runnable() { + @Override + public void run() { + successCallback.onSuccess(result); + } + }); + } + else if (exception != null && exceptionCallback != null) { + EXECUTOR_SERVICE.submit(new Runnable() { + @Override + public void run() { + exceptionCallback.processException(exception); + } + }); } } - /** - * This method checks if the given exception is not fatal. If this method returns false, then - * the future will automatically set the given exception as failure reason and notify potential waiting threads. - * - * @param exception the exception to check. - * @return true if the exception is not fatal, false otherwise. - */ - protected abstract boolean isNonFatalException(Exception exception); + public static class InternalSmackFuture extends SmackFuture { + public final synchronized void setResult(V result) { + this.result = result; + this.notifyAll(); - protected abstract void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException; + maybeInvokeCallbacks(); + } - protected final void setResult(V result) { - assert (Thread.holdsLock(this)); + public final synchronized void setException(E exception) { + this.exception = exception; + this.notifyAll(); - this.result = result; - this.notifyAll(); - - maybeInvokeCallbacks(); + maybeInvokeCallbacks(); + } } - public static abstract class InternalSmackFuture extends SmackFuture implements StanzaListener, ExceptionCallback { + public static abstract class InternalProcessStanzaSmackFuture extends InternalSmackFuture + implements StanzaListener, ExceptionCallback { + + /** + * This method checks if the given exception is not fatal. If this method returns false, + * then the future will automatically set the given exception as failure reason and notify potential waiting + * threads. + * + * @param exception the exception to check. + * @return true if the exception is not fatal, false otherwise. + */ + protected abstract boolean isNonFatalException(E exception); + + protected abstract void handleStanza(Stanza stanza); @Override - public synchronized final void processException(Exception exception) { + public synchronized final void processException(E exception) { if (!isNonFatalException(exception)) { this.exception = exception; this.notifyAll(); @@ -151,20 +246,29 @@ public abstract class SmackFuture implements Future { * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is synchronized. */ @Override - public synchronized final void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + public synchronized final void processStanza(Stanza stanza) { handleStanza(stanza); } } /** - * A simple version of InternalSmackFuture which implements {@link #isNonFatalException(Exception)} as always returning false method. + * A simple version of InternalSmackFuture which implements isNonFatalException(E) as always returning + * false method. * * @param */ - public static abstract class SimpleInternalSmackFuture extends InternalSmackFuture { + public static abstract class SimpleInternalProcessStanzaSmackFuture + extends InternalProcessStanzaSmackFuture { @Override - protected boolean isNonFatalException(Exception exception) { + protected boolean isNonFatalException(E exception) { return false; } } + + public static SmackFuture from(V result) { + InternalSmackFuture future = new InternalSmackFuture<>(); + future.setResult(result); + return future; + } + } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java index 10894237f..ce4c2f138 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java @@ -428,6 +428,43 @@ public interface XMPPConnection { */ public boolean hasFeature(String element, String namespace); + + /** + * Send an IQ request asynchronously. The connection's default reply timeout will be used. + * + * @param request the IQ request to send. + * @return a SmackFuture for the response. + */ + public SmackFuture sendIqRequestAsync(IQ request); + + /** + * Send an IQ request asynchronously. + * + * @param request the IQ request to send. + * @param timeout the reply timeout in milliseconds. + * @return a SmackFuture for the response. + */ + public SmackFuture sendIqRequestAsync(IQ request, long timeout); + + /** + * Send a stanza asynchronously, waiting for exactly one response stanza using the given reply filter. The connection's default reply timeout will be used. + * + * @param stanza the stanza to send. + * @param replyFilter the filter used for the response stanza. + * @return a SmackFuture for the response. + */ + public SmackFuture sendAsync(S stanza, StanzaFilter replyFilter); + + /** + * Send a stanza asynchronously, waiting for exactly one response stanza using the given reply filter. + * + * @param stanza the stanza to send. + * @param replyFilter the filter used for the response stanza. + * @param timeout the reply timeout in milliseconds. + * @return a SmackFuture for the response. + */ + public SmackFuture sendAsync(S stanza, StanzaFilter replyFilter, long timeout); + /** * Send a stanza and wait asynchronously for a response by using replyFilter. *

@@ -442,6 +479,7 @@ public interface XMPPConnection { * @throws NotConnectedException * @throws InterruptedException */ + // TODO: Mark deprecated in favor of the new SmackFuture based async API. public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, StanzaListener callback) throws NotConnectedException, InterruptedException; @@ -460,6 +498,7 @@ public interface XMPPConnection { * @throws NotConnectedException * @throws InterruptedException */ + // TODO: Mark deprecated in favor of the new SmackFuture based async API. And do not forget to mark smack.ExceptionCallback deprecated too. public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, StanzaListener callback, ExceptionCallback exceptionCallback) throws NotConnectedException, InterruptedException; @@ -479,6 +518,7 @@ public interface XMPPConnection { * @throws NotConnectedException * @throws InterruptedException */ + // TODO: Mark deprecated in favor of the new SmackFuture based async API. public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, final StanzaListener callback, final ExceptionCallback exceptionCallback, long timeout) throws NotConnectedException, InterruptedException; @@ -493,6 +533,7 @@ public interface XMPPConnection { * @throws NotConnectedException * @throws InterruptedException */ + // TODO: Mark deprecated in favor of the new SmackFuture based async API. public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback) throws NotConnectedException, InterruptedException; /** @@ -509,6 +550,7 @@ public interface XMPPConnection { * @throws NotConnectedException * @throws InterruptedException */ + // TODO: Mark deprecated in favor of the new SmackFuture based async API. public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback, ExceptionCallback exceptionCallback) throws NotConnectedException, InterruptedException; @@ -527,6 +569,7 @@ public interface XMPPConnection { * @throws NotConnectedException * @throws InterruptedException */ + // TODO: Mark deprecated in favor of the new SmackFuture based async API. public void sendIqWithResponseCallback(IQ iqRequest, final StanzaListener callback, final ExceptionCallback exceptionCallback, long timeout) throws NotConnectedException, InterruptedException; diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/CallbackRecipient.java b/smack-core/src/main/java/org/jivesoftware/smack/util/CallbackRecipient.java new file mode 100644 index 000000000..18d631361 --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/CallbackRecipient.java @@ -0,0 +1,25 @@ +/** + * + * Copyright 2017 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack.util; + +public interface CallbackRecipient { + + CallbackRecipient onSuccess(SuccessCallback successCallback); + + CallbackRecipient onError(ExceptionCallback exceptionCallback); + +} diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/ExceptionCallback.java b/smack-core/src/main/java/org/jivesoftware/smack/util/ExceptionCallback.java new file mode 100644 index 000000000..52ffb4454 --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/ExceptionCallback.java @@ -0,0 +1,23 @@ +/** + * + * Copyright 2017 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack.util; + +public interface ExceptionCallback { + + public void processException(E exception); + +} diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SuccessCallback.java b/smack-core/src/main/java/org/jivesoftware/smack/util/SuccessCallback.java similarity index 94% rename from smack-core/src/main/java/org/jivesoftware/smack/SuccessCallback.java rename to smack-core/src/main/java/org/jivesoftware/smack/util/SuccessCallback.java index 49106a9f1..ba4d24162 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SuccessCallback.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/SuccessCallback.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.jivesoftware.smack; +package org.jivesoftware.smack.util; public interface SuccessCallback { diff --git a/smack-core/src/test/java/org/jivesoftware/smack/SmackFutureTest.java b/smack-core/src/test/java/org/jivesoftware/smack/SmackFutureTest.java index 90bb7cb20..a8f4a8442 100644 --- a/smack-core/src/test/java/org/jivesoftware/smack/SmackFutureTest.java +++ b/smack-core/src/test/java/org/jivesoftware/smack/SmackFutureTest.java @@ -22,9 +22,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.jivesoftware.smack.SmackException.NotConnectedException; -import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; -import org.jivesoftware.smack.SmackFuture.SimpleInternalSmackFuture; +import org.jivesoftware.smack.SmackFuture.InternalProcessStanzaSmackFuture; +import org.jivesoftware.smack.SmackFuture.SimpleInternalProcessStanzaSmackFuture; import org.jivesoftware.smack.packet.Stanza; import org.junit.Test; @@ -32,10 +31,10 @@ import org.junit.Test; public class SmackFutureTest { @Test - public void simpleSmackFutureSuccessTest() throws NotConnectedException, InterruptedException, ExecutionException { - InternalSmackFuture future = new SimpleInternalSmackFuture() { + public void simpleSmackFutureSuccessTest() throws InterruptedException, ExecutionException { + InternalProcessStanzaSmackFuture future = new SimpleInternalProcessStanzaSmackFuture() { @Override - protected void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + protected void handleStanza(Stanza stanza) { setResult(true); } }; @@ -47,9 +46,9 @@ public class SmackFutureTest { @Test(expected = TimeoutException.class) public void simpleSmackFutureTimeoutTest() throws InterruptedException, ExecutionException, TimeoutException { - InternalSmackFuture future = new SimpleInternalSmackFuture() { + InternalProcessStanzaSmackFuture future = new SimpleInternalProcessStanzaSmackFuture() { @Override - protected void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + protected void handleStanza(Stanza stanza) { } }; diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java index 2eb2452bb..958ef7cc5 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java @@ -33,9 +33,8 @@ import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; -import org.jivesoftware.smack.SmackException.NotLoggedInException; import org.jivesoftware.smack.SmackFuture; -import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; +import org.jivesoftware.smack.SmackFuture.InternalProcessStanzaSmackFuture; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPConnectionRegistry; import org.jivesoftware.smack.XMPPException.XMPPErrorException; @@ -45,7 +44,9 @@ import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.IQ.Type; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.XMPPError; +import org.jivesoftware.smack.util.ExceptionCallback; import org.jivesoftware.smack.util.SmackExecutorThreadFactory; +import org.jivesoftware.smack.util.SuccessCallback; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; import org.jivesoftware.smackx.ping.packet.Ping; @@ -178,14 +179,14 @@ public final class PingManager extends Manager { return type == XMPPError.Type.CANCEL && condition == XMPPError.Condition.feature_not_implemented; } - public SmackFuture pingAsync(Jid jid) { + public SmackFuture pingAsync(Jid jid) { return pingAsync(jid, connection().getReplyTimeout()); } - public SmackFuture pingAsync(final Jid jid, long pongTimeout) { - final InternalSmackFuture future = new InternalSmackFuture() { + public SmackFuture pingAsync(final Jid jid, long pongTimeout) { + final InternalProcessStanzaSmackFuture future = new InternalProcessStanzaSmackFuture() { @Override - public void handleStanza(Stanza packet) throws NotConnectedException, InterruptedException { + public void handleStanza(Stanza packet) { setResult(true); } @Override @@ -202,13 +203,19 @@ public final class PingManager extends Manager { }; Ping ping = new Ping(jid); - try { - XMPPConnection connection = getAuthenticatedConnectionOrThrow(); - connection.sendIqWithResponseCallback(ping, future, future, pongTimeout); - } - catch (NotLoggedInException | NotConnectedException | InterruptedException e) { - future.processException(e); - } + connection().sendIqRequestAsync(ping, pongTimeout) + .onSuccess(new SuccessCallback() { + @Override + public void onSuccess(IQ result) { + future.processStanza(result); + } + }) + .onError(new ExceptionCallback() { + @Override + public void processException(Exception exception) { + future.processException(exception); + } + }); return future; }