diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 7b4959b36..c43747ebc 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -76,6 +76,7 @@ import org.jivesoftware.smack.SmackException.NotLoggedInException; import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredException; +import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; import org.jivesoftware.smack.XMPPException.FailedNonzaException; import org.jivesoftware.smack.XMPPException.StreamErrorException; @@ -276,7 +277,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * stanza is send by the server. This is set to true once the last feature stanza has been * parsed. */ - protected final SynchronizationPoint lastFeaturesReceived = new SynchronizationPoint( + protected final SynchronizationPoint lastFeaturesReceived = new SynchronizationPoint<>( AbstractXMPPConnection.this, "last stream features received from server"); /** @@ -612,7 +613,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // - the servers last features stanza has been parsed // - the timeout occurs LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); - lastFeaturesReceived.checkIfSuccessOrWait(); + lastFeaturesReceived.checkIfSuccessOrWaitOrThrow(); if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { @@ -841,7 +842,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { for (StanzaCollector collector : collectors) { collector.notifyConnectionError(exception); } - // TODO: We should also notify things like the SASL authentication machinery about the exception. + SmackWrappedException smackWrappedException = new SmackWrappedException(exception); + tlsHandled.reportGenericFailure(smackWrappedException); + saslFeatureReceived.reportGenericFailure(smackWrappedException); + lastFeaturesReceived.reportGenericFailure(smackWrappedException); // Closes the connection temporary. A if the connection supports stream management, then a reconnection is // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java index 5081629ac..743a3f844 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java @@ -356,4 +356,16 @@ public class SmackException extends Exception { super("Resource binding was not offered by server"); } } + + public static class SmackWrappedException extends SmackException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public SmackWrappedException(Exception exception) { + super(exception); + } + } } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java index 4d91b67d3..f274ad293 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java @@ -1,6 +1,6 @@ /** * - * Copyright © 2014-2018 Florian Schmaus + * Copyright © 2014-2019 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.TopLevelStreamElement; @@ -37,6 +38,7 @@ public class SynchronizationPoint { // same memory synchronization effects as synchronization block enter and leave. private State state; private E failureException; + private SmackWrappedException smackWrappedExcpetion; private volatile long waitStart; @@ -62,6 +64,7 @@ public class SynchronizationPoint { connectionLock.lock(); state = State.Initial; failureException = null; + smackWrappedExcpetion = null; connectionLock.unlock(); } @@ -74,7 +77,7 @@ public class SynchronizationPoint { * @throws InterruptedException if the connection is interrupted. * @return null if synchronization point was successful, or the failure Exception. */ - public E sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, + public Exception sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, NotConnectedException, InterruptedException { assert (state == State.Initial); connectionLock.lock(); @@ -106,15 +109,14 @@ public class SynchronizationPoint { * @throws NoResponseException if no response was received. * @throws NotConnectedException if the connection is not connected. * @throws InterruptedException if the connection is interrupted. + * @throws SmackWrappedException in case of a wrapped exception; */ public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException, - NotConnectedException, InterruptedException { + NotConnectedException, InterruptedException, SmackWrappedException { sendAndWaitForResponse(request); switch (state) { case Failure: - if (failureException != null) { - throw failureException; - } + throwException(); break; default: // Success, do nothing @@ -126,11 +128,12 @@ public class SynchronizationPoint { * @throws NoResponseException if there was no response marking the synchronization point as success or failed. * @throws E if there was a failure * @throws InterruptedException if the connection is interrupted. + * @throws SmackWrappedException in case of a wrapped exception; */ - public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException { + public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException, SmackWrappedException { checkIfSuccessOrWait(); if (state == State.Failure) { - throw failureException; + throwException(); } } @@ -140,7 +143,7 @@ public class SynchronizationPoint { * @throws InterruptedException * @return null if synchronization point was successful, or the failure Exception. */ - public E checkIfSuccessOrWait() throws NoResponseException, InterruptedException { + public Exception checkIfSuccessOrWait() throws NoResponseException, InterruptedException { connectionLock.lock(); try { switch (state) { @@ -148,7 +151,7 @@ public class SynchronizationPoint { case Success: return null; case Failure: - return failureException; + return getException(); default: // Do nothing break; @@ -201,6 +204,24 @@ public class SynchronizationPoint { } } + /** + * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. + * + * @param exception the exception causing this synchronization point to fail. + */ + public void reportGenericFailure(SmackWrappedException exception) { + assert exception != null; + connectionLock.lock(); + try { + state = State.Failure; + this.smackWrappedExcpetion = exception; + condition.signalAll(); + } + finally { + connectionLock.unlock(); + } + } + /** * Check if this synchronization point was successful. * @@ -216,6 +237,16 @@ public class SynchronizationPoint { } } + public boolean isNotInInitialState() { + connectionLock.lock(); + try { + return state != State.Initial; + } + finally { + connectionLock.unlock(); + } + } + /** * Check if this synchronization point has its request already sent. * @@ -273,6 +304,20 @@ public class SynchronizationPoint { } } + private Exception getException() { + if (failureException != null) { + return failureException; + } + return smackWrappedExcpetion; + } + + private void throwException() throws E, SmackWrappedException { + if (failureException != null) { + throw failureException; + } + throw smackWrappedExcpetion; + } + /** * Check for a response and throw a {@link NoResponseException} if there was none. *

@@ -281,7 +326,7 @@ public class SynchronizationPoint { * @return true if synchronization point was successful, false on failure. * @throws NoResponseException */ - private E checkForResponse() throws NoResponseException { + private Exception checkForResponse() throws NoResponseException { switch (state) { case Initial: case NoResponse: @@ -290,7 +335,7 @@ public class SynchronizationPoint { case Success: return null; case Failure: - return failureException; + return getException(); default: throw new AssertionError("Unknown state " + state); } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index f8d81dd0b..f9b4e50e6 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -44,6 +44,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; @@ -148,15 +149,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private SSLSocket secureSocket; - /** - * Protected access level because of unit test purposes - */ - protected PacketWriter packetWriter; + private final Semaphore readerWriterSemaphore = new Semaphore(2); /** * Protected access level because of unit test purposes */ - protected PacketReader packetReader; + protected final PacketWriter packetWriter = new PacketWriter(); + + /** + * Protected access level because of unit test purposes + */ + protected final PacketReader packetReader = new PacketReader(); private final SynchronizationPoint initialOpenStreamSend = new SynchronizationPoint<>( this, "initial open stream element send to server"); @@ -464,20 +467,16 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // First shutdown the writer, this will result in a closing stream element getting send to // the server - if (packetWriter != null) { - LOGGER.finer("PacketWriter shutdown()"); - packetWriter.shutdown(instant); - } + LOGGER.finer("PacketWriter shutdown()"); + packetWriter.shutdown(instant); LOGGER.finer("PacketWriter has been shut down"); if (!instant) { waitForClosingStreamTagFromServer(); } - if (packetReader != null) { - LOGGER.finer("PacketReader shutdown()"); - packetReader.shutdown(); - } + LOGGER.finer("PacketReader shutdown()"); + packetReader.shutdown(); LOGGER.finer("PacketReader has been shut down"); CloseableUtil.maybeClose(socket, LOGGER); @@ -599,18 +598,23 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @throws XMPPException if establishing a connection to the server fails. * @throws SmackException if the server fails to respond back or if there is anther error. * @throws IOException + * @throws InterruptedException */ - private void initConnection() throws IOException { - boolean isFirstInitialization = packetReader == null || packetWriter == null; + private void initConnection() throws IOException, InterruptedException { compressionHandler = null; // Set the reader and writer instance variables initReaderAndWriter(); - if (isFirstInitialization) { - packetWriter = new PacketWriter(); - packetReader = new PacketReader(); + int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits(); + if (availableReaderWriterSemaphorePermits < 2) { + Object[] logObjects = new Object[] { + this, + availableReaderWriterSemaphorePermits, + }; + LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects); } + readerWriterSemaphore.acquire(2); // Start the writer thread. This will open an XMPP stream to the server packetWriter.init(); // Start the reader thread. The startup() method will block until we @@ -841,7 +845,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { Async.go(new Runnable() { @Override public void run() { - parsePackets(); + try { + parsePackets(); + } finally { + XMPPTCPConnection.this.readerWriterSemaphore.release(); + } } }, "Smack Reader (" + getConnectionCounter() + ")"); } @@ -1132,7 +1140,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { Async.go(new Runnable() { @Override public void run() { - writePackets(); + try { + writePackets(); + } finally { + XMPPTCPConnection.this.readerWriterSemaphore.release(); + } } }, "Smack Writer (" + getConnectionCounter() + ")"); } @@ -1185,11 +1197,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { instantShutdown = instant; queue.shutdown(); shutdownTimestamp = System.currentTimeMillis(); - try { - shutdownDone.checkIfSuccessOrWait(); - } - catch (NoResponseException | InterruptedException e) { - LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); + if (shutdownDone.isNotInInitialState()) { + try { + shutdownDone.checkIfSuccessOrWait(); + } catch (NoResponseException | InterruptedException e) { + LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); + } } } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppNioTcpConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppNioTcpConnection.java index efc2f322e..e37b407c9 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppNioTcpConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppNioTcpConnection.java @@ -62,6 +62,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; +import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.SmackReactor.ChannelSelectedCallback; import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment; import org.jivesoftware.smack.SynchronizationPoint; @@ -920,7 +921,12 @@ public class XmppNioTcpConnection extends AbstractXmppNioConnection { failedAddresses, this); connectionAttemptState.establishTcpConnection(); - connectionAttemptState.tcpConnectionEstablishedSyncPoint.checkIfSuccessOrWaitOrThrow(); + try { + connectionAttemptState.tcpConnectionEstablishedSyncPoint.checkIfSuccessOrWaitOrThrow(); + } catch (SmackWrappedException e) { + // Should never throw SmackWrappedException. + throw new AssertionError(e); + } socketChannel = connectionAttemptState.socketChannel; remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); diff --git a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java index c72996783..c28ec85e4 100644 --- a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java +++ b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java @@ -1,6 +1,6 @@ /** * - * Copyright 2014 Florian Schmaus + * Copyright 2014-2019 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,11 +49,9 @@ public class PacketWriterTest { @Test public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException, XmppStringprepException { XMPPTCPConnection connection = new XMPPTCPConnection("user", "pass", "example.org"); - final PacketWriter pw = connection.new PacketWriter(); - connection.packetWriter = pw; - connection.packetReader = connection.new PacketReader(); + final PacketWriter pw = connection.packetWriter; connection.setWriter(new BlockingStringWriter()); - pw.init(); + connection.packetWriter.init(); for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) { pw.sendStreamElement(new Message());