1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2024-11-25 13:32:07 +01:00

Merge branch '4.3'

This commit is contained in:
Florian Schmaus 2018-12-21 13:21:50 +01:00
commit 7a3f6f29cc
3 changed files with 97 additions and 13 deletions

View file

@ -16,11 +16,18 @@
*/ */
package org.jivesoftware.smack; package org.jivesoftware.smack;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.SocketFactory;
import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.util.CallbackRecipient; import org.jivesoftware.smack.util.CallbackRecipient;
@ -29,6 +36,8 @@ import org.jivesoftware.smack.util.SuccessCallback;
public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> { public abstract class SmackFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {
private static final Logger LOGGER = Logger.getLogger(SmackFuture.class.getName());
private boolean cancelled; private boolean cancelled;
protected V result; protected V result;
@ -94,7 +103,7 @@ public abstract class SmackFuture<V, E extends Exception> implements Future<V>,
@Override @Override
public final synchronized V get() throws InterruptedException, ExecutionException { public final synchronized V get() throws InterruptedException, ExecutionException {
while (result == null && exception == null && !cancelled) { while (result == null && exception == null && !cancelled) {
wait(); futureWait();
} }
return getOrThrowExecutionException(); return getOrThrowExecutionException();
@ -102,7 +111,7 @@ public abstract class SmackFuture<V, E extends Exception> implements Future<V>,
public final synchronized V getOrThrow() throws E, InterruptedException { public final synchronized V getOrThrow() throws E, InterruptedException {
while (result == null && exception == null && !cancelled) { while (result == null && exception == null && !cancelled) {
wait(); futureWait();
} }
if (exception != null) { if (exception != null) {
@ -124,7 +133,7 @@ public abstract class SmackFuture<V, E extends Exception> implements Future<V>,
while (result != null && exception != null) { while (result != null && exception != null) {
final long waitTimeRemaining = deadline - System.currentTimeMillis(); final long waitTimeRemaining = deadline - System.currentTimeMillis();
if (waitTimeRemaining > 0) { if (waitTimeRemaining > 0) {
wait(waitTimeRemaining); futureWait(waitTimeRemaining);
} }
} }
@ -162,6 +171,15 @@ public abstract class SmackFuture<V, E extends Exception> implements Future<V>,
} }
} }
protected final void futureWait() throws InterruptedException {
futureWait(0);
}
@SuppressWarnings("WaitNotInLoop")
protected void futureWait(long timeout) throws InterruptedException {
wait(timeout);
}
public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> { public static class InternalSmackFuture<V, E extends Exception> extends SmackFuture<V, E> {
public final synchronized void setResult(V result) { public final synchronized void setResult(V result) {
this.result = result; this.result = result;
@ -178,6 +196,64 @@ public abstract class SmackFuture<V, E extends Exception> implements Future<V>,
} }
} }
public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {
private final Socket socket;
private final Object wasInterruptedLock = new Object();
private boolean wasInterrupted;
public SocketFuture(SocketFactory socketFactory) throws IOException {
socket = socketFactory.createSocket();
}
@Override
protected void futureWait(long timeout) throws InterruptedException {
try {
super.futureWait(timeout);
} catch (InterruptedException interruptedException) {
synchronized (wasInterruptedLock) {
wasInterrupted = true;
if (!socket.isClosed()) {
closeSocket();
}
}
throw interruptedException;
}
}
public void connectAsync(final SocketAddress socketAddress, final int timeout) {
AbstractXMPPConnection.asyncGo(new Runnable() {
@Override
public void run() {
try {
socket.connect(socketAddress, timeout);
}
catch (IOException e) {
setException(e);
return;
}
synchronized (wasInterruptedLock) {
if (wasInterrupted) {
closeSocket();
return;
}
}
setResult(socket);
}
});
}
private void closeSocket() {
try {
socket.close();
}
catch (IOException ioException) {
LOGGER.log(Level.WARNING, "Could not close socket", ioException);
}
}
}
public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E> public abstract static class InternalProcessStanzaSmackFuture<V, E extends Exception> extends InternalSmackFuture<V, E>
implements StanzaListener, ExceptionCallback<E> { implements StanzaListener, ExceptionCallback<E> {

View file

@ -114,8 +114,8 @@ import org.jxmpp.jid.Jid;
* <pre> * <pre>
* {@code * {@code
* MamQueryArgs mamQueryArgs = MamQueryArgs.builder() * MamQueryArgs mamQueryArgs = MamQueryArgs.builder()
* .withJid(jid) * .limitResultsToJid(jid)
* .setResultPageSize(10) * .setResultPageSizeTo(10)
* .queryLastPage() * .queryLastPage()
* .build(); * .build();
* MamQuery mamQuery = mamManager.queryArchive(mamQueryArgs); * MamQuery mamQuery = mamManager.queryArchive(mamQueryArgs);

View file

@ -83,6 +83,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.NotLoggedInException; import org.jivesoftware.smack.SmackException.NotLoggedInException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.StanzaListener;
import org.jivesoftware.smack.SynchronizationPoint; import org.jivesoftware.smack.SynchronizationPoint;
import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPConnection;
@ -388,6 +389,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null; SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession); saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);
// Wait for stream features after the authentication.
// TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
// renamed to "streamFeaturesAfterAuthenticationReceived".
maybeCompressFeaturesReceived.checkIfSuccessOrWait();
// If compression is enabled then request the server to use stream compression. XEP-170 // If compression is enabled then request the server to use stream compression. XEP-170
// recommends to perform stream compression before resource binding. // recommends to perform stream compression before resource binding.
maybeEnableCompression(); maybeEnableCompression();
@ -552,7 +558,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
} }
private void connectUsingConfiguration() throws ConnectionException, IOException { private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
List<HostAddress> failedAddresses = populateHostAddresses(); List<HostAddress> failedAddresses = populateHostAddresses();
SocketFactory socketFactory = config.getSocketFactory(); SocketFactory socketFactory = config.getSocketFactory();
ProxyInfo proxyInfo = config.getProxyInfo(); ProxyInfo proxyInfo = config.getProxyInfo();
@ -571,14 +577,16 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
innerloop: while (inetAddresses.hasNext()) { innerloop: while (inetAddresses.hasNext()) {
// Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
// re-usable after a failed connection attempt. See also SMACK-724. // re-usable after a failed connection attempt. See also SMACK-724.
socket = socketFactory.createSocket(); SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
final InetAddress inetAddress = inetAddresses.next(); final InetAddress inetAddress = inetAddresses.next();
final String inetAddressAndPort = inetAddress + " at port " + port; final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort); LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
socketFuture.connectAsync(inetSocketAddress, timeout);
try { try {
socket.connect(new InetSocketAddress(inetAddress, port), timeout); socket = socketFuture.getOrThrow();
} catch (Exception e) { } catch (IOException e) {
hostAddress.setException(inetAddress, e); hostAddress.setException(inetAddress, e);
if (inetAddresses.hasNext()) { if (inetAddresses.hasNext()) {
continue innerloop; continue innerloop;
@ -586,7 +594,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
break innerloop; break innerloop;
} }
} }
LOGGER.finer("Established TCP connection to " + inetAddressAndPort); LOGGER.finer("Established TCP connection to " + inetSocketAddress);
// We found a host to connect to, return here // We found a host to connect to, return here
this.host = host; this.host = host;
this.port = port; this.port = port;
@ -856,7 +864,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
if (!config.isCompressionEnabled()) { if (!config.isCompressionEnabled()) {
return; return;
} }
maybeCompressFeaturesReceived.checkIfSuccessOrWait();
Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
if (compression == null) { if (compression == null) {
// Server does not support compression // Server does not support compression