From f5115f46669e2d17fab080f490e0be0e390c4597 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 21 Oct 2015 07:42:47 +0200 Subject: [PATCH 1/6] Empty unacknowledgedStanzas when 'resumed' is received by changing stanzasToResend.addAll(unacknowledgedStanzas); to unacknowledgedStanzas.drainTo(stanzasToResend); Also use sendStanzaInternal to call the callbacks, which also requires the smEnabledSyncPoint to got signaled. Fixes SMACK-700. Thanks to Juan Antonio for reporting this. --- .../org/jivesoftware/smack/tcp/XMPPTCPConnection.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index bd179028a..28070cdfc 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -1086,16 +1086,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { if (!smSessionId.equals(resumed.getPrevId())) { throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId()); } + // Mark SM as enabled and resumption as successful. + smResumedSyncPoint.reportSuccess(); + smEnabledSyncPoint.reportSuccess(); // First, drop the stanzas already handled by the server processHandledCount(resumed.getHandledCount()); // Then re-send what is left in the unacknowledged queue - List stanzasToResend = new LinkedList(); - stanzasToResend.addAll(unacknowledgedStanzas); + List stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size()); + unacknowledgedStanzas.drainTo(stanzasToResend); for (Stanza stanza : stanzasToResend) { - packetWriter.sendStreamElement(stanza); + sendStanzaInternal(stanza); } - smResumedSyncPoint.reportSuccess(); - smEnabledSyncPoint.reportSuccess(); // If there where stanzas resent, then request a SM ack for them. // Writer's sendStreamElement() won't do it automatically based on // predicates. From 640849dac5eff7374ff465973a70323522a2c0a8 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 21 Oct 2015 10:59:21 +0200 Subject: [PATCH 2/6] Revert "Typo in XMPPTCPConnection s/ResumptiodDefault/ResumptionDefault/" This reverts commit 7ceb5f09df8f98d0d8155e05335535a404e1d959. Somehow 7ceb5f09 introduced from the master branch into the 4.1 branch. --- .../smack/tcp/XMPPTCPConnection.java | 339 +++++++++--------- 1 file changed, 160 insertions(+), 179 deletions(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 28070cdfc..ea27a0227 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -20,6 +20,7 @@ import org.jivesoftware.smack.AbstractConnectionListener; import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.ConnectionConfiguration; import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; +import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.SmackConfiguration; import org.jivesoftware.smack.SmackException; @@ -67,19 +68,14 @@ import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; import org.jivesoftware.smack.sm.predicates.Predicate; import org.jivesoftware.smack.sm.provider.ParseStreamManagement; -import org.jivesoftware.smack.packet.Nonza; +import org.jivesoftware.smack.packet.PlainStreamElement; import org.jivesoftware.smack.packet.XMPPError; -import org.jivesoftware.smack.proxy.ProxyInfo; import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.TLSUtils; -import org.jivesoftware.smack.util.XmlStringBuilder; import org.jivesoftware.smack.util.dns.HostAddress; -import org.jxmpp.jid.impl.JidCreate; -import org.jxmpp.jid.parts.Resourcepart; -import org.jxmpp.stringprep.XmppStringprepException; import org.jxmpp.util.XmppStringUtils; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; @@ -156,6 +152,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { */ private boolean disconnectedButResumeable = false; + /** + * Flag to indicate if the socket was closed intentionally by Smack. + *

+ * This boolean flag is used concurrently, therefore it is marked volatile. + *

+ */ + private volatile boolean socketClosed = false; + private boolean usingTLS = false; /** @@ -168,27 +172,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { */ protected PacketReader packetReader; - private final SynchronizationPoint initalOpenStreamSend = new SynchronizationPoint<>( - this, "initial open stream element send to server"); + private final SynchronizationPoint initalOpenStreamSend = new SynchronizationPoint(this); /** * */ private final SynchronizationPoint maybeCompressFeaturesReceived = new SynchronizationPoint( - this, "stream compression feature"); + this); /** * */ private final SynchronizationPoint compressSyncPoint = new SynchronizationPoint( - this, "stream compression"); - - /** - * A synchronization point which is successful if this connection has received the closing - * stream element from the remote end-point, i.e. the server. - */ - private final SynchronizationPoint closingStreamReceived = new SynchronizationPoint<>( - this, "stream closing element received"); + this); /** * The default bundle and defer callback, used for new connections. @@ -217,10 +213,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private String smSessionId; private final SynchronizationPoint smResumedSyncPoint = new SynchronizationPoint( - this, "stream resumed element"); + this); private final SynchronizationPoint smEnabledSyncPoint = new SynchronizationPoint( - this, "stream enabled element"); + this); /** * The client's preferred maximum resumption time in seconds. @@ -320,9 +316,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @param jid the bare JID used by the client. * @param password the password or authentication token. - * @throws XmppStringprepException */ - public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { + public XMPPTCPConnection(CharSequence jid, String password) { this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); } @@ -336,11 +331,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @param username * @param password * @param serviceName - * @throws XmppStringprepException */ - public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { - this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( - JidCreate.domainBareFrom(serviceName)).build()); + public XMPPTCPConnection(CharSequence username, String password, String serviceName) { + this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName( + serviceName).build()); } @Override @@ -366,21 +360,31 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } @Override - protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { + protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException { // Reset the flag in case it was set disconnectedButResumeable = false; super.afterSuccessfulLogin(resumed); } @Override - protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, - SmackException, IOException, InterruptedException { - // Authenticate using SASL - saslAuthentication.authenticate(username, password, config.getAuthzid()); + protected synchronized void loginNonAnonymously(String username, String password, String resource) throws XMPPException, SmackException, IOException { + if (saslAuthentication.hasNonAnonymousAuthentication()) { + // Authenticate using SASL + if (password != null) { + saslAuthentication.authenticate(username, password, resource); + } + else { + saslAuthentication.authenticate(resource, config.getCallbackHandler()); + } + } else { + throw new SmackException("No non-anonymous SASL authentication mechanism available"); + } // If compression is enabled then request the server to use stream compression. XEP-170 // recommends to perform stream compression before resource binding. - maybeEnableCompression(); + if (config.isCompressionEnabled()) { + useCompression(); + } if (isSmResumptionPossible()) { smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); @@ -432,11 +436,37 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { afterSuccessfulLogin(false); } + @Override + public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException { + // Wait with SASL auth until the SASL mechanisms have been received + saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); + + if (saslAuthentication.hasAnonymousAuthentication()) { + saslAuthentication.authenticateAnonymously(); + } + else { + throw new SmackException("No anonymous SASL authentication mechanism available"); + } + + // If compression is enabled then request the server to use stream compression + if (config.isCompressionEnabled()) { + useCompression(); + } + + bindResourceAndEstablishSession(null); + + afterSuccessfulLogin(false); + } + @Override public boolean isSecureConnection() { return usingTLS; } + public boolean isSocketClosed() { + return socketClosed; + } + /** * Shuts the current connection down. After this method returns, the connection must be ready * for re-use by connect. @@ -448,7 +478,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. sendSmAcknowledgementInternal(); - } catch (InterruptedException | NotConnectedException e) { + } catch (NotConnectedException e) { LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); } } @@ -466,27 +496,18 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { if (disconnectedButResumeable) { return; } - - // First shutdown the writer, this will result in a closing stream element getting send to - // the server - if (packetWriter != null) { - packetWriter.shutdown(instant); - } - - try { - // After we send the closing stream element, check if there was already a - // closing stream element sent by the server or wait with a timeout for a - // closing stream element to be received from the server. - Exception res = closingStreamReceived.checkIfSuccessOrWait(); - LOGGER.info("closingstream " + res); - } catch (InterruptedException | NoResponseException e) { - LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); - } - if (packetReader != null) { packetReader.shutdown(); } + if (packetWriter != null) { + packetWriter.shutdown(instant); + } + // Set socketClosed to true. This will cause the PacketReader + // and PacketWriter to ignore any Exceptions that are thrown + // because of a read/write from/to a closed stream. + // It is *important* that this is done before socket.close()! + socketClosed = true; try { socket.close(); } catch (Exception e) { @@ -519,12 +540,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } @Override - public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { + public void send(PlainStreamElement element) throws NotConnectedException { packetWriter.sendStreamElement(element); } @Override - protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { + protected void sendStanzaInternal(Stanza packet) throws NotConnectedException { packetWriter.sendStreamElement(packet); if (isSmEnabled()) { for (StanzaFilter requestAckPredicate : requestAckPredicates) { @@ -539,8 +560,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private void connectUsingConfiguration() throws IOException, ConnectionException { List failedAddresses = populateHostAddresses(); SocketFactory socketFactory = config.getSocketFactory(); - ProxyInfo proxyInfo = config.getProxyInfo(); - int timeout = config.getConnectTimeout(); if (socketFactory == null) { socketFactory = SocketFactory.getDefault(); } @@ -560,12 +579,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { final String inetAddressAndPort = inetAddress + " at port " + port; LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort); try { - if (proxyInfo == null) { - socket.connect(new InetSocketAddress(inetAddress, port), timeout); - } - else { - proxyInfo.getProxySocketConnection().connect(socket, inetAddress, port, timeout); - } + socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout()); } catch (Exception e) { if (inetAddresses.hasNext()) { continue innerloop; @@ -624,6 +638,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // Start the packet reader. The startup() method will block until we // get an opening stream packet back from server packetReader.init(); + + if (isFirstInitialization) { + // Notify listeners that a new connection has been established + for (ConnectionCreationListener listener : getConnectionCreationListeners()) { + listener.connectionCreated(this); + } + } } private void initReaderAndWriter() throws IOException { @@ -661,8 +682,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { KeyManager[] kms = null; PasswordCallback pcb = null; - if (context == null) { - if(config.getKeystoreType().equals("PKCS11")) { + if(config.getCallbackHandler() == null) { + ks = null; + } else if (context == null) { + if(config.getKeystoreType().equals("NONE")) { + ks = null; + pcb = null; + } + else if(config.getKeystoreType().equals("PKCS11")) { try { Constructor c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library(); @@ -732,8 +759,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); if (verifier == null) { throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); - } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) { - throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain()); + } else if (!verifier.verify(getServiceName(), sslSocket.getSession())) { + throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName()); } // Set that TLS was successful @@ -746,7 +773,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @return a instance of XMPPInputOutputStream or null if no suitable instance was found * */ - private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { + private XMPPInputOutputStream maybeGetCompressionHandler() { + Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); + if (compression == null) { + // Server does not support compression + return null; + } for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) { String method = handler.getCompressionMethod(); if (compression.getMethods().contains(method)) @@ -776,21 +808,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @throws NotConnectedException * @throws XMPPException * @throws NoResponseException - * @throws InterruptedException */ - private void maybeEnableCompression() throws NotConnectedException, NoResponseException, XMPPException, InterruptedException { - if (!config.isCompressionEnabled()) { - return; - } + private void useCompression() throws NotConnectedException, NoResponseException, XMPPException { maybeCompressFeaturesReceived.checkIfSuccessOrWait(); - Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); - if (compression == null) { - // Server does not support compression - return; - } // If stream compression was offered by the server and we want to use // compression then send compression request to the server - if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { + if ((compressionHandler = maybeGetCompressionHandler()) != null) { compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); } else { LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); @@ -798,26 +821,25 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } /** - * Establishes a connection to the XMPP server. It basically - * creates and maintains a socket connection to the server. - *

+ * Establishes a connection to the XMPP server and performs an automatic login + * only if the previous connection state was logged (authenticated). It basically + * creates and maintains a socket connection to the server.

+ *

* Listeners will be preserved from a previous connection if the reconnection * occurs after an abrupt termination. - *

* * @throws XMPPException if an error occurs while trying to establish the connection. * @throws SmackException * @throws IOException - * @throws InterruptedException */ @Override - protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { - closingStreamReceived.init(); + protected void connectInternal() throws SmackException, IOException, XMPPException { // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if // there is an error establishing the connection connectUsingConfiguration(); // We connected successfully to the servers TCP port + socketClosed = false; initConnection(); // Wait with SASL auth until the SASL mechanisms have been received @@ -826,6 +848,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // Make note of the fact that we're now connected. connected = true; callConnectionConnectedListener(); + + // Automatically makes the login if the user was previously connected successfully + // to the server and the connection was terminated abruptly + if (wasAuthenticated) { + login(); + notifyReconnection(); + } } /** @@ -856,7 +885,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } @Override - protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException { + protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException { StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); if (startTlsFeature != null) { if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { @@ -865,7 +894,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { - sendNonza(new StartTls()); + send(new StartTls()); } } // If TLS is required but the server doesn't offer it, disconnect @@ -890,21 +919,20 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * to be sent by the server. * * @throws SmackException if the parser could not be reset. - * @throws InterruptedException */ - void openStream() throws SmackException, InterruptedException { + void openStream() throws SmackException { // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.) - CharSequence to = getXMPPServiceDomain(); + CharSequence to = getServiceName(); CharSequence from = null; CharSequence localpart = config.getUsername(); if (localpart != null) { from = XmppStringUtils.completeJidFrom(localpart, to); } String id = getStreamId(); - sendNonza(new StreamOpen(to, from, id)); + send(new StreamOpen(to, from, id)); try { packetReader.parser = PacketParserUtils.newXmppParser(reader); } @@ -967,8 +995,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // We found an opening stream. if ("jabber:client".equals(parser.getNamespace(null))) { streamId = parser.getAttributeValue("", "id"); - String reportedServerDomain = parser.getAttributeValue("", "from"); - assert(config.getXMPPServiceDomain().equals(reportedServerDomain)); + String reportedServiceName = parser.getAttributeValue("", "from"); + assert(reportedServiceName.equals(config.getServiceName())); } break; case "error": @@ -1124,33 +1152,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { break; case XmlPullParser.END_TAG: if (parser.getName().equals("stream")) { - if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) { - LOGGER.warning(XMPPTCPConnection.this + " but different namespace " + parser.getNamespace()); - break; - } - - // Check if the queue was already shut down before reporting success on closing stream tag - // received. This avoids a race if there is a disconnect(), followed by a connect(), which - // did re-start the queue again, causing this writer to assume that the queue is not - // shutdown, which results in a call to disconnect(). - final boolean queueWasShutdown = packetWriter.queue.isShutdown(); - closingStreamReceived.reportSuccess(); - - if (queueWasShutdown) { - // We received a closing stream element *after* we initiated the - // termination of the session by sending a closing stream element to - // the server first - return; - } else { - // We received a closing stream element from the server without us - // sending a closing stream element first. This means that the - // server wants to terminate the session, therefore disconnect - // the connection - LOGGER.info(XMPPTCPConnection.this - + " received closing element." - + " Server wants to terminate the connection, calling disconnect()"); - disconnect(); - } + // Disconnect the connection + disconnect(); } break; case XmlPullParser.END_DOCUMENT: @@ -1163,10 +1166,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } catch (Exception e) { - closingStreamReceived.reportFailure(e); // The exception can be ignored if the the connection is 'done' // or if the it was caused because the socket got closed - if (!(done || packetWriter.queue.isShutdown())) { + if (!(done || isSocketClosed())) { // Close the connection and notify connection listeners of the // error. notifyConnectionError(e); @@ -1185,7 +1187,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * Needs to be protected for unit testing purposes. */ protected SynchronizationPoint shutdownDone = new SynchronizationPoint( - XMPPTCPConnection.this, "shutdown completed"); + XMPPTCPConnection.this); /** * If set, the stanza(/packet) writer is shut down @@ -1233,14 +1235,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { - final boolean done = done(); - if (done) { - final boolean smResumptionPossbile = isSmResumptionPossible(); + if (done() && !isSmResumptionPossible()) { // Don't throw a NotConnectedException is there is an resumable stream available - if (!smResumptionPossbile) { - throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done - + " smResumptionPossible=" + smResumptionPossbile); - } + throw new NotConnectedException(); } } @@ -1249,37 +1246,39 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @param element the element to send. * @throws NotConnectedException - * @throws InterruptedException */ - protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { + protected void sendStreamElement(Element element) throws NotConnectedException { throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); - try { - queue.put(element); - } - catch (InterruptedException e) { - // put() may throw an InterruptedException for two reasons: - // 1. If the queue was shut down - // 2. If the thread was interrupted - // so we have to check which is the case - throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); - // If the method above did not throw, then the sending thread was interrupted - throw e; + + boolean enqueued = false; + while (!enqueued) { + try { + queue.put(element); + enqueued = true; + } + catch (InterruptedException e) { + throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); + // If the method above did not throw, then the sending thread was interrupted + // TODO in a later version of Smack the InterruptedException should be thrown to + // allow users to interrupt a sending thread that is currently blocking because + // the queue is full. + LOGGER.log(Level.WARNING, "Sending thread was interrupted", e); + } } } /** * Shuts down the stanza(/packet) writer. Once this method has been called, no further * packets will be written to the server. - * @throws InterruptedException */ void shutdown(boolean instant) { instantShutdown = instant; - queue.shutdown(); shutdownTimestamp = System.currentTimeMillis(); + queue.shutdown(); try { shutdownDone.checkIfSuccessOrWait(); } - catch (NoResponseException | InterruptedException e) { + catch (NoResponseException e) { LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); } } @@ -1376,15 +1375,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { throw new IllegalStateException(e); } } - - CharSequence elementXml = element.toXML(); - if (elementXml instanceof XmlStringBuilder) { - ((XmlStringBuilder) elementXml).write(writer); - } - else { - writer.write(elementXml.toString()); - } - + writer.write(element.toXML().toString()); if (queue.isEmpty()) { writer.flush(); } @@ -1415,7 +1406,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { catch (Exception e) { LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); } - // Delete the queue contents (hopefully nothing is left). queue.clear(); } else if (instantShutdown && isSmEnabled()) { @@ -1423,15 +1413,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // into the unacknowledgedStanzas queue drainWriterQueueToUnacknowledgedStanzas(); } - // Do *not* close the writer here, as it will cause the socket - // to get closed. But we may want to receive further stanzas - // until the closing stream tag is received. The socket will be - // closed in shutdown(). + + try { + writer.close(); + } + catch (Exception e) { + // Do nothing + } + } catch (Exception e) { // The exception can be ignored if the the connection is 'done' // or if the it was caused because the socket got closed - if (!(done() || queue.isShutdown())) { + if (!(done() || isSocketClosed())) { notifyConnectionError(e); } else { LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); @@ -1466,19 +1460,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * Set if Stream Management resumption should be used by default for new connections. * * @param useSmResumptionDefault true to use Stream Management resumption for new connections. - * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. */ - @Deprecated public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { - setUseStreamManagementDefault(useSmResumptionDefault); - } - - /** - * Set if Stream Management resumption should be used by default for new connections. - * - * @param useSmResumptionDefault true to use Stream Management resumption for new connections. - */ - public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { if (useSmResumptionDefault) { // Also enable SM is resumption is enabled setUseStreamManagementDefault(useSmResumptionDefault); @@ -1560,16 +1543,15 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled. * @throws NotConnectedException if the connection is not connected. - * @throws InterruptedException */ - public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { + public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { if (!isSmEnabled()) { throw new StreamManagementException.StreamManagementNotEnabledException(); } requestSmAcknowledgementInternal(); } - private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { + private void requestSmAcknowledgementInternal() throws NotConnectedException { packetWriter.sendStreamElement(AckRequest.INSTANCE); } @@ -1583,16 +1565,15 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @throws StreamManagementNotEnabledException if Stream Management is not enabled. * @throws NotConnectedException if the connection is not connected. - * @throws InterruptedException */ - public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { + public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { if (!isSmEnabled()) { throw new StreamManagementException.StreamManagementNotEnabledException(); } sendSmAcknowledgementInternal(); } - private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { + private void sendSmAcknowledgementInternal() throws NotConnectedException { packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); } @@ -1805,8 +1786,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { try { listener.processPacket(ackedStanza); } - catch (InterruptedException | NotConnectedException e) { - LOGGER.log(Level.FINER, "Received exception", e); + catch (NotConnectedException e) { + LOGGER.log(Level.FINER, "Received not connected exception", e); } } String id = ackedStanza.getStanzaId(); @@ -1818,8 +1799,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { try { listener.processPacket(ackedStanza); } - catch (InterruptedException | NotConnectedException e) { - LOGGER.log(Level.FINER, "Received exception", e); + catch (NotConnectedException e) { + LOGGER.log(Level.FINER, "Received not connected exception", e); } } } From 9c9d1f893a88e12170f17db2db1dd4eaa252f1fe Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 21 Oct 2015 11:04:09 +0200 Subject: [PATCH 3/6] Typo in XMPPTCPConnection s/ResumptiodDefault/ResumptionDefault/ --- .../org/jivesoftware/smack/tcp/XMPPTCPConnection.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index ea27a0227..cb1a3216b 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -1460,8 +1460,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * Set if Stream Management resumption should be used by default for new connections. * * @param useSmResumptionDefault true to use Stream Management resumption for new connections. + * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. */ + @Deprecated public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { + setUseStreamManagementResumptionDefault(useSmResumptionDefault); + } + + /** + * Set if Stream Management resumption should be used by default for new connections. + * + * @param useSmResumptionDefault true to use Stream Management resumption for new connections. + */ + public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { if (useSmResumptionDefault) { // Also enable SM is resumption is enabled setUseStreamManagementDefault(useSmResumptionDefault); From cc758b8f591c9517571058b45ae0f888c64b2dd6 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 21 Oct 2015 16:03:45 +0200 Subject: [PATCH 4/6] Use ackedStanzasCount instead of handledCount in processHandledCount for the initial size of the ackedStanzas list. Thanks to Juan Antonio for reporting. --- .../main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index cb1a3216b..cd4e320c6 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -1758,7 +1758,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private void processHandledCount(long handledCount) throws StreamManagementCounterError { long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); final List ackedStanzas = new ArrayList( - handledCount <= Integer.MAX_VALUE ? (int) handledCount + ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount : Integer.MAX_VALUE); for (long i = 0; i < ackedStanzasCount; i++) { Stanza ackedStanza = unacknowledgedStanzas.poll(); From 81fb1ed93ceb3a2dad475099015ea749802457fe Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Fri, 20 Nov 2015 15:21:47 +0100 Subject: [PATCH 5/6] Make executorService blocking in AbstractXMPPConnection Otherwise AbstractXMPPConnection.processPacket() will throw a RejectedExecutionException if the underlying queue is full. Fixes SMACK-702. --- .../smack/AbstractXMPPConnection.java | 12 ++-- .../smack/util/BoundedThreadPoolExecutor.java | 63 +++++++++++++++++++ 2 files changed, 69 insertions(+), 6 deletions(-) create mode 100644 smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 7653da444..7b680b1a3 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -27,14 +27,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -73,6 +71,7 @@ import org.jivesoftware.smack.parsing.ParsingExceptionCallback; import org.jivesoftware.smack.parsing.UnparsablePacket; import org.jivesoftware.smack.provider.ExtensionElementProvider; import org.jivesoftware.smack.provider.ProviderManager; +import org.jivesoftware.smack.util.BoundedThreadPoolExecutor; import org.jivesoftware.smack.util.DNSUtil; import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smack.util.PacketParserUtils; @@ -237,8 +236,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * important that we use a single threaded ExecutorService in order to guarantee that the * PacketListeners are invoked in the same order the stanzas arrived. */ - private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, - new ArrayBlockingQueue(100), new SmackExecutorThreadFactory(connectionCounterValue, "Incoming Processor")); + private final BoundedThreadPoolExecutor executorService = new BoundedThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, + 100, new SmackExecutorThreadFactory(connectionCounterValue, "Incoming Processor")); /** * This scheduled thread pool executor is used to remove pending callbacks. @@ -976,12 +975,13 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * they are a match with the filter. * * @param packet the stanza(/packet) to process. + * @throws InterruptedException */ - protected void processPacket(Stanza packet) { + protected void processPacket(Stanza packet) throws InterruptedException { assert(packet != null); lastStanzaReceived = System.currentTimeMillis(); // Deliver the incoming packet to listeners. - executorService.submit(new ListenerNotification(packet)); + executorService.executeBlocking(new ListenerNotification(packet)); } /** diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java b/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java new file mode 100644 index 000000000..c09ddd5d5 --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java @@ -0,0 +1,63 @@ +/** + * + * Copyright 2015 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack.util; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { + + private final Semaphore semaphore; + + public BoundedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, int bound, ThreadFactory threadFactory) { + // One could think that the array blocking queue bound should be "bound - 1" because the bound protected by the + // Semaphore also includes the "slot" in the worker thread executing the Runnable. But using that as bound could + // actually cause a RejectedExecutionException as the queue could fill up while the worker thread remains + // unscheduled and is thus not removing any entries. + super(corePoolSize, maximumPoolSize, keepAliveTime, + unit, new ArrayBlockingQueue(bound), threadFactory); + semaphore = new Semaphore(bound); + } + + public void executeBlocking(final Runnable command) throws InterruptedException { + semaphore.acquire(); + try { + execute(new Runnable() { + @Override + public void run() { + try { + command.run(); + } finally { + semaphore.release(); + } + } + }); + } catch (Exception e) { + semaphore.release(); + if (e instanceof RejectedExecutionException) { + throw (RejectedExecutionException) e; + } else { + throw new RuntimeException(e); + } + } + } +} From 06becd4ac284d72a90deeddbf4768666b2fd71f2 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sun, 22 Nov 2015 22:34:30 +0100 Subject: [PATCH 6/6] Smack 4.1.5 --- resources/releasedocs/changelog.html | 12 ++++++++++++ version.gradle | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/resources/releasedocs/changelog.html b/resources/releasedocs/changelog.html index 688d8bd6f..7ffbe2f85 100644 --- a/resources/releasedocs/changelog.html +++ b/resources/releasedocs/changelog.html @@ -141,6 +141,18 @@ hr {
+

4.1.5 -- 2015-11-22

+ +

Bug +

+
    +
  • [SMACK-698] - Time creates invalid XML +
  • +
  • [SMACK-700] - Duplicate stanzas in unacknowledgedStanzas queue when stream is resumed +
  • +
  • [SMACK-702] - RejectedExecutionException in AbstractXMPPConnection.processPacket() causes connection Termination +
  • +

4.1.4 -- 2015-09-14

diff --git a/version.gradle b/version.gradle index 3b6cfa0c9..dfb8ec0b5 100644 --- a/version.gradle +++ b/version.gradle @@ -1,7 +1,7 @@ allprojects { ext { shortVersion = '4.1.5' - isSnapshot = true + isSnapshot = false jxmppVersion = '0.4.2' smackMinAndroidSdk = 8 }