diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java new file mode 100644 index 000000000..36dbda2cb --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java @@ -0,0 +1,170 @@ +/** + * + * Copyright 2017 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.packet.Stanza; + +public abstract class SmackFuture implements Future { + + private boolean cancelled; + + private V result; + + protected Exception exception; + + private SuccessCallback successCallback; + + private ExceptionCallback exceptionCallback; + + @Override + public synchronized final boolean cancel(boolean mayInterruptIfRunning) { + if (isDone()) { + return false; + } + + cancelled = true; + return true; + } + + @Override + public synchronized final boolean isCancelled() { + return cancelled; + } + + @Override + public synchronized final boolean isDone() { + return result != null; + } + + public void onSuccessOrError(SuccessCallback successCallback, ExceptionCallback exceptionCallback) { + this.successCallback = successCallback; + this.exceptionCallback = exceptionCallback; + + maybeInvokeCallbacks(); + } + + public void onSuccess(SuccessCallback successCallback) { + onSuccessOrError(successCallback, null); + } + + public void onError(ExceptionCallback exceptionCallback) { + onSuccessOrError(null, exceptionCallback); + } + + private final V getResultOrThrow() throws ExecutionException { + assert (result != null || exception != null); + if (result != null) { + return result; + } + + throw new ExecutionException(exception); + } + + @Override + public synchronized final V get() throws InterruptedException, ExecutionException { + while (result == null && exception == null) { + wait(); + } + + return getResultOrThrow(); + } + + @Override + public synchronized final V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + while (result != null && exception != null) { + final long waitTimeRemaining = deadline - System.currentTimeMillis(); + if (waitTimeRemaining > 0) { + wait(waitTimeRemaining); + } + } + + if (result == null || exception == null) { + throw new TimeoutException(); + } + + return getResultOrThrow(); + } + + protected final synchronized void maybeInvokeCallbacks() { + if (result != null && successCallback != null) { + successCallback.onSuccess(result); + } else if (exception != null && exceptionCallback != null) { + exceptionCallback.processException(exception); + } + } + + /** + * This method checks if the given exception is not fatal. If this method returns false, then + * the future will automatically set the given exception as failure reason and notify potential waiting threads. + * + * @param exception the exception to check. + * @return true if the exception is not fatal, false otherwise. + */ + protected abstract boolean isNonFatalException(Exception exception); + + protected abstract void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException; + + protected final void setResult(V result) { + assert (Thread.holdsLock(this)); + + this.result = result; + this.notifyAll(); + + maybeInvokeCallbacks(); + } + + public static abstract class InternalSmackFuture extends SmackFuture implements StanzaListener, ExceptionCallback { + + @Override + public synchronized final void processException(Exception exception) { + if (!isNonFatalException(exception)) { + this.exception = exception; + this.notifyAll(); + + maybeInvokeCallbacks(); + } + } + + /** + * Wrapper method for {@link #handleStanza(Stanza)}. Note that this method is synchronized. + */ + @Override + public synchronized final void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + handleStanza(stanza); + } + } + + /** + * A simple version of InternalSmackFuture which implements {@link #isNonFatalException(Exception)} as always returning false method. + * + * @param + */ + public static abstract class SimpleInternalSmackFuture extends InternalSmackFuture { + @Override + protected boolean isNonFatalException(Exception exception) { + return false; + } + } +} diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SuccessCallback.java b/smack-core/src/main/java/org/jivesoftware/smack/SuccessCallback.java new file mode 100644 index 000000000..49106a9f1 --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/SuccessCallback.java @@ -0,0 +1,23 @@ +/** + * + * Copyright 2017 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack; + +public interface SuccessCallback { + + public void onSuccess(T result); + +} diff --git a/smack-core/src/test/java/org/jivesoftware/smack/SmackFutureTest.java b/smack-core/src/test/java/org/jivesoftware/smack/SmackFutureTest.java new file mode 100644 index 000000000..52a592f4b --- /dev/null +++ b/smack-core/src/test/java/org/jivesoftware/smack/SmackFutureTest.java @@ -0,0 +1,57 @@ +/** + * + * Copyright 2017 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; +import org.jivesoftware.smack.SmackFuture.SimpleInternalSmackFuture; +import org.jivesoftware.smack.packet.Stanza; +import org.junit.Test; + +public class SmackFutureTest { + + @Test + public void simpleSmackFutureSuccessTest() throws NotConnectedException, InterruptedException, ExecutionException { + InternalSmackFuture future = new SimpleInternalSmackFuture() { + @Override + protected void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + setResult(true); + } + }; + + future.processStanza(null); + + assertTrue(future.get()); + } + + @Test(expected = TimeoutException.class) + public void simpleSmackFutureTimeoutTest() throws InterruptedException, ExecutionException, TimeoutException { + InternalSmackFuture future = new SimpleInternalSmackFuture() { + @Override + protected void handleStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + } + }; + + future.get(5, TimeUnit.SECONDS); + } +} diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java index 8ee8c4a09..fe329b9f7 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java @@ -31,17 +31,20 @@ import org.jivesoftware.smack.AbstractConnectionClosedListener; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.SmackException.NotLoggedInException; +import org.jivesoftware.smack.SmackFuture; +import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.XMPPConnectionRegistry; -import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; import org.jivesoftware.smack.iqrequest.AbstractIqRequestHandler; import org.jivesoftware.smack.iqrequest.IQRequestHandler.Mode; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.IQ.Type; import org.jivesoftware.smack.packet.XMPPError; +import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.util.SmackExecutorThreadFactory; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; import org.jivesoftware.smackx.ping.packet.Ping; @@ -173,6 +176,41 @@ public final class PingManager extends Manager { return type == XMPPError.Type.CANCEL && condition == XMPPError.Condition.feature_not_implemented; } + public SmackFuture pingAsync(Jid jid) { + return pingAsync(jid, connection().getReplyTimeout()); + } + + public SmackFuture pingAsync(final Jid jid, long pongTimeout) { + final InternalSmackFuture future = new InternalSmackFuture() { + @Override + public void handleStanza(Stanza packet) throws NotConnectedException, InterruptedException { + setResult(true); + } + @Override + public boolean isNonFatalException(Exception exception) { + if (exception instanceof XMPPErrorException) { + XMPPErrorException xmppErrorException = (XMPPErrorException) exception; + if (isValidErrorPong(jid, xmppErrorException)) { + setResult(true); + return true; + } + } + return false; + } + }; + + Ping ping = new Ping(jid); + try { + XMPPConnection connection = getAuthenticatedConnectionOrThrow(); + connection.sendIqWithResponseCallback(ping, future, future, pongTimeout); + } + catch (NotLoggedInException | NotConnectedException | InterruptedException e) { + future.processException(e); + } + + return future; + } + /** * Pings the given jid. This method will return false if an error occurs. The exception * to this, is a server ping, which will always return true if the server is reachable, diff --git a/smack-integration-test/src/main/java/org/jivesoftware/smackx/ping/PingIntegrationTest.java b/smack-integration-test/src/main/java/org/jivesoftware/smackx/ping/PingIntegrationTest.java index 5bcb22bc9..4347d7382 100644 --- a/smack-integration-test/src/main/java/org/jivesoftware/smackx/ping/PingIntegrationTest.java +++ b/smack-integration-test/src/main/java/org/jivesoftware/smackx/ping/PingIntegrationTest.java @@ -1,6 +1,6 @@ /** * - * Copyright 2015 Florian Schmaus + * Copyright 2015-2017 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,10 +18,23 @@ package org.jivesoftware.smackx.ping; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.igniterealtime.smack.inttest.AbstractSmackIntegrationTest; import org.igniterealtime.smack.inttest.SmackIntegrationTest; import org.igniterealtime.smack.inttest.SmackIntegrationTestEnvironment; import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.XMPPConnection; +import org.jxmpp.jid.Jid; public class PingIntegrationTest extends AbstractSmackIntegrationTest { @@ -35,4 +48,52 @@ public class PingIntegrationTest extends AbstractSmackIntegrationTest { assertTrue(pingManager.pingMyServer()); } + private static final class Pinger implements Runnable { + private final List toPing; + private final Collection> pongFutures; + + private final PingManager pingManager; + + private Pinger(XMPPConnection connection, Collection> pongFutures, Jid... toPing) { + this(connection, pongFutures, Arrays.asList(toPing)); + } + + private Pinger(XMPPConnection connection, Collection> pongFutures, List toPing) { + this.toPing = toPing; + this.pongFutures = pongFutures; + + this.pingManager = PingManager.getInstanceFor(connection); + } + + @Override + public void run() { + List> futures = new ArrayList<>(); + for (Jid jid : toPing) { + Future future = pingManager.pingAsync(jid); + futures.add(future); + } + pongFutures.addAll(futures); + } + } + + @SmackIntegrationTest + public void pingAsync() throws InterruptedException, ExecutionException { + List> pongFutures = Collections.synchronizedList(new ArrayList>()); + Runnable[] pinger = new Runnable[3]; + pinger[0] = new Pinger(conOne, pongFutures, conTwo.getUser(), conThree.getUser()); + pinger[1] = new Pinger(conTwo, pongFutures, conOne.getUser(), conThree.getUser()); + pinger[2] = new Pinger(conThree, pongFutures, conOne.getUser(), conTwo.getUser()); + + ExecutorService executorService = Executors.newFixedThreadPool(pinger.length); + for (Runnable runnable : pinger) { + executorService.execute(runnable); + } + + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.MINUTES); + + for (Future pongFuture : pongFutures) { + assertTrue(pongFuture.get()); + } + } }