1
0
Fork 0
mirror of https://codeberg.org/Mercury-IM/Smack synced 2024-11-22 06:12:05 +01:00

Revert "Typo in XMPPTCPConnection s/ResumptiodDefault/ResumptionDefault/"

This reverts commit 7ceb5f09df.

Somehow 7ceb5f09 introduced from the master branch into the 4.1 branch.
This commit is contained in:
Florian Schmaus 2015-10-21 10:59:21 +02:00
parent f5115f4666
commit 640849dac5

View file

@ -20,6 +20,7 @@ import org.jivesoftware.smack.AbstractConnectionListener;
import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionConfiguration; import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.StanzaListener;
import org.jivesoftware.smack.SmackConfiguration; import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException;
@ -67,19 +68,14 @@ import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
import org.jivesoftware.smack.sm.predicates.Predicate; import org.jivesoftware.smack.sm.predicates.Predicate;
import org.jivesoftware.smack.sm.provider.ParseStreamManagement; import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.PlainStreamElement;
import org.jivesoftware.smack.packet.XMPPError; import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.proxy.ProxyInfo;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.Async;
import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.TLSUtils; import org.jivesoftware.smack.util.TLSUtils;
import org.jivesoftware.smack.util.XmlStringBuilder;
import org.jivesoftware.smack.util.dns.HostAddress; import org.jivesoftware.smack.util.dns.HostAddress;
import org.jxmpp.jid.impl.JidCreate;
import org.jxmpp.jid.parts.Resourcepart;
import org.jxmpp.stringprep.XmppStringprepException;
import org.jxmpp.util.XmppStringUtils; import org.jxmpp.util.XmppStringUtils;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
@ -156,6 +152,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*/ */
private boolean disconnectedButResumeable = false; private boolean disconnectedButResumeable = false;
/**
* Flag to indicate if the socket was closed intentionally by Smack.
* <p>
* This boolean flag is used concurrently, therefore it is marked volatile.
* </p>
*/
private volatile boolean socketClosed = false;
private boolean usingTLS = false; private boolean usingTLS = false;
/** /**
@ -168,27 +172,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*/ */
protected PacketReader packetReader; protected PacketReader packetReader;
private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<>( private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<Exception>(this);
this, "initial open stream element send to server");
/** /**
* *
*/ */
private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>( private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
this, "stream compression feature"); this);
/** /**
* *
*/ */
private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>( private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>(
this, "stream compression"); this);
/**
* A synchronization point which is successful if this connection has received the closing
* stream element from the remote end-point, i.e. the server.
*/
private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
this, "stream closing element received");
/** /**
* The default bundle and defer callback, used for new connections. * The default bundle and defer callback, used for new connections.
@ -217,10 +213,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private String smSessionId; private String smSessionId;
private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>( private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>(
this, "stream resumed element"); this);
private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>( private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>(
this, "stream enabled element"); this);
/** /**
* The client's preferred maximum resumption time in seconds. * The client's preferred maximum resumption time in seconds.
@ -320,9 +316,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* *
* @param jid the bare JID used by the client. * @param jid the bare JID used by the client.
* @param password the password or authentication token. * @param password the password or authentication token.
* @throws XmppStringprepException
*/ */
public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { public XMPPTCPConnection(CharSequence jid, String password) {
this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
} }
@ -336,11 +331,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* @param username * @param username
* @param password * @param password
* @param serviceName * @param serviceName
* @throws XmppStringprepException
*/ */
public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { public XMPPTCPConnection(CharSequence username, String password, String serviceName) {
this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName(
JidCreate.domainBareFrom(serviceName)).build()); serviceName).build());
} }
@Override @Override
@ -366,21 +360,31 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
@Override @Override
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException {
// Reset the flag in case it was set // Reset the flag in case it was set
disconnectedButResumeable = false; disconnectedButResumeable = false;
super.afterSuccessfulLogin(resumed); super.afterSuccessfulLogin(resumed);
} }
@Override @Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, protected synchronized void loginNonAnonymously(String username, String password, String resource) throws XMPPException, SmackException, IOException {
SmackException, IOException, InterruptedException { if (saslAuthentication.hasNonAnonymousAuthentication()) {
// Authenticate using SASL // Authenticate using SASL
saslAuthentication.authenticate(username, password, config.getAuthzid()); if (password != null) {
saslAuthentication.authenticate(username, password, resource);
}
else {
saslAuthentication.authenticate(resource, config.getCallbackHandler());
}
} else {
throw new SmackException("No non-anonymous SASL authentication mechanism available");
}
// If compression is enabled then request the server to use stream compression. XEP-170 // If compression is enabled then request the server to use stream compression. XEP-170
// recommends to perform stream compression before resource binding. // recommends to perform stream compression before resource binding.
maybeEnableCompression(); if (config.isCompressionEnabled()) {
useCompression();
}
if (isSmResumptionPossible()) { if (isSmResumptionPossible()) {
smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
@ -432,11 +436,37 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
afterSuccessfulLogin(false); afterSuccessfulLogin(false);
} }
@Override
public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException {
// Wait with SASL auth until the SASL mechanisms have been received
saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
if (saslAuthentication.hasAnonymousAuthentication()) {
saslAuthentication.authenticateAnonymously();
}
else {
throw new SmackException("No anonymous SASL authentication mechanism available");
}
// If compression is enabled then request the server to use stream compression
if (config.isCompressionEnabled()) {
useCompression();
}
bindResourceAndEstablishSession(null);
afterSuccessfulLogin(false);
}
@Override @Override
public boolean isSecureConnection() { public boolean isSecureConnection() {
return usingTLS; return usingTLS;
} }
public boolean isSocketClosed() {
return socketClosed;
}
/** /**
* Shuts the current connection down. After this method returns, the connection must be ready * Shuts the current connection down. After this method returns, the connection must be ready
* for re-use by connect. * for re-use by connect.
@ -448,7 +478,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
// state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
sendSmAcknowledgementInternal(); sendSmAcknowledgementInternal();
} catch (InterruptedException | NotConnectedException e) { } catch (NotConnectedException e) {
LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
} }
} }
@ -466,27 +496,18 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
if (disconnectedButResumeable) { if (disconnectedButResumeable) {
return; return;
} }
// First shutdown the writer, this will result in a closing stream element getting send to
// the server
if (packetWriter != null) {
packetWriter.shutdown(instant);
}
try {
// After we send the closing stream element, check if there was already a
// closing stream element sent by the server or wait with a timeout for a
// closing stream element to be received from the server.
Exception res = closingStreamReceived.checkIfSuccessOrWait();
LOGGER.info("closingstream " + res);
} catch (InterruptedException | NoResponseException e) {
LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
}
if (packetReader != null) { if (packetReader != null) {
packetReader.shutdown(); packetReader.shutdown();
} }
if (packetWriter != null) {
packetWriter.shutdown(instant);
}
// Set socketClosed to true. This will cause the PacketReader
// and PacketWriter to ignore any Exceptions that are thrown
// because of a read/write from/to a closed stream.
// It is *important* that this is done before socket.close()!
socketClosed = true;
try { try {
socket.close(); socket.close();
} catch (Exception e) { } catch (Exception e) {
@ -519,12 +540,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
@Override @Override
public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { public void send(PlainStreamElement element) throws NotConnectedException {
packetWriter.sendStreamElement(element); packetWriter.sendStreamElement(element);
} }
@Override @Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { protected void sendStanzaInternal(Stanza packet) throws NotConnectedException {
packetWriter.sendStreamElement(packet); packetWriter.sendStreamElement(packet);
if (isSmEnabled()) { if (isSmEnabled()) {
for (StanzaFilter requestAckPredicate : requestAckPredicates) { for (StanzaFilter requestAckPredicate : requestAckPredicates) {
@ -539,8 +560,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private void connectUsingConfiguration() throws IOException, ConnectionException { private void connectUsingConfiguration() throws IOException, ConnectionException {
List<HostAddress> failedAddresses = populateHostAddresses(); List<HostAddress> failedAddresses = populateHostAddresses();
SocketFactory socketFactory = config.getSocketFactory(); SocketFactory socketFactory = config.getSocketFactory();
ProxyInfo proxyInfo = config.getProxyInfo();
int timeout = config.getConnectTimeout();
if (socketFactory == null) { if (socketFactory == null) {
socketFactory = SocketFactory.getDefault(); socketFactory = SocketFactory.getDefault();
} }
@ -560,12 +579,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
final String inetAddressAndPort = inetAddress + " at port " + port; final String inetAddressAndPort = inetAddress + " at port " + port;
LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort); LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
try { try {
if (proxyInfo == null) { socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout());
socket.connect(new InetSocketAddress(inetAddress, port), timeout);
}
else {
proxyInfo.getProxySocketConnection().connect(socket, inetAddress, port, timeout);
}
} catch (Exception e) { } catch (Exception e) {
if (inetAddresses.hasNext()) { if (inetAddresses.hasNext()) {
continue innerloop; continue innerloop;
@ -624,6 +638,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// Start the packet reader. The startup() method will block until we // Start the packet reader. The startup() method will block until we
// get an opening stream packet back from server // get an opening stream packet back from server
packetReader.init(); packetReader.init();
if (isFirstInitialization) {
// Notify listeners that a new connection has been established
for (ConnectionCreationListener listener : getConnectionCreationListeners()) {
listener.connectionCreated(this);
}
}
} }
private void initReaderAndWriter() throws IOException { private void initReaderAndWriter() throws IOException {
@ -661,8 +682,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
KeyManager[] kms = null; KeyManager[] kms = null;
PasswordCallback pcb = null; PasswordCallback pcb = null;
if (context == null) { if(config.getCallbackHandler() == null) {
if(config.getKeystoreType().equals("PKCS11")) { ks = null;
} else if (context == null) {
if(config.getKeystoreType().equals("NONE")) {
ks = null;
pcb = null;
}
else if(config.getKeystoreType().equals("PKCS11")) {
try { try {
Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library(); String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library();
@ -732,8 +759,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
if (verifier == null) { if (verifier == null) {
throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
} else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) { } else if (!verifier.verify(getServiceName(), sslSocket.getSession())) {
throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain()); throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName());
} }
// Set that TLS was successful // Set that TLS was successful
@ -746,7 +773,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* @return a instance of XMPPInputOutputStream or null if no suitable instance was found * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
* *
*/ */
private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { private XMPPInputOutputStream maybeGetCompressionHandler() {
Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
if (compression == null) {
// Server does not support compression
return null;
}
for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) { for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) {
String method = handler.getCompressionMethod(); String method = handler.getCompressionMethod();
if (compression.getMethods().contains(method)) if (compression.getMethods().contains(method))
@ -776,21 +808,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* @throws NotConnectedException * @throws NotConnectedException
* @throws XMPPException * @throws XMPPException
* @throws NoResponseException * @throws NoResponseException
* @throws InterruptedException
*/ */
private void maybeEnableCompression() throws NotConnectedException, NoResponseException, XMPPException, InterruptedException { private void useCompression() throws NotConnectedException, NoResponseException, XMPPException {
if (!config.isCompressionEnabled()) {
return;
}
maybeCompressFeaturesReceived.checkIfSuccessOrWait(); maybeCompressFeaturesReceived.checkIfSuccessOrWait();
Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
if (compression == null) {
// Server does not support compression
return;
}
// If stream compression was offered by the server and we want to use // If stream compression was offered by the server and we want to use
// compression then send compression request to the server // compression then send compression request to the server
if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { if ((compressionHandler = maybeGetCompressionHandler()) != null) {
compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
} else { } else {
LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
@ -798,26 +821,25 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
/** /**
* Establishes a connection to the XMPP server. It basically * Establishes a connection to the XMPP server and performs an automatic login
* creates and maintains a socket connection to the server. * only if the previous connection state was logged (authenticated). It basically
* <p> * creates and maintains a socket connection to the server.<p>
* <p/>
* Listeners will be preserved from a previous connection if the reconnection * Listeners will be preserved from a previous connection if the reconnection
* occurs after an abrupt termination. * occurs after an abrupt termination.
* </p>
* *
* @throws XMPPException if an error occurs while trying to establish the connection. * @throws XMPPException if an error occurs while trying to establish the connection.
* @throws SmackException * @throws SmackException
* @throws IOException * @throws IOException
* @throws InterruptedException
*/ */
@Override @Override
protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { protected void connectInternal() throws SmackException, IOException, XMPPException {
closingStreamReceived.init();
// Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
// there is an error establishing the connection // there is an error establishing the connection
connectUsingConfiguration(); connectUsingConfiguration();
// We connected successfully to the servers TCP port // We connected successfully to the servers TCP port
socketClosed = false;
initConnection(); initConnection();
// Wait with SASL auth until the SASL mechanisms have been received // Wait with SASL auth until the SASL mechanisms have been received
@ -826,6 +848,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// Make note of the fact that we're now connected. // Make note of the fact that we're now connected.
connected = true; connected = true;
callConnectionConnectedListener(); callConnectionConnectedListener();
// Automatically makes the login if the user was previously connected successfully
// to the server and the connection was terminated abruptly
if (wasAuthenticated) {
login();
notifyReconnection();
}
} }
/** /**
@ -856,7 +885,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
@Override @Override
protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException { protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException {
StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
if (startTlsFeature != null) { if (startTlsFeature != null) {
if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
@ -865,7 +894,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
sendNonza(new StartTls()); send(new StartTls());
} }
} }
// If TLS is required but the server doesn't offer it, disconnect // If TLS is required but the server doesn't offer it, disconnect
@ -890,21 +919,20 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* to be sent by the server. * to be sent by the server.
* *
* @throws SmackException if the parser could not be reset. * @throws SmackException if the parser could not be reset.
* @throws InterruptedException
*/ */
void openStream() throws SmackException, InterruptedException { void openStream() throws SmackException {
// If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
// possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
// mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
// response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.) // response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.)
CharSequence to = getXMPPServiceDomain(); CharSequence to = getServiceName();
CharSequence from = null; CharSequence from = null;
CharSequence localpart = config.getUsername(); CharSequence localpart = config.getUsername();
if (localpart != null) { if (localpart != null) {
from = XmppStringUtils.completeJidFrom(localpart, to); from = XmppStringUtils.completeJidFrom(localpart, to);
} }
String id = getStreamId(); String id = getStreamId();
sendNonza(new StreamOpen(to, from, id)); send(new StreamOpen(to, from, id));
try { try {
packetReader.parser = PacketParserUtils.newXmppParser(reader); packetReader.parser = PacketParserUtils.newXmppParser(reader);
} }
@ -967,8 +995,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// We found an opening stream. // We found an opening stream.
if ("jabber:client".equals(parser.getNamespace(null))) { if ("jabber:client".equals(parser.getNamespace(null))) {
streamId = parser.getAttributeValue("", "id"); streamId = parser.getAttributeValue("", "id");
String reportedServerDomain = parser.getAttributeValue("", "from"); String reportedServiceName = parser.getAttributeValue("", "from");
assert(config.getXMPPServiceDomain().equals(reportedServerDomain)); assert(reportedServiceName.equals(config.getServiceName()));
} }
break; break;
case "error": case "error":
@ -1124,33 +1152,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
break; break;
case XmlPullParser.END_TAG: case XmlPullParser.END_TAG:
if (parser.getName().equals("stream")) { if (parser.getName().equals("stream")) {
if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) { // Disconnect the connection
LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace()); disconnect();
break;
}
// Check if the queue was already shut down before reporting success on closing stream tag
// received. This avoids a race if there is a disconnect(), followed by a connect(), which
// did re-start the queue again, causing this writer to assume that the queue is not
// shutdown, which results in a call to disconnect().
final boolean queueWasShutdown = packetWriter.queue.isShutdown();
closingStreamReceived.reportSuccess();
if (queueWasShutdown) {
// We received a closing stream element *after* we initiated the
// termination of the session by sending a closing stream element to
// the server first
return;
} else {
// We received a closing stream element from the server without us
// sending a closing stream element first. This means that the
// server wants to terminate the session, therefore disconnect
// the connection
LOGGER.info(XMPPTCPConnection.this
+ " received closing </stream> element."
+ " Server wants to terminate the connection, calling disconnect()");
disconnect();
}
} }
break; break;
case XmlPullParser.END_DOCUMENT: case XmlPullParser.END_DOCUMENT:
@ -1163,10 +1166,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
} }
catch (Exception e) { catch (Exception e) {
closingStreamReceived.reportFailure(e);
// The exception can be ignored if the the connection is 'done' // The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed // or if the it was caused because the socket got closed
if (!(done || packetWriter.queue.isShutdown())) { if (!(done || isSocketClosed())) {
// Close the connection and notify connection listeners of the // Close the connection and notify connection listeners of the
// error. // error.
notifyConnectionError(e); notifyConnectionError(e);
@ -1185,7 +1187,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* Needs to be protected for unit testing purposes. * Needs to be protected for unit testing purposes.
*/ */
protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>( protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>(
XMPPTCPConnection.this, "shutdown completed"); XMPPTCPConnection.this);
/** /**
* If set, the stanza(/packet) writer is shut down * If set, the stanza(/packet) writer is shut down
@ -1233,14 +1235,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
final boolean done = done(); if (done() && !isSmResumptionPossible()) {
if (done) {
final boolean smResumptionPossbile = isSmResumptionPossible();
// Don't throw a NotConnectedException is there is an resumable stream available // Don't throw a NotConnectedException is there is an resumable stream available
if (!smResumptionPossbile) { throw new NotConnectedException();
throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
+ " smResumptionPossible=" + smResumptionPossbile);
}
} }
} }
@ -1249,37 +1246,39 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* *
* @param element the element to send. * @param element the element to send.
* @throws NotConnectedException * @throws NotConnectedException
* @throws InterruptedException
*/ */
protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { protected void sendStreamElement(Element element) throws NotConnectedException {
throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
try {
queue.put(element); boolean enqueued = false;
} while (!enqueued) {
catch (InterruptedException e) { try {
// put() may throw an InterruptedException for two reasons: queue.put(element);
// 1. If the queue was shut down enqueued = true;
// 2. If the thread was interrupted }
// so we have to check which is the case catch (InterruptedException e) {
throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
// If the method above did not throw, then the sending thread was interrupted // If the method above did not throw, then the sending thread was interrupted
throw e; // TODO in a later version of Smack the InterruptedException should be thrown to
// allow users to interrupt a sending thread that is currently blocking because
// the queue is full.
LOGGER.log(Level.WARNING, "Sending thread was interrupted", e);
}
} }
} }
/** /**
* Shuts down the stanza(/packet) writer. Once this method has been called, no further * Shuts down the stanza(/packet) writer. Once this method has been called, no further
* packets will be written to the server. * packets will be written to the server.
* @throws InterruptedException
*/ */
void shutdown(boolean instant) { void shutdown(boolean instant) {
instantShutdown = instant; instantShutdown = instant;
queue.shutdown();
shutdownTimestamp = System.currentTimeMillis(); shutdownTimestamp = System.currentTimeMillis();
queue.shutdown();
try { try {
shutdownDone.checkIfSuccessOrWait(); shutdownDone.checkIfSuccessOrWait();
} }
catch (NoResponseException | InterruptedException e) { catch (NoResponseException e) {
LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
} }
} }
@ -1376,15 +1375,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
writer.write(element.toXML().toString());
CharSequence elementXml = element.toXML();
if (elementXml instanceof XmlStringBuilder) {
((XmlStringBuilder) elementXml).write(writer);
}
else {
writer.write(elementXml.toString());
}
if (queue.isEmpty()) { if (queue.isEmpty()) {
writer.flush(); writer.flush();
} }
@ -1415,7 +1406,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
catch (Exception e) { catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
} }
// Delete the queue contents (hopefully nothing is left). // Delete the queue contents (hopefully nothing is left).
queue.clear(); queue.clear();
} else if (instantShutdown && isSmEnabled()) { } else if (instantShutdown && isSmEnabled()) {
@ -1423,15 +1413,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// into the unacknowledgedStanzas queue // into the unacknowledgedStanzas queue
drainWriterQueueToUnacknowledgedStanzas(); drainWriterQueueToUnacknowledgedStanzas();
} }
// Do *not* close the writer here, as it will cause the socket
// to get closed. But we may want to receive further stanzas try {
// until the closing stream tag is received. The socket will be writer.close();
// closed in shutdown(). }
catch (Exception e) {
// Do nothing
}
} }
catch (Exception e) { catch (Exception e) {
// The exception can be ignored if the the connection is 'done' // The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed // or if the it was caused because the socket got closed
if (!(done() || queue.isShutdown())) { if (!(done() || isSocketClosed())) {
notifyConnectionError(e); notifyConnectionError(e);
} else { } else {
LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
@ -1466,19 +1460,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* Set if Stream Management resumption should be used by default for new connections. * Set if Stream Management resumption should be used by default for new connections.
* *
* @param useSmResumptionDefault true to use Stream Management resumption for new connections. * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
* @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
*/ */
@Deprecated
public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
setUseStreamManagementDefault(useSmResumptionDefault);
}
/**
* Set if Stream Management resumption should be used by default for new connections.
*
* @param useSmResumptionDefault true to use Stream Management resumption for new connections.
*/
public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
if (useSmResumptionDefault) { if (useSmResumptionDefault) {
// Also enable SM is resumption is enabled // Also enable SM is resumption is enabled
setUseStreamManagementDefault(useSmResumptionDefault); setUseStreamManagementDefault(useSmResumptionDefault);
@ -1560,16 +1543,15 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* *
* @throws StreamManagementNotEnabledException if Stream Mangement is not enabled. * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled.
* @throws NotConnectedException if the connection is not connected. * @throws NotConnectedException if the connection is not connected.
* @throws InterruptedException
*/ */
public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
if (!isSmEnabled()) { if (!isSmEnabled()) {
throw new StreamManagementException.StreamManagementNotEnabledException(); throw new StreamManagementException.StreamManagementNotEnabledException();
} }
requestSmAcknowledgementInternal(); requestSmAcknowledgementInternal();
} }
private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { private void requestSmAcknowledgementInternal() throws NotConnectedException {
packetWriter.sendStreamElement(AckRequest.INSTANCE); packetWriter.sendStreamElement(AckRequest.INSTANCE);
} }
@ -1583,16 +1565,15 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* *
* @throws StreamManagementNotEnabledException if Stream Management is not enabled. * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
* @throws NotConnectedException if the connection is not connected. * @throws NotConnectedException if the connection is not connected.
* @throws InterruptedException
*/ */
public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
if (!isSmEnabled()) { if (!isSmEnabled()) {
throw new StreamManagementException.StreamManagementNotEnabledException(); throw new StreamManagementException.StreamManagementNotEnabledException();
} }
sendSmAcknowledgementInternal(); sendSmAcknowledgementInternal();
} }
private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { private void sendSmAcknowledgementInternal() throws NotConnectedException {
packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
} }
@ -1805,8 +1786,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
try { try {
listener.processPacket(ackedStanza); listener.processPacket(ackedStanza);
} }
catch (InterruptedException | NotConnectedException e) { catch (NotConnectedException e) {
LOGGER.log(Level.FINER, "Received exception", e); LOGGER.log(Level.FINER, "Received not connected exception", e);
} }
} }
String id = ackedStanza.getStanzaId(); String id = ackedStanza.getStanzaId();
@ -1818,8 +1799,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
try { try {
listener.processPacket(ackedStanza); listener.processPacket(ackedStanza);
} }
catch (InterruptedException | NotConnectedException e) { catch (NotConnectedException e) {
LOGGER.log(Level.FINER, "Received exception", e); LOGGER.log(Level.FINER, "Received not connected exception", e);
} }
} }
} }