From 62cba0d96fdf9d2a307ae6c46571d351bd56448b Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 6 Feb 2019 21:49:58 +0100 Subject: [PATCH 1/5] XMPPTCPConnection: Ensure both writer/reader threads are terminated This should fix SMACK-855. --- .../smack/tcp/XMPPTCPConnection.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 24fcf37bc..ba71e69bf 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -51,6 +51,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; @@ -166,6 +167,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private SSLSocket secureSocket; + private final Semaphore readerWriterSemaphore = new Semaphore(2); + /** * Protected access level because of unit test purposes */ @@ -635,8 +638,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @throws XMPPException if establishing a connection to the server fails. * @throws SmackException if the server fails to respond back or if there is anther error. * @throws IOException + * @throws InterruptedException */ - private void initConnection() throws IOException { + private void initConnection() throws IOException, InterruptedException { boolean isFirstInitialization = packetReader == null || packetWriter == null; compressionHandler = null; @@ -647,6 +651,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { packetWriter = new PacketWriter(); packetReader = new PacketReader(); } + + readerWriterSemaphore.acquire(2); // Start the writer thread. This will open an XMPP stream to the server packetWriter.init(); // Start the reader thread. The startup() method will block until we @@ -1014,7 +1020,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { Async.go(new Runnable() { @Override public void run() { - parsePackets(); + try { + parsePackets(); + } finally { + XMPPTCPConnection.this.readerWriterSemaphore.release(); + } } }, "Smack Reader (" + getConnectionCounter() + ")"); } @@ -1310,7 +1320,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { Async.go(new Runnable() { @Override public void run() { - writePackets(); + try { + writePackets(); + } finally { + XMPPTCPConnection.this.readerWriterSemaphore.release(); + } } }, "Smack Writer (" + getConnectionCounter() + ")"); } From 5c8e83015798ef8b897ea0c023355a4a0ed1ce39 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Fri, 8 Feb 2019 13:08:51 +0100 Subject: [PATCH 2/5] Log if not all reader/writer threads where terminated --- .../org/jivesoftware/smack/tcp/XMPPTCPConnection.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index ba71e69bf..4b44e61dd 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -652,6 +652,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { packetReader = new PacketReader(); } + int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits(); + if (availableReaderWriterSemaphorePermits < 2) { + Object[] logObjects = new Object[] { + this, + availableReaderWriterSemaphorePermits, + }; + LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects); + } readerWriterSemaphore.acquire(2); // Start the writer thread. This will open an XMPP stream to the server packetWriter.init(); From 09bffb8dca5d6850c022212b58310536978ce0a3 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Thu, 7 Feb 2019 20:43:45 +0100 Subject: [PATCH 3/5] Fail sync points on exception in XMPPTCPConnection --- .../smack/AbstractXMPPConnection.java | 4 +- .../jivesoftware/smack/SmackException.java | 12 ++++ .../smack/SynchronizationPoint.java | 59 +++++++++++++++---- .../smack/tcp/XMPPTCPConnection.java | 7 +++ 4 files changed, 68 insertions(+), 14 deletions(-) diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 52fe2138e..7a9bf2ee1 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -235,7 +235,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * stanza is send by the server. This is set to true once the last feature stanza has been * parsed. */ - protected final SynchronizationPoint lastFeaturesReceived = new SynchronizationPoint( + protected final SynchronizationPoint lastFeaturesReceived = new SynchronizationPoint<>( AbstractXMPPConnection.this, "last stream features received from server"); /** @@ -550,7 +550,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // - the servers last features stanza has been parsed // - the timeout occurs LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); - lastFeaturesReceived.checkIfSuccessOrWait(); + lastFeaturesReceived.checkIfSuccessOrWaitOrThrow(); if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java index 74d391539..a84b75211 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java @@ -334,4 +334,16 @@ public class SmackException extends Exception { super("Resource binding was not offered by server"); } } + + public static class SmackWrappedException extends SmackException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public SmackWrappedException(Exception exception) { + super(exception); + } + } } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java index 54be30a36..a7113b284 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java @@ -1,6 +1,6 @@ /** * - * Copyright © 2014-2015 Florian Schmaus + * Copyright © 2014-2019 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.TopLevelStreamElement; @@ -37,6 +38,7 @@ public class SynchronizationPoint { // same memory synchronization effects as synchronization block enter and leave. private State state; private E failureException; + private SmackWrappedException smackWrappedExcpetion; /** * Construct a new synchronization point for the given connection. @@ -59,6 +61,7 @@ public class SynchronizationPoint { connectionLock.lock(); state = State.Initial; failureException = null; + smackWrappedExcpetion = null; connectionLock.unlock(); } @@ -71,7 +74,7 @@ public class SynchronizationPoint { * @throws InterruptedException if the connection is interrupted. * @return null if synchronization point was successful, or the failure Exception. */ - public E sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, + public Exception sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, NotConnectedException, InterruptedException { assert (state == State.Initial); connectionLock.lock(); @@ -103,15 +106,14 @@ public class SynchronizationPoint { * @throws NoResponseException if no response was received. * @throws NotConnectedException if the connection is not connected. * @throws InterruptedException if the connection is interrupted. + * @throws SmackWrappedException in case of a wrapped exception; */ public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException, - NotConnectedException, InterruptedException { + NotConnectedException, InterruptedException, SmackWrappedException { sendAndWaitForResponse(request); switch (state) { case Failure: - if (failureException != null) { - throw failureException; - } + throwException(); break; default: // Success, do nothing @@ -123,11 +125,12 @@ public class SynchronizationPoint { * @throws NoResponseException if there was no response marking the synchronization point as success or failed. * @throws E if there was a failure * @throws InterruptedException if the connection is interrupted. + * @throws SmackWrappedException in case of a wrapped exception; */ - public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException { + public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException, SmackWrappedException { checkIfSuccessOrWait(); if (state == State.Failure) { - throw failureException; + throwException(); } } @@ -137,7 +140,7 @@ public class SynchronizationPoint { * @throws InterruptedException * @return null if synchronization point was successful, or the failure Exception. */ - public E checkIfSuccessOrWait() throws NoResponseException, InterruptedException { + public Exception checkIfSuccessOrWait() throws NoResponseException, InterruptedException { connectionLock.lock(); try { switch (state) { @@ -145,7 +148,7 @@ public class SynchronizationPoint { case Success: return null; case Failure: - return failureException; + return getException(); default: // Do nothing break; @@ -198,6 +201,24 @@ public class SynchronizationPoint { } } + /** + * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. + * + * @param exception the exception causing this synchronization point to fail. + */ + public void reportGenericFailure(SmackWrappedException exception) { + assert exception != null; + connectionLock.lock(); + try { + state = State.Failure; + this.smackWrappedExcpetion = exception; + condition.signalAll(); + } + finally { + connectionLock.unlock(); + } + } + /** * Check if this synchronization point was successful. * @@ -256,6 +277,20 @@ public class SynchronizationPoint { } } + private Exception getException() { + if (failureException != null) { + return failureException; + } + return smackWrappedExcpetion; + } + + private void throwException() throws E, SmackWrappedException { + if (failureException != null) { + throw failureException; + } + throw smackWrappedExcpetion; + } + /** * Check for a response and throw a {@link NoResponseException} if there was none. *

@@ -264,7 +299,7 @@ public class SynchronizationPoint { * @return true if synchronization point was successful, false on failure. * @throws NoResponseException */ - private E checkForResponse() throws NoResponseException { + private Exception checkForResponse() throws NoResponseException { switch (state) { case Initial: case NoResponse: @@ -273,7 +308,7 @@ public class SynchronizationPoint { case Success: return null; case Failure: - return failureException; + return getException(); default: throw new AssertionError("Unknown state " + state); } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 4b44e61dd..0774e4375 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -84,6 +84,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotLoggedInException; import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; +import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.SmackFuture; import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.SynchronizationPoint; @@ -937,6 +938,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { if ((packetReader == null || packetReader.done) && (packetWriter == null || packetWriter.done())) return; + SmackWrappedException smackWrappedException = new SmackWrappedException(e); + tlsHandled.reportGenericFailure(smackWrappedException); + saslFeatureReceived.reportGenericFailure(smackWrappedException); + maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException); + lastFeaturesReceived.reportGenericFailure(smackWrappedException); + // Closes the connection temporary. A reconnection is possible // Note that a connection listener of XMPPTCPConnection will drop the SM state in // case the Exception is a StreamErrorException. From 60f324eb1b8db2b910cc952ef6accdad22d45afc Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sat, 9 Feb 2019 13:59:06 +0100 Subject: [PATCH 4/5] Make writer/reader fields final in XMPPTCPConnection --- .../org/jivesoftware/smack/tcp/XMPPTCPConnection.java | 10 ++-------- .../org/jivesoftware/smack/tcp/PacketWriterTest.java | 8 +++----- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 0774e4375..af7ffdeeb 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -173,12 +173,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { /** * Protected access level because of unit test purposes */ - protected PacketWriter packetWriter; + protected final PacketWriter packetWriter = new PacketWriter(); /** * Protected access level because of unit test purposes */ - protected PacketReader packetReader; + protected final PacketReader packetReader = new PacketReader(); private final SynchronizationPoint initialOpenStreamSend = new SynchronizationPoint<>( this, "initial open stream element send to server"); @@ -642,17 +642,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @throws InterruptedException */ private void initConnection() throws IOException, InterruptedException { - boolean isFirstInitialization = packetReader == null || packetWriter == null; compressionHandler = null; // Set the reader and writer instance variables initReaderAndWriter(); - if (isFirstInitialization) { - packetWriter = new PacketWriter(); - packetReader = new PacketReader(); - } - int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits(); if (availableReaderWriterSemaphorePermits < 2) { Object[] logObjects = new Object[] { diff --git a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java index c72996783..c28ec85e4 100644 --- a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java +++ b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java @@ -1,6 +1,6 @@ /** * - * Copyright 2014 Florian Schmaus + * Copyright 2014-2019 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,11 +49,9 @@ public class PacketWriterTest { @Test public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException, XmppStringprepException { XMPPTCPConnection connection = new XMPPTCPConnection("user", "pass", "example.org"); - final PacketWriter pw = connection.new PacketWriter(); - connection.packetWriter = pw; - connection.packetReader = connection.new PacketReader(); + final PacketWriter pw = connection.packetWriter; connection.setWriter(new BlockingStringWriter()); - pw.init(); + connection.packetWriter.init(); for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) { pw.sendStreamElement(new Message()); From 78dcaec75bbd26f795e37776d3d8b8748a03226f Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sat, 9 Feb 2019 18:20:55 +0100 Subject: [PATCH 5/5] Remove null checks for writer/reader fields in XMPPTCPConnection as those are never null since 60f324eb1b8db2b910cc952ef6accdad22d45afc (the previous commit). --- .../smack/SynchronizationPoint.java | 10 +++++++ .../smack/tcp/XMPPTCPConnection.java | 26 ++++++++----------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java index a7113b284..a40c3aeb5 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java @@ -234,6 +234,16 @@ public class SynchronizationPoint { } } + public boolean isNotInInitialState() { + connectionLock.lock(); + try { + return state != State.Initial; + } + finally { + connectionLock.unlock(); + } + } + /** * Check if this synchronization point has its request already sent. * diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index af7ffdeeb..8b1bdb70f 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -492,10 +492,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // First shutdown the writer, this will result in a closing stream element getting send to // the server - if (packetWriter != null) { - LOGGER.finer("PacketWriter shutdown()"); - packetWriter.shutdown(instant); - } + LOGGER.finer("PacketWriter shutdown()"); + packetWriter.shutdown(instant); LOGGER.finer("PacketWriter has been shut down"); if (!instant) { @@ -510,10 +508,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } - if (packetReader != null) { - LOGGER.finer("PacketReader shutdown()"); - packetReader.shutdown(); - } + LOGGER.finer("PacketReader shutdown()"); + packetReader.shutdown(); LOGGER.finer("PacketReader has been shut down"); try { @@ -929,8 +925,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { */ private synchronized void notifyConnectionError(Exception e) { // Listeners were already notified of the exception, return right here. - if ((packetReader == null || packetReader.done) && - (packetWriter == null || packetWriter.done())) return; + if (packetReader.done && packetWriter.done()) return; SmackWrappedException smackWrappedException = new SmackWrappedException(e); tlsHandled.reportGenericFailure(smackWrappedException); @@ -1386,11 +1381,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { instantShutdown = instant; queue.shutdown(); shutdownTimestamp = System.currentTimeMillis(); - try { - shutdownDone.checkIfSuccessOrWait(); - } - catch (NoResponseException | InterruptedException e) { - LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); + if (shutdownDone.isNotInInitialState()) { + try { + shutdownDone.checkIfSuccessOrWait(); + } catch (NoResponseException | InterruptedException e) { + LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); + } } }