diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java index 88a951677..9a6511d5f 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackFuture.java @@ -16,11 +16,18 @@ */ package org.jivesoftware.smack; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.net.SocketFactory; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.util.CallbackRecipient; @@ -29,6 +36,8 @@ import org.jivesoftware.smack.util.SuccessCallback; public abstract class SmackFuture implements Future, CallbackRecipient { + private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName()); + private boolean cancelled; protected V result; @@ -94,7 +103,7 @@ public abstract class SmackFuture implements Future, @Override public final synchronized V get() throws InterruptedException, ExecutionException { while (result == null && exception == null && !cancelled) { - wait(); + futureWait(); } return getOrThrowExecutionException(); @@ -102,7 +111,7 @@ public abstract class SmackFuture implements Future, public final synchronized V getOrThrow() throws E, InterruptedException { while (result == null && exception == null && !cancelled) { - wait(); + futureWait(); } if (exception != null) { @@ -124,7 +133,7 @@ public abstract class SmackFuture implements Future, while (result != null && exception != null) { final long waitTimeRemaining = deadline - System.currentTimeMillis(); if (waitTimeRemaining > 0) { - wait(waitTimeRemaining); + futureWait(waitTimeRemaining); } } @@ -162,6 +171,15 @@ public abstract class SmackFuture implements Future, } } + protected final void futureWait() throws InterruptedException { + futureWait(0); + } + + @SuppressWarnings("WaitNotInLoop") + protected void futureWait(long timeout) throws InterruptedException { + wait(timeout); + } + public static class InternalSmackFuture extends SmackFuture { public final synchronized void setResult(V result) { this.result = result; @@ -178,6 +196,64 @@ public abstract class SmackFuture implements Future, } } + public static class SocketFuture extends InternalSmackFuture { + private final Socket socket; + + private final Object wasInterruptedLock = new Object(); + + private boolean wasInterrupted; + + public SocketFuture(SocketFactory socketFactory) throws IOException { + socket = socketFactory.createSocket(); + } + + @Override + protected void futureWait(long timeout) throws InterruptedException { + try { + super.futureWait(timeout); + } catch (InterruptedException interruptedException) { + synchronized (wasInterruptedLock) { + wasInterrupted = true; + if (!socket.isClosed()) { + closeSocket(); + } + } + throw interruptedException; + } + } + + public void connectAsync(final SocketAddress socketAddress, final int timeout) { + AbstractXMPPConnection.asyncGo(new Runnable() { + @Override + public void run() { + try { + socket.connect(socketAddress, timeout); + } + catch (IOException e) { + setException(e); + return; + } + synchronized (wasInterruptedLock) { + if (wasInterrupted) { + closeSocket(); + return; + } + } + setResult(socket); + } + }); + } + + private void closeSocket() { + try { + socket.close(); + } + catch (IOException ioException) { + LOGGER.log(Level.WARNING, "Could not close socket", ioException); + } + } + } + public abstract static class InternalProcessStanzaSmackFuture extends InternalSmackFuture implements StanzaListener, ExceptionCallback { diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java index 4cf570eee..76c1ee442 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java @@ -114,8 +114,8 @@ import org.jxmpp.jid.Jid; *
  * {@code
  * MamQueryArgs mamQueryArgs = MamQueryArgs.builder()
- *                                 .withJid(jid)
- *                                 .setResultPageSize(10)
+ *                                 .limitResultsToJid(jid)
+ *                                 .setResultPageSizeTo(10)
  *                                 .queryLastPage()
  *                                 .build();
  * MamQuery mamQuery = mamManager.queryArchive(mamQueryArgs);
diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
index d34896fc2..a58172c41 100644
--- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
+++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
@@ -83,6 +83,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException;
 import org.jivesoftware.smack.SmackException.NotConnectedException;
 import org.jivesoftware.smack.SmackException.NotLoggedInException;
 import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
+import org.jivesoftware.smack.SmackFuture;
 import org.jivesoftware.smack.StanzaListener;
 import org.jivesoftware.smack.SynchronizationPoint;
 import org.jivesoftware.smack.XMPPConnection;
@@ -388,6 +389,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
         SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
         saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);
 
+        // Wait for stream features after the authentication.
+        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
+        // renamed to "streamFeaturesAfterAuthenticationReceived".
+        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
+
         // If compression is enabled then request the server to use stream compression. XEP-170
         // recommends to perform stream compression before resource binding.
         maybeEnableCompression();
@@ -552,7 +558,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
         }
     }
 
-    private void connectUsingConfiguration() throws ConnectionException, IOException {
+    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
         List failedAddresses = populateHostAddresses();
         SocketFactory socketFactory = config.getSocketFactory();
         ProxyInfo proxyInfo = config.getProxyInfo();
@@ -571,14 +577,16 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
                 innerloop: while (inetAddresses.hasNext()) {
                     // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
                     // re-usable after a failed connection attempt. See also SMACK-724.
-                    socket = socketFactory.createSocket();
+                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
 
                     final InetAddress inetAddress = inetAddresses.next();
-                    final String inetAddressAndPort = inetAddress + " at port " + port;
-                    LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
+                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
+                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
+                    socketFuture.connectAsync(inetSocketAddress, timeout);
+
                     try {
-                        socket.connect(new InetSocketAddress(inetAddress, port), timeout);
-                    } catch (Exception e) {
+                        socket = socketFuture.getOrThrow();
+                    } catch (IOException e) {
                         hostAddress.setException(inetAddress, e);
                         if (inetAddresses.hasNext()) {
                             continue innerloop;
@@ -586,7 +594,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
                             break innerloop;
                         }
                     }
-                    LOGGER.finer("Established TCP connection to " + inetAddressAndPort);
+                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
                     // We found a host to connect to, return here
                     this.host = host;
                     this.port = port;
@@ -856,7 +864,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
         if (!config.isCompressionEnabled()) {
             return;
         }
-        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
+
         Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
         if (compression == null) {
             // Server does not support compression