From 57961a8cc1f2df6ecc1afa8c4f8460794d8d2dce Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Tue, 26 May 2020 21:39:08 +0200 Subject: [PATCH] Remove SynchronizationPoint This continues the design started with e98d42790 ("SmackReactor/NIO, Java8/Android19, Pretty print XML, FSM connections"), where the exceptions that caused an operation to fail, are not recorded within SynchronizationPoint but within the connection instance itself. --- .../smack/AbstractXMPPConnection.java | 163 +++++--- .../jivesoftware/smack/SmackException.java | 25 +- .../smack/SynchronizationPoint.java | 352 ------------------ .../smack/XmppInputOutputFilter.java | 3 +- .../ModularXmppClientToServerConnection.java | 84 ++--- ...rXmppClientToServerConnectionInternal.java | 10 +- .../smack/compression/CompressionModule.java | 9 +- .../org/jivesoftware/smack/fsm/State.java | 7 +- .../smack/fsm/StateTransitionResult.java | 8 + .../org/jivesoftware/smack/util/Supplier.java | 24 ++ .../WaitForClosingStreamElementTest.java | 9 +- .../smack/tcp/ConnectionAttemptState.java | 58 ++- .../smack/tcp/XMPPTCPConnection.java | 217 +++++------ .../smack/tcp/XmppTcpTransportModule.java | 63 +--- .../smack/tcp/PacketWriterTest.java | 4 +- 15 files changed, 352 insertions(+), 684 deletions(-) delete mode 100644 smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java create mode 100644 smack-core/src/main/java/org/jivesoftware/smack/util/Supplier.java diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 136976a09..2d0f7c693 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -111,6 +111,7 @@ import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.ParserUtils; import org.jivesoftware.smack.util.Predicate; import org.jivesoftware.smack.util.StringUtils; +import org.jivesoftware.smack.util.Supplier; import org.jivesoftware.smack.xml.XmlPullParser; import org.jivesoftware.smack.xml.XmlPullParserException; @@ -174,6 +175,12 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { SmackConfiguration.getVersion(); } + protected enum SyncPointState { + initial, + request_sent, + successful, + } + /** * A collection of ConnectionListeners which listen for connection closing * and reconnection events. @@ -271,30 +278,29 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { */ protected Writer writer; - protected final SynchronizationPoint tlsHandled = new SynchronizationPoint<>(this, "establishing TLS"); + protected SmackException currentSmackException; + protected XMPPException currentXmppException; + + protected boolean tlsHandled; /** - * Set to success if the last features stanza from the server has been parsed. A XMPP connection + * Set to true if the last features stanza from the server has been parsed. A XMPP connection * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature * stanza is send by the server. This is set to true once the last feature stanza has been * parsed. */ - protected final SynchronizationPoint lastFeaturesReceived = new SynchronizationPoint<>( - AbstractXMPPConnection.this, "last stream features received from server"); + protected boolean lastFeaturesReceived; /** - * Set to success if the SASL feature has been received. + * Set to true if the SASL feature has been received. */ - protected final SynchronizationPoint saslFeatureReceived = new SynchronizationPoint<>( - AbstractXMPPConnection.this, "SASL mechanisms stream feature from server"); - + protected boolean saslFeatureReceived; /** * A synchronization point which is successful if this connection has received the closing * stream element from the remote end-point, i.e. the server. */ - protected final SynchronizationPoint closingStreamReceived = new SynchronizationPoint<>( - this, "stream closing element received"); + protected boolean closingStreamReceived; /** * The SASLAuthentication manager that is responsible for authenticating with the server. @@ -369,8 +375,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { */ protected boolean wasAuthenticated = false; - protected Exception currentConnectionException; - private final Map setIqRequestHandler = new HashMap<>(); private final Map getIqRequestHandler = new HashMap<>(); @@ -486,10 +490,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { public abstract boolean isUsingCompression(); protected void initState() { - saslFeatureReceived.init(); - lastFeaturesReceived.init(); - tlsHandled.init(); - // TODO: We do not init() closingStreamReceived here, as the integration tests use it to check if we waited for + currentSmackException = null; + currentXmppException = null; + saslFeatureReceived = lastFeaturesReceived = tlsHandled = false; + // TODO: We do not init closingStreamReceived here, as the integration tests use it to check if we waited for // it. } @@ -512,7 +516,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // Reset the connection state initState(); - closingStreamReceived.init(); + closingStreamReceived = false; streamId = null; try { @@ -657,15 +661,82 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { return streamId; } - protected Resourcepart bindResourceAndEstablishSession(Resourcepart resource) throws XMPPErrorException, - SmackException, InterruptedException { + protected final void throwCurrentConnectionException() throws SmackException, XMPPException { + if (currentSmackException != null) { + throw currentSmackException; + } else if (currentXmppException != null) { + throw currentXmppException; + } + throw new AssertionError("No current connection exception set, although throwCurrentException() was called"); + } + + protected final boolean hasCurrentConnectionException() { + return currentSmackException != null || currentXmppException != null; + } + + protected final void setCurrentConnectionExceptionAndNotify(Exception exception) { + if (exception instanceof SmackException) { + currentSmackException = (SmackException) exception; + } else if (exception instanceof XMPPException) { + currentXmppException = (XMPPException) exception; + } else { + currentSmackException = new SmackException.SmackWrappedException(exception); + } + + notifyWaitingThreads(); + } + + /** + * We use an extra object for {@link #notifyWaitingThreads()} and {@link #waitForCondition(Supplier)}, because all state + * changing methods of the connection are synchronized using the connection instance as monitor. If we now would + * also use the connection instance for the internal process to wait for a condition, the {@link Object#wait()} + * would leave the monitor when it waites, which would allow for another potential call to a state changing function + * to proceed. + */ + private final Object internalMonitor = new Object(); + + protected final void notifyWaitingThreads() { + synchronized (internalMonitor) { + internalMonitor.notifyAll(); + } + } + + protected final boolean waitForCondition(Supplier condition) throws InterruptedException { + final long deadline = System.currentTimeMillis() + getReplyTimeout(); + synchronized (internalMonitor) { + while (!condition.get().booleanValue() && !hasCurrentConnectionException()) { + final long now = System.currentTimeMillis(); + if (now >= deadline) { + return false; + } + internalMonitor.wait(deadline - now); + } + } + return true; + } + + protected final void waitForCondition(Supplier condition, String waitFor) throws InterruptedException, NoResponseException { + boolean success = waitForCondition(condition); + if (!success) { + throw NoResponseException.newWith(this, waitFor); + } + } + + protected final void waitForConditionOrThrowConnectionException(Supplier condition, String waitFor) throws InterruptedException, SmackException, XMPPException { + waitForCondition(condition, waitFor); + if (hasCurrentConnectionException()) { + throwCurrentConnectionException(); + } + } + + protected Resourcepart bindResourceAndEstablishSession(Resourcepart resource) + throws SmackException, InterruptedException, XMPPException { // Wait until either: // - the servers last features stanza has been parsed // - the timeout occurs LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); - lastFeaturesReceived.checkIfSuccessOrWaitOrThrow(); - + waitForConditionOrThrowConnectionException(() -> lastFeaturesReceived, "last stream features received from server"); if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { // Server never offered resource binding, which is REQUIRED in XMPP client and @@ -905,29 +976,20 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { return; } + // TODO: Remove this async but ordered? ASYNC_BUT_ORDERED.performAsyncButOrdered(this, () -> { - currentConnectionException = exception; + // Note that we first have to set the current connection exception and notify waiting threads, as one of them + // could hold the instance lock, which we also need later when calling instantShutdown(). + setCurrentConnectionExceptionAndNotify(exception); + + // Closes the connection temporary. A if the connection supports stream management, then a reconnection is + // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in + // case the Exception is a StreamErrorException. + instantShutdown(); for (StanzaCollector collector : collectors) { collector.notifyConnectionError(exception); } - SmackWrappedException smackWrappedException = new SmackWrappedException(exception); - tlsHandled.reportGenericFailure(smackWrappedException); - saslFeatureReceived.reportGenericFailure(smackWrappedException); - lastFeaturesReceived.reportGenericFailure(smackWrappedException); - closingStreamReceived.reportFailure(smackWrappedException); - // TODO From XMPPTCPConnection. Was called in Smack 4.3 where notifyConnectionError() was part of - // XMPPTCPConnection. Create delegation method? - // maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException); - - synchronized (AbstractXMPPConnection.this) { - notifyAll(); - - // Closes the connection temporary. A if the connection supports stream management, then a reconnection is - // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in - // case the Exception is a StreamErrorException. - instantShutdown(); - } Async.go(() -> { // Notify connection listeners of the error. @@ -947,19 +1009,13 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { protected abstract void shutdown(); protected final boolean waitForClosingStreamTagFromServer() { - Exception exception; try { - // After we send the closing stream element, check if there was already a - // closing stream element sent by the server or wait with a timeout for a - // closing stream element to be received from the server. - exception = closingStreamReceived.checkIfSuccessOrWait(); - } catch (InterruptedException | NoResponseException e) { - exception = e; + waitForConditionOrThrowConnectionException(() -> closingStreamReceived, "closing stream tag from the server"); + } catch (InterruptedException | SmackException | XMPPException e) { + LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); + return false; } - if (exception != null) { - LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, exception); - } - return exception == null; + return true; } @Override @@ -1817,8 +1873,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE) || config.getSecurityMode() == SecurityMode.disabled) { - tlsHandled.reportSuccess(); - saslFeatureReceived.reportSuccess(); + tlsHandled = saslFeatureReceived = true; + notifyWaitingThreads(); } } @@ -1829,7 +1885,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { || !config.isCompressionEnabled()) { // This was was last features from the server is either it did not contain // compression or if we disabled it - lastFeaturesReceived.reportSuccess(); + lastFeaturesReceived = true; + notifyWaitingThreads(); } } afterFeaturesReceived(); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java index f58ed10a4..3d2c0b851 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java @@ -16,6 +16,7 @@ */ package org.jivesoftware.smack; +import java.security.cert.CertificateException; import java.util.Collections; import java.util.List; @@ -371,15 +372,6 @@ public abstract class SmackException extends Exception { } } - public static class ConnectionUnexpectedTerminatedException extends SmackException { - - private static final long serialVersionUID = 1L; - - public ConnectionUnexpectedTerminatedException(Throwable wrappedThrowable) { - super(wrappedThrowable); - } - } - public static class FeatureNotSupportedException extends SmackException { /** @@ -487,4 +479,19 @@ public abstract class SmackException extends Exception { super(message, exception); } } + + public static class SmackCertificateException extends SmackException { + + private static final long serialVersionUID = 1L; + + private final CertificateException certificateException; + + public SmackCertificateException(CertificateException certificateException) { + this.certificateException = certificateException; + } + + public CertificateException getCertificateException() { + return certificateException; + } + } } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java deleted file mode 100644 index 7652a48e0..000000000 --- a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java +++ /dev/null @@ -1,352 +0,0 @@ -/** - * - * Copyright © 2014-2019 Florian Schmaus - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jivesoftware.smack; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import org.jivesoftware.smack.SmackException.NoResponseException; -import org.jivesoftware.smack.SmackException.NotConnectedException; -import org.jivesoftware.smack.SmackException.SmackWrappedException; -import org.jivesoftware.smack.packet.Nonza; -import org.jivesoftware.smack.packet.Stanza; -import org.jivesoftware.smack.packet.TopLevelStreamElement; - -public class SynchronizationPoint { - - private final AbstractXMPPConnection connection; - private final Lock connectionLock; - private final Condition condition; - private final String waitFor; - - // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the - // same memory synchronization effects as synchronization block enter and leave. - private State state; - private E failureException; - private SmackWrappedException smackWrappedExcpetion; - - private volatile long waitStart; - - /** - * Construct a new synchronization point for the given connection. - * - * @param connection the connection of this synchronization point. - * @param waitFor a description of the event this synchronization point handles. - */ - public SynchronizationPoint(AbstractXMPPConnection connection, String waitFor) { - this.connection = connection; - this.connectionLock = connection.getConnectionLock(); - this.condition = connection.getConnectionLock().newCondition(); - this.waitFor = waitFor; - init(); - } - - /** - * Initialize (or reset) this synchronization point. - */ - @SuppressWarnings("LockNotBeforeTry") - public void init() { - connectionLock.lock(); - state = State.Initial; - failureException = null; - smackWrappedExcpetion = null; - connectionLock.unlock(); - } - - /** - * Send the given top level stream element and wait for a response. - * - * @param request the plain stream element to send. - * @throws NoResponseException if no response was received. - * @throws NotConnectedException if the connection is not connected. - * @throws InterruptedException if the connection is interrupted. - * @return null if synchronization point was successful, or the failure Exception. - */ - public Exception sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, - NotConnectedException, InterruptedException { - assert state == State.Initial; - connectionLock.lock(); - try { - if (request != null) { - if (request instanceof Stanza) { - connection.sendStanza((Stanza) request); - } - else if (request instanceof Nonza) { - connection.sendNonza((Nonza) request); - } else { - throw new IllegalStateException("Unsupported element type"); - } - state = State.RequestSent; - } - waitForConditionOrTimeout(); - } - finally { - connectionLock.unlock(); - } - return checkForResponse(); - } - - /** - * Send the given plain stream element and wait for a response. - * - * @param request the plain stream element to send. - * @throws E if an failure was reported. - * @throws NoResponseException if no response was received. - * @throws NotConnectedException if the connection is not connected. - * @throws InterruptedException if the connection is interrupted. - * @throws SmackWrappedException in case of a wrapped exception; - */ - public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException, - NotConnectedException, InterruptedException, SmackWrappedException { - sendAndWaitForResponse(request); - switch (state) { - case Failure: - throwException(); - break; - default: - // Success, do nothing - } - } - - /** - * Check if this synchronization point is successful or wait the connections reply timeout. - * @throws NoResponseException if there was no response marking the synchronization point as success or failed. - * @throws E if there was a failure - * @throws InterruptedException if the connection is interrupted. - * @throws SmackWrappedException in case of a wrapped exception; - */ - public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException, SmackWrappedException { - checkIfSuccessOrWait(); - if (state == State.Failure) { - throwException(); - } - } - - /** - * Check if this synchronization point is successful or wait the connections reply timeout. - * @throws NoResponseException if there was no response marking the synchronization point as success or failed. - * @throws InterruptedException if the calling thread was interrupted. - * @return null if synchronization point was successful, or the failure Exception. - */ - public Exception checkIfSuccessOrWait() throws NoResponseException, InterruptedException { - connectionLock.lock(); - try { - switch (state) { - // Return immediately on success or failure - case Success: - return null; - case Failure: - return getException(); - default: - // Do nothing - break; - } - waitForConditionOrTimeout(); - } finally { - connectionLock.unlock(); - } - return checkForResponse(); - } - - /** - * Report this synchronization point as successful. - */ - public void reportSuccess() { - connectionLock.lock(); - try { - state = State.Success; - condition.signalAll(); - } - finally { - connectionLock.unlock(); - } - } - - /** - * Deprecated. - * @deprecated use {@link #reportFailure(Exception)} instead. - */ - @Deprecated - public void reportFailure() { - reportFailure(null); - } - - /** - * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. - * - * @param failureException the exception causing this synchronization point to fail. - */ - public void reportFailure(E failureException) { - assert failureException != null; - connectionLock.lock(); - try { - state = State.Failure; - this.failureException = failureException; - condition.signalAll(); - } - finally { - connectionLock.unlock(); - } - } - - /** - * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. - * - * @param exception the exception causing this synchronization point to fail. - */ - public void reportGenericFailure(SmackWrappedException exception) { - assert exception != null; - connectionLock.lock(); - try { - state = State.Failure; - this.smackWrappedExcpetion = exception; - condition.signalAll(); - } - finally { - connectionLock.unlock(); - } - } - - /** - * Check if this synchronization point was successful. - * - * @return true if the synchronization point was successful, false otherwise. - */ - public boolean wasSuccessful() { - connectionLock.lock(); - try { - return state == State.Success; - } - finally { - connectionLock.unlock(); - } - } - - public boolean isNotInInitialState() { - connectionLock.lock(); - try { - return state != State.Initial; - } - finally { - connectionLock.unlock(); - } - } - - /** - * Check if this synchronization point has its request already sent. - * - * @return true if the request was already sent, false otherwise. - */ - public boolean requestSent() { - connectionLock.lock(); - try { - return state == State.RequestSent; - } - finally { - connectionLock.unlock(); - } - } - - public E getFailureException() { - connectionLock.lock(); - try { - return failureException; - } - finally { - connectionLock.unlock(); - } - } - - public void resetTimeout() { - waitStart = System.currentTimeMillis(); - } - - /** - * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}. - * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this - * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the - * connections reply timeout, this method will set the state of {@link State#NoResponse}. - * @throws InterruptedException if the calling thread was interrupted. - */ - private void waitForConditionOrTimeout() throws InterruptedException { - waitStart = System.currentTimeMillis(); - while (state == State.RequestSent || state == State.Initial) { - long timeout = connection.getReplyTimeout(); - long remainingWaitMillis = timeout - (System.currentTimeMillis() - waitStart); - long remainingWait = TimeUnit.MILLISECONDS.toNanos(remainingWaitMillis); - - if (remainingWait <= 0) { - state = State.NoResponse; - break; - } - - try { - condition.awaitNanos(remainingWait); - } catch (InterruptedException e) { - state = State.Interrupted; - throw e; - } - } - } - - private Exception getException() { - if (failureException != null) { - return failureException; - } - return smackWrappedExcpetion; - } - - private void throwException() throws E, SmackWrappedException { - if (failureException != null) { - throw failureException; - } - throw smackWrappedExcpetion; - } - - /** - * Check for a response and throw a {@link NoResponseException} if there was none. - *

- * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent' - *

- * @return true if synchronization point was successful, false on failure. - * @throws NoResponseException if there was no response from the remote entity. - */ - private Exception checkForResponse() throws NoResponseException { - switch (state) { - case Initial: - case NoResponse: - case RequestSent: - throw NoResponseException.newWith(connection, waitFor); - case Success: - return null; - case Failure: - return getException(); - default: - throw new AssertionError("Unknown state " + state); - } - } - - private enum State { - Initial, - RequestSent, - NoResponse, - Success, - Failure, - Interrupted, - } -} diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XmppInputOutputFilter.java b/smack-core/src/main/java/org/jivesoftware/smack/XmppInputOutputFilter.java index 18432f564..97fdff246 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XmppInputOutputFilter.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XmppInputOutputFilter.java @@ -67,7 +67,8 @@ public interface XmppInputOutputFilter { default void closeInputOutput() { } - default void waitUntilInputOutputClosed() throws IOException, NoResponseException, CertificateException, InterruptedException, SmackException { + default void waitUntilInputOutputClosed() throws IOException, NoResponseException, CertificateException, + InterruptedException, SmackException, XMPPException { } Object getStats(); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java index 061d25e7a..ac8087540 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java @@ -35,7 +35,6 @@ import javax.net.ssl.SSLSession; import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackFuture; @@ -78,6 +77,7 @@ import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; import org.jivesoftware.smack.util.ExtendedAppendable; import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.StringUtils; +import org.jivesoftware.smack.util.Supplier; import org.jivesoftware.smack.xml.XmlPullParser; import org.jivesoftware.smack.xml.XmlPullParserException; @@ -142,7 +142,8 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne @Override public void onStreamClosed() { - ModularXmppClientToServerConnection.this.closingStreamReceived.reportSuccess(); + ModularXmppClientToServerConnection.this.closingStreamReceived = true; + notifyWaitingThreads(); } @Override @@ -177,7 +178,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne @Override public void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException, - ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException { + SmackException, XMPPException { ModularXmppClientToServerConnection.this.newStreamOpenWaitForFeaturesSequence(waitFor); } @@ -198,8 +199,14 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne } @Override - public Exception getCurrentConnectionException() { - return ModularXmppClientToServerConnection.this.currentConnectionException; + public void waitForCondition(Supplier condition, String waitFor) + throws InterruptedException, SmackException, XMPPException { + ModularXmppClientToServerConnection.this.waitForConditionOrThrowConnectionException(condition, waitFor); + } + + @Override + public void notifyWaitingThreads() { + ModularXmppClientToServerConnection.this.notifyWaitingThreads(); } @Override @@ -263,14 +270,13 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne revertedState.resetState(); } - protected void walkStateGraph(WalkStateGraphContext walkStateGraphContext) throws XMPPErrorException, - SASLErrorException, FailedNonzaException, IOException, SmackException, InterruptedException { + protected void walkStateGraph(WalkStateGraphContext walkStateGraphContext) + throws XMPPException, IOException, SmackException, InterruptedException { // Save a copy of the current state GraphVertex previousStateVertex = currentStateVertex; try { walkStateGraphInternal(walkStateGraphContext); - } catch (XMPPErrorException | SASLErrorException | FailedNonzaException | IOException | SmackException - | InterruptedException e) { + } catch (IOException | SmackException | InterruptedException | XMPPException e) { currentStateVertex = previousStateVertex; // Unwind the state. State revertedState = currentStateVertex.getElement(); @@ -279,8 +285,8 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne } } - private void walkStateGraphInternal(WalkStateGraphContext walkStateGraphContext) throws XMPPErrorException, - SASLErrorException, IOException, SmackException, InterruptedException, FailedNonzaException { + private void walkStateGraphInternal(WalkStateGraphContext walkStateGraphContext) + throws IOException, SmackException, InterruptedException, XMPPException { // Save a copy of the current state final GraphVertex initialStateVertex = currentStateVertex; final State initialState = initialStateVertex.getElement(); @@ -359,15 +365,13 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne * @param walkStateGraphContext the "walk state graph" context. * @return A state transition result or null if this state can be ignored. * @throws SmackException if Smack detected an exceptional situation. - * @throws XMPPErrorException if an XMPP protocol error was received. - * @throws SASLErrorException if a SASL protocol error was returned. + * @throws XMPPException if an XMPP protocol error was received. * @throws IOException if an I/O error occurred. * @throws InterruptedException if the calling thread was interrupted. - * @throws FailedNonzaException if an XMPP protocol failure was received. */ private StateTransitionResult attemptEnterState(GraphVertex successorStateVertex, - WalkStateGraphContext walkStateGraphContext) throws SmackException, XMPPErrorException, - SASLErrorException, IOException, InterruptedException, FailedNonzaException { + WalkStateGraphContext walkStateGraphContext) throws SmackException, XMPPException, + IOException, InterruptedException { final GraphVertex initialStateVertex = currentStateVertex; final State initialState = initialStateVertex.getElement(); final State successorState = successorStateVertex.getElement(); @@ -400,8 +404,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne invokeConnectionStateMachineListener(new ConnectionStateEvent.AboutToTransitionInto(initialState, successorState)); transitionAttemptResult = successorState.transitionInto(walkStateGraphContext); - } catch (SmackException | XMPPErrorException | SASLErrorException | IOException | InterruptedException - | FailedNonzaException e) { + } catch (SmackException | IOException | InterruptedException | XMPPException e) { // Unwind the state here too, since this state will not be unwound by walkStateGraph(), as it will not // become a predecessor state in the walk. unwindState(successorState); @@ -474,8 +477,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne try { walkStateGraph(context); - } catch (XMPPErrorException | SASLErrorException | IOException | SmackException | InterruptedException - | FailedNonzaException e) { + } catch (IOException | SmackException | InterruptedException | XMPPException e) { throw new IllegalStateException("A walk to disconnected state should never throw", e); } } @@ -491,9 +493,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne @Override protected void afterFeaturesReceived() { featuresReceived = true; - synchronized (this) { - notifyAll(); - } + notifyWaitingThreads(); } protected void parseAndProcessElement(String element) { @@ -522,8 +522,10 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne break; case "error": StreamError streamError = PacketParserUtils.parseStreamError(parser, null); - saslFeatureReceived.reportFailure(new StreamErrorException(streamError)); - throw new StreamErrorException(streamError); + StreamErrorException streamErrorException = new StreamErrorException(streamError); + currentXmppException = streamErrorException; + notifyWaitingThreads(); + throw streamErrorException; case "features": parseFeatures(parser); afterFeaturesReceived(); @@ -550,25 +552,12 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne } protected void waitForFeaturesReceived(String waitFor) - throws InterruptedException, ConnectionUnexpectedTerminatedException, NoResponseException { - long waitStartMs = System.currentTimeMillis(); - long timeoutMs = getReplyTimeout(); - synchronized (this) { - while (!featuresReceived && currentConnectionException == null) { - long remainingWaitMs = timeoutMs - (System.currentTimeMillis() - waitStartMs); - if (remainingWaitMs <= 0) { - throw NoResponseException.newWith(this, waitFor); - } - wait(remainingWaitMs); - } - if (currentConnectionException != null) { - throw new SmackException.ConnectionUnexpectedTerminatedException(currentConnectionException); - } - } + throws InterruptedException, SmackException, XMPPException { + waitForConditionOrThrowConnectionException(() -> featuresReceived, waitFor); } protected void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException, - ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException { + SmackException, XMPPException { prepareToWaitForFeaturesReceived(); sendStreamOpen(); waitForFeaturesReceived(waitFor); @@ -763,8 +752,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne @Override public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) - throws XMPPErrorException, SASLErrorException, IOException, SmackException, - InterruptedException { + throws IOException, SmackException, InterruptedException, XMPPException { prepareToWaitForFeaturesReceived(); LoginContext loginContext = walkStateGraphContext.getLoginContext(); @@ -813,12 +801,12 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne @Override public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) - throws XMPPErrorException, SASLErrorException, IOException, SmackException, - InterruptedException { + throws IOException, SmackException, InterruptedException, XMPPException { // Calling bindResourceAndEstablishSession() below requires the lastFeaturesReceived sync point to be signaled. // Since we entered this state, the FSM has decided that the last features have been received, hence signal // the sync point. - lastFeaturesReceived.reportSuccess(); + lastFeaturesReceived = true; + notifyWaitingThreads(); LoginContext loginContext = walkStateGraphContext.getLoginContext(); Resourcepart resource = bindResourceAndEstablishSession(loginContext.resource); @@ -914,7 +902,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne @Override public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) { - closingStreamReceived.init(); + closingStreamReceived = false; boolean streamCloseIssued = outgoingElementsQueue.offerAndShutdown(StreamClose.INSTANCE); @@ -936,7 +924,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne XmppInputOutputFilter filter = it.next(); try { filter.waitUntilInputOutputClosed(); - } catch (IOException | CertificateException | InterruptedException | SmackException e) { + } catch (IOException | CertificateException | InterruptedException | SmackException | XMPPException e) { LOGGER.log(Level.WARNING, "waitUntilInputOutputClosed() threw", e); } } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/c2s/internal/ModularXmppClientToServerConnectionInternal.java b/smack-core/src/main/java/org/jivesoftware/smack/c2s/internal/ModularXmppClientToServerConnectionInternal.java index b08676914..81a485771 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/c2s/internal/ModularXmppClientToServerConnectionInternal.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/c2s/internal/ModularXmppClientToServerConnectionInternal.java @@ -22,11 +22,12 @@ import java.nio.channels.SelectionKey; import java.util.ListIterator; import java.util.Queue; -import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException; +import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackReactor; import org.jivesoftware.smack.SmackReactor.ChannelSelectedCallback; +import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.FailedNonzaException; import org.jivesoftware.smack.XmppInputOutputFilter; import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection; @@ -38,6 +39,7 @@ import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.TopLevelStreamElement; import org.jivesoftware.smack.packet.XmlEnvironment; import org.jivesoftware.smack.util.Consumer; +import org.jivesoftware.smack.util.Supplier; import org.jivesoftware.smack.xml.XmlPullParser; public abstract class ModularXmppClientToServerConnectionInternal { @@ -98,7 +100,7 @@ public abstract class ModularXmppClientToServerConnectionInternal { public abstract ListIterator getXmppInputOutputFilterEndIterator(); public abstract void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException, - ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException; + NoResponseException, NotConnectedException, SmackException, XMPPException; public abstract SmackTlsContext getSmackTlsContext(); @@ -108,7 +110,9 @@ public abstract class ModularXmppClientToServerConnectionInternal { public abstract void asyncGo(Runnable runnable); - public abstract Exception getCurrentConnectionException(); + public abstract void waitForCondition(Supplier condition, String waitFor) throws InterruptedException, SmackException, XMPPException; + + public abstract void notifyWaitingThreads(); public abstract void setCompressionEnabled(boolean compressionEnabled); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/compression/CompressionModule.java b/smack-core/src/main/java/org/jivesoftware/smack/compression/CompressionModule.java index 2a018899b..ce0dddeea 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/compression/CompressionModule.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/compression/CompressionModule.java @@ -17,10 +17,8 @@ package org.jivesoftware.smack.compression; import org.jivesoftware.smack.ConnectionConfiguration; -import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException; -import org.jivesoftware.smack.SmackException.NoResponseException; -import org.jivesoftware.smack.SmackException.NotConnectedException; -import org.jivesoftware.smack.XMPPException.FailedNonzaException; +import org.jivesoftware.smack.SmackException; +import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XmppInputOutputFilter; import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.AuthenticatedButUnboundStateDescriptor; import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ResourceBindingStateDescriptor; @@ -90,8 +88,7 @@ public class CompressionModule extends ModularXmppClientToServerConnectionModule @Override public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) - throws NoResponseException, NotConnectedException, FailedNonzaException, InterruptedException, - ConnectionUnexpectedTerminatedException { + throws InterruptedException, SmackException, XMPPException { final String compressionMethod = selectedCompressionFactory.getCompressionMethod(); connectionInternal.sendAndWaitForResponse(new Compress(compressionMethod), Compressed.class, Failure.class); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/fsm/State.java b/smack-core/src/main/java/org/jivesoftware/smack/fsm/State.java index f8749161d..a4a7b76ea 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/fsm/State.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/fsm/State.java @@ -19,11 +19,9 @@ package org.jivesoftware.smack.fsm; import java.io.IOException; import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.XMPPException.FailedNonzaException; -import org.jivesoftware.smack.XMPPException.XMPPErrorException; +import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal; import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext; -import org.jivesoftware.smack.sasl.SASLErrorException; /** * Note that this is an non-static inner class of XmppClientToServerConnection so that states can inspect and modify @@ -53,8 +51,7 @@ public abstract class State { } public abstract StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) - throws XMPPErrorException, SASLErrorException, IOException, SmackException, - InterruptedException, FailedNonzaException; + throws IOException, SmackException, InterruptedException, XMPPException; public StateDescriptor getStateDescriptor() { return stateDescriptor; diff --git a/smack-core/src/main/java/org/jivesoftware/smack/fsm/StateTransitionResult.java b/smack-core/src/main/java/org/jivesoftware/smack/fsm/StateTransitionResult.java index 053d35e77..a9acb4f32 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/fsm/StateTransitionResult.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/fsm/StateTransitionResult.java @@ -67,6 +67,14 @@ public abstract class StateTransitionResult { } } + public static final class FailureCausedByTimeout extends Failure { + + public FailureCausedByTimeout(String failureMessage) { + super(failureMessage); + } + + } + public abstract static class TransitionImpossible extends StateTransitionResult { protected TransitionImpossible(String message) { super(message); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/Supplier.java b/smack-core/src/main/java/org/jivesoftware/smack/util/Supplier.java new file mode 100644 index 000000000..f0910adb7 --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/Supplier.java @@ -0,0 +1,24 @@ +/** + * + * Copyright 2020 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack.util; + +// TODO: Replace with java.util.function.Supplier once Smack's minimum Android SDK level is 24 or higher. +public interface Supplier { + + T get(); + +} diff --git a/smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java b/smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java index 023979d64..2326866c7 100644 --- a/smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java +++ b/smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java @@ -38,12 +38,7 @@ public class WaitForClosingStreamElementTest extends AbstractSmackLowLevelIntegr Field closingStreamReceivedField = AbstractXMPPConnection.class.getDeclaredField("closingStreamReceived"); closingStreamReceivedField.setAccessible(true); - SynchronizationPoint closingStreamReceived = (SynchronizationPoint) closingStreamReceivedField.get(connection); - Exception failureException = closingStreamReceived.getFailureException(); - if (failureException != null) { - throw new AssertionError("Sync poing yielded failure exception", failureException); - } - boolean closingStreamReceivedSuccessful = closingStreamReceived.wasSuccessful(); - assertTrue(closingStreamReceivedSuccessful); + boolean closingStreamReceived = (boolean) closingStreamReceivedField.get(connection); + assertTrue(closingStreamReceived); } } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/ConnectionAttemptState.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/ConnectionAttemptState.java index 415af19f9..d8a3890e0 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/ConnectionAttemptState.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/ConnectionAttemptState.java @@ -26,10 +26,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.jivesoftware.smack.SmackException.ConnectionException; import org.jivesoftware.smack.SmackException.EndpointConnectionException; -import org.jivesoftware.smack.SynchronizationPoint; import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal; +import org.jivesoftware.smack.fsm.StateTransitionResult; import org.jivesoftware.smack.tcp.XmppTcpTransportModule.EstablishingTcpConnectionState; import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint; import org.jivesoftware.smack.util.Async; @@ -48,7 +47,10 @@ public final class ConnectionAttemptState { final SocketChannel socketChannel; final List> connectionExceptions; - final SynchronizationPoint tcpConnectionEstablishedSyncPoint; + + EndpointConnectionException connectionException; + boolean connected; + long deadline; final Iterator connectionEndpointIterator; /** The current connection endpoint we are trying */ @@ -65,17 +67,32 @@ public final class ConnectionAttemptState { socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); - connectionEndpointIterator = discoveredEndpoints.result.discoveredRemoteConnectionEndpoints.iterator(); + List endpoints = discoveredEndpoints.result.discoveredRemoteConnectionEndpoints; + connectionEndpointIterator = endpoints.iterator(); connectionEndpoint = connectionEndpointIterator.next(); - connectionExceptions = new ArrayList<>(discoveredEndpoints.result.discoveredRemoteConnectionEndpoints.size()); - - tcpConnectionEstablishedSyncPoint = new SynchronizationPoint<>(connectionInternal.connection, - "TCP connection establishment"); + connectionExceptions = new ArrayList<>(endpoints.size()); } - void establishTcpConnection() { + StateTransitionResult.Failure establishTcpConnection() throws InterruptedException { RemoteConnectionEndpoint.InetSocketAddressCoupling address = nextAddress(); establishTcpConnection(address); + + synchronized (this) { + while (!connected && connectionException == null) { + final long now = System.currentTimeMillis(); + if (now >= deadline) { + return new StateTransitionResult.FailureCausedByTimeout("Timeout waiting to establish connection"); + } + wait (deadline - now); + } + } + if (connected) { + assert connectionException == null; + // Success case: we have been able to establish a connection to one remote endpoint. + return null; + } + + return new StateTransitionResult.FailureCausedByException(connectionException); } private void establishTcpConnection( @@ -84,8 +101,10 @@ public final class ConnectionAttemptState { establishingTcpConnectionState, address); connectionInternal.invokeConnectionStateMachineListener(connectingToHostEvent); - final boolean connected; final InetSocketAddress inetSocketAddress = address.getInetSocketAddress(); + // TODO: Should use "connect timeout" instead of reply timeout. But first connect timeout needs to be moved from + // XMPPTCPConnectionConfiguration. into XMPPConnectionConfiguration. + deadline = System.currentTimeMillis() + connectionInternal.connection.getReplyTimeout(); try { connected = socketChannel.connect(inetSocketAddress); } catch (IOException e) { @@ -98,7 +117,9 @@ public final class ConnectionAttemptState { establishingTcpConnectionState, address, true); connectionInternal.invokeConnectionStateMachineListener(connectedToHostEvent); - tcpConnectionEstablishedSyncPoint.reportSuccess(); + synchronized (this) { + notifyAll(); + } return; } @@ -124,9 +145,10 @@ public final class ConnectionAttemptState { establishingTcpConnectionState, address, false); connectionInternal.invokeConnectionStateMachineListener(connectedToHostEvent); - // Do not set 'state' here, since this is processed by a reactor thread, which doesn't hold - // the objects lock. - tcpConnectionEstablishedSyncPoint.reportSuccess(); + connected = true; + synchronized (ConnectionAttemptState.this) { + notifyAll(); + } }); } catch (ClosedChannelException e) { onIOExceptionWhenEstablishingTcpConnection(e, address); @@ -137,14 +159,14 @@ public final class ConnectionAttemptState { RemoteConnectionEndpoint.InetSocketAddressCoupling failedAddress) { RemoteConnectionEndpoint.InetSocketAddressCoupling nextInetSocketAddress = nextAddress(); if (nextInetSocketAddress == null) { - EndpointConnectionException connectionException = EndpointConnectionException.from( + connectionException = EndpointConnectionException.from( discoveredEndpoints.result.lookupFailures, connectionExceptions); - tcpConnectionEstablishedSyncPoint.reportFailure(connectionException); + synchronized (this) { + notifyAll(); + } return; } - tcpConnectionEstablishedSyncPoint.resetTimeout(); - RemoteConnectionException rce = new RemoteConnectionException<>( failedAddress, exception); connectionExceptions.add(rce); diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index ea34b9518..dce23f149 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -26,11 +26,6 @@ import java.io.Writer; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Collection; @@ -44,7 +39,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; @@ -65,13 +59,12 @@ import org.jivesoftware.smack.SmackException.AlreadyConnectedException; import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; import org.jivesoftware.smack.SmackException.ConnectionException; import org.jivesoftware.smack.SmackException.EndpointConnectionException; -import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotLoggedInException; +import org.jivesoftware.smack.SmackException.SecurityNotPossibleException; import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; import org.jivesoftware.smack.SmackFuture; import org.jivesoftware.smack.StanzaListener; -import org.jivesoftware.smack.SynchronizationPoint; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.FailedNonzaException; @@ -151,8 +144,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private SSLSocket secureSocket; - private final Semaphore readerWriterSemaphore = new Semaphore(2); - /** * Protected access level because of unit test purposes */ @@ -166,14 +157,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { /** * */ - private final SynchronizationPoint maybeCompressFeaturesReceived = new SynchronizationPoint( - this, "stream compression feature"); + private boolean streamFeaturesAfterAuthenticationReceived; /** * */ - private final SynchronizationPoint compressSyncPoint = new SynchronizationPoint<>( - this, "stream compression"); + private boolean compressSyncPoint; /** * The default bundle and defer callback, used for new connections. @@ -197,15 +186,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { /** * The stream ID of the stream that is currently resumable, ie. the stream we hold the state * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and - * {@link #unacknowledgedStanzas}. + * {@link #unFailedNonzaExceptionacknowledgedStanzas}. */ private String smSessionId; - private final SynchronizationPoint smResumedSyncPoint = new SynchronizationPoint<>( - this, "stream resumed element"); + private SyncPointState smResumedSyncPoint; + private Failed smResumptionFailed; - private final SynchronizationPoint smEnabledSyncPoint = new SynchronizationPoint<>( - this, "stream enabled element"); + private boolean smEnabledSyncPoint; /** * The client's preferred maximum resumption time in seconds. @@ -381,15 +369,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // Wait for stream features after the authentication. // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be // renamed to "streamFeaturesAfterAuthenticationReceived". - maybeCompressFeaturesReceived.checkIfSuccessOrWait(); + waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server"); // If compression is enabled then request the server to use stream compression. XEP-170 // recommends to perform stream compression before resource binding. maybeEnableCompression(); if (isSmResumptionPossible()) { - smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); - if (smResumedSyncPoint.wasSuccessful()) { + smResumedSyncPoint = SyncPointState.request_sent; + sendNonza(new Resume(clientHandledStanzasCount, smSessionId)); + waitForCondition(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream"); + if (smResumedSyncPoint == SyncPointState.successful) { // We successfully resumed the stream, be done here afterSuccessfulLogin(true); return; @@ -397,7 +387,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // SM resumption failed, what Smack does here is to report success of // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that // normal resource binding can be tried. - LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process"); + assert smResumptionFailed != null; + LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed); } List previouslyUnackedStanzas = new LinkedList(); @@ -421,9 +412,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { if (isSmAvailable() && useSm) { // Remove what is maybe left from previously stream managed sessions serverHandledStanzasCount = 0; + sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime)); // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' // then this is a non recoverable error and we therefore throw an exception. - smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime)); + waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement"); synchronized (requestAckPredicates) { if (requestAckPredicates.isEmpty()) { // Assure that we have at lest one predicate set up that so that we request acks @@ -503,9 +495,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { setWasAuthenticated(); - // Wait for reader and writer threads to be terminated. - readerWriterSemaphore.acquireUninterruptibly(2); - readerWriterSemaphore.release(2); + try { + boolean readerAndWriterThreadsTermianted = waitForCondition(() -> !packetWriter.running && !packetReader.running); + if (!readerAndWriterThreadsTermianted) { + LOGGER.severe("Reader and writer threads did not terminate"); + } + } catch (InterruptedException e) { + LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e); + } if (disconnectedButResumeable) { return; @@ -536,10 +533,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { @Override protected void initState() { super.initState(); - maybeCompressFeaturesReceived.init(); - compressSyncPoint.init(); - smResumedSyncPoint.init(); - smEnabledSyncPoint.init(); + streamFeaturesAfterAuthenticationReceived = compressSyncPoint = false; + smResumedSyncPoint = SyncPointState.initial; + smResumptionFailed = null; + smEnabledSyncPoint = false; } @Override @@ -652,15 +649,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // Set the reader and writer instance variables initReaderAndWriter(); - int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits(); - if (availableReaderWriterSemaphorePermits < 2) { - Object[] logObjects = new Object[] { - this, - availableReaderWriterSemaphorePermits, - }; - LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects); - } - readerWriterSemaphore.acquire(2); // Start the writer thread. This will open an XMPP stream to the server packetWriter.init(); // Start the reader thread. The startup() method will block until we @@ -688,17 +676,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * existing plain connection and perform a handshake. This method won't return until the * connection has finished the handshake or an error occurred while securing the connection. * @throws IOException if an I/O error occurred. - * @throws CertificateException - * @throws NoSuchAlgorithmException if no such algorithm is available. - * @throws NoSuchProviderException - * @throws KeyStoreException - * @throws UnrecoverableKeyException - * @throws KeyManagementException if there was a key mangement error. - * @throws SmackException if Smack detected an exceptional situation. - * @throws Exception if an exception occurs. + * @throws SecurityNotPossibleException if TLS is not possible. + * @throws CertificateException if there is an issue with the certificate. */ @SuppressWarnings("LiteralClassName") - private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { + private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException { SmackTlsContext smackTlsContext = getSmackTlsContext(); Socket plain = socket; @@ -773,7 +755,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { @Override public boolean isUsingCompression() { - return compressionHandler != null && compressSyncPoint.wasSuccessful(); + return compressionHandler != null && compressSyncPoint; } /** @@ -791,10 +773,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @throws NotConnectedException if the XMPP connection is not connected. * @throws SmackException if Smack detected an exceptional situation. - * @throws NoResponseException if there was no response from the remote entity. * @throws InterruptedException if the calling thread was interrupted. + * @throws XMPPException if an XMPP protocol error was received. */ - private void maybeEnableCompression() throws SmackException, InterruptedException { + private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException { if (!config.isCompressionEnabled()) { return; } @@ -807,7 +789,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // If stream compression was offered by the server and we want to use // compression then send compression request to the server if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { - compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); + sendNonza(new Compress(compressionHandler.getCompressionMethod())); + waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression"); } else { LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); } @@ -835,11 +818,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // We connected successfully to the servers TCP port initConnection(); - // TLS handled will be successful either if TLS was established, or if it was not mandatory. - tlsHandled.checkIfSuccessOrWaitOrThrow(); + // TLS handled will be true either if TLS was established, or if it was not mandatory. + waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS"); // Wait with SASL auth until the SASL mechanisms have been received - saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); + waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server"); } /** @@ -857,24 +840,28 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { if (startTlsFeature != null) { if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { SecurityRequiredByServerException smackException = new SecurityRequiredByServerException(); - tlsHandled.reportFailure(smackException); + currentSmackException = smackException; + notifyWaitingThreads(); throw smackException; } if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { sendNonza(new StartTls()); } else { - tlsHandled.reportSuccess(); + tlsHandled = true; + notifyWaitingThreads(); } } else { - tlsHandled.reportSuccess(); + tlsHandled = true; + notifyWaitingThreads(); } if (isSaslAuthenticated()) { // If we have received features after the SASL has been successfully completed, then we // have also *maybe* received, as it is an optional feature, the compression feature // from the server. - maybeCompressFeaturesReceived.reportSuccess(); + streamFeaturesAfterAuthenticationReceived = true; + notifyWaitingThreads(); } } @@ -899,6 +886,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private volatile boolean done; + private boolean running; + /** * Initializes the reader in order to be used. The reader is initialized during the * first connection and when reconnecting due to an abruptly disconnection. @@ -910,11 +899,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { @Override public void run() { LOGGER.finer(threadName + " start"); + running = true; try { parsePackets(); } finally { LOGGER.finer(threadName + " exit"); - XMPPTCPConnection.this.readerWriterSemaphore.release(); + running = false; + notifyWaitingThreads(); } } }, threadName); @@ -931,10 +922,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * Parse top-level packets in order to process them further. */ private void parsePackets() { - boolean initialStreamOpenSend = false; try { openStreamAndResetParser(); - initialStreamOpenSend = true; XmlPullParser.Event eventType = parser.getEventType(); while (!done) { switch (eventType) { @@ -955,27 +944,21 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { break; case "error": StreamError streamError = PacketParserUtils.parseStreamError(parser); - saslFeatureReceived.reportFailure(new StreamErrorException(streamError)); + currentXmppException = new StreamErrorException(streamError); // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync // point to report the error, which is checked immediately after tlsHandled in // connectInternal(). - tlsHandled.reportSuccess(); + tlsHandled = true; + notifyWaiters(); throw new StreamErrorException(streamError); case "features": parseFeaturesAndNotify(parser); break; case "proceed": - try { - // Secure the connection by negotiating TLS - proceedTLSReceived(); - // Send a new opening stream to the server - openStreamAndResetParser(); - } - catch (Exception e) { - SmackException.SmackWrappedException smackException = new SmackException.SmackWrappedException(e); - tlsHandled.reportFailure(smackException); - throw e; - } + // Secure the connection by negotiating TLS + proceedTLSReceived(); + // Send a new opening stream to the server + openStreamAndResetParser(); break; case "failure": String namespace = parser.getNamespace(null); @@ -989,8 +972,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // situation. It is still possible to authenticate and // use the connection but using an uncompressed connection // TODO Parse failure stanza - compressSyncPoint.reportFailure(new SmackException.SmackMessageException( - "Could not establish compression")); + currentSmackException = new SmackException.SmackMessageException("Could not establish compression"); + notifyWaitingThreads(); break; default: parseAndProcessNonza(parser); @@ -1004,7 +987,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // Send a new opening stream to the server openStreamAndResetParser(); // Notify that compression is being used - compressSyncPoint.reportSuccess(); + compressSyncPoint = true; + notifyWaitingThreads(); break; case Enabled.ELEMENT: Enabled enabled = ParseStreamManagement.enabled(parser); @@ -1012,7 +996,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { smSessionId = enabled.getId(); if (StringUtils.isNullOrEmpty(smSessionId)) { SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received"); - smEnabledSyncPoint.reportFailure(xmppException); + setCurrentConnectionExceptionAndNotify(xmppException); throw xmppException; } smServerMaxResumptionTime = enabled.getMaxResumptionTime(); @@ -1022,28 +1006,18 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } clientHandledStanzasCount = 0; smWasEnabledAtLeastOnce = true; - smEnabledSyncPoint.reportSuccess(); - LOGGER.fine("Stream Management (XEP-198): successfully enabled"); + smEnabledSyncPoint = true; + notifyWaitingThreads(); break; case Failed.ELEMENT: Failed failed = ParseStreamManagement.failed(parser); - FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition()); - // If only XEP-198 would specify different failure elements for the SM - // enable and SM resume failure case. But this is not the case, so we - // need to determine if this is a 'Failed' response for either 'Enable' - // or 'Resume'. - if (smResumedSyncPoint.requestSent()) { - smResumedSyncPoint.reportFailure(xmppException); - } - else { - if (!smEnabledSyncPoint.requestSent()) { - throw new IllegalStateException("Failed element received but SM was not previously enabled"); - } - smEnabledSyncPoint.reportFailure(new SmackException.SmackWrappedException(xmppException)); - // Report success for last lastFeaturesReceived so that in case a - // failed resumption, we can continue with normal resource binding. - // See text of XEP-198 5. below Example 11. - lastFeaturesReceived.reportSuccess(); + if (smResumedSyncPoint == SyncPointState.request_sent) { + // This is a nonza in a response to resuming a previous stream, failure to do + // so is non-fatal as we can simply continue with resource binding in this case. + smResumptionFailed = failed; + } else { + FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition()); + setCurrentConnectionExceptionAndNotify(xmppException); } break; case Resumed.ELEMENT: @@ -1052,7 +1026,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId()); } // Mark SM as enabled - smEnabledSyncPoint.reportSuccess(); + smEnabledSyncPoint = true; // First, drop the stanzas already handled by the server processHandledCount(resumed.getHandledCount()); // Then re-send what is left in the unacknowledged queue @@ -1068,8 +1042,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { requestSmAcknowledgementInternal(); } // Mark SM resumption as successful - smResumedSyncPoint.reportSuccess(); - LOGGER.fine("Stream Management (XEP-198): Stream resumed"); + smResumedSyncPoint = SyncPointState.successful; + notifyWaitingThreads(); break; case AckAnswer.ELEMENT: AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser); @@ -1077,7 +1051,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { break; case AckRequest.ELEMENT: ParseStreamManagement.ackRequest(parser); - if (smEnabledSyncPoint.wasSuccessful()) { + if (smEnabledSyncPoint) { sendSmAcknowledgementInternal(); } else { LOGGER.warning("SM Ack Request received while SM is not enabled"); @@ -1101,7 +1075,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // did re-start the queue again, causing this writer to assume that the queue is not // shutdown, which results in a call to disconnect(). final boolean queueWasShutdown = packetWriter.queue.isShutdown(); - closingStreamReceived.reportSuccess(); + closingStreamReceived = true; + notifyWaitingThreads(); if (queueWasShutdown) { // We received a closing stream element *after* we initiated the @@ -1137,12 +1112,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } catch (Exception e) { - // TODO: Move the call closingStreamReceived.reportFailure(e) into notifyConnectionError? - closingStreamReceived.reportFailure(e); // The exception can be ignored if the the connection is 'done' - // or if the it was caused because the socket got closed. It can not be ignored if it - // happened before (or while) the initial stream opened was send. - if (!(done || packetWriter.queue.isShutdown()) || !initialStreamOpenSend) { + // or if the it was caused because the socket got closed. + if (!done) { // Close the connection and notify connection listeners of the // error. notifyConnectionError(e); @@ -1161,12 +1133,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private final ArrayBlockingQueueWithShutdown queue = new ArrayBlockingQueueWithShutdown<>( QUEUE_SIZE, true); - /** - * Needs to be protected for unit testing purposes. - */ - protected SynchronizationPoint shutdownDone = new SynchronizationPoint<>( - XMPPTCPConnection.this, "shutdown completed"); - /** * If set, the stanza writer is shut down */ @@ -1184,12 +1150,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { */ private boolean shouldBundleAndDefer; + private boolean running; + /** * Initializes the writer in order to be used. It is called at the first connection and also * is invoked if the connection is disconnected by an error. */ void init() { - shutdownDone.init(); shutdownTimestamp = null; if (unacknowledgedStanzas != null) { @@ -1204,11 +1171,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { @Override public void run() { LOGGER.finer(threadName + " start"); + running = true; try { writePackets(); } finally { LOGGER.finer(threadName + " exit"); - XMPPTCPConnection.this.readerWriterSemaphore.release(); + running = false; + notifyWaitingThreads(); } } }, threadName); @@ -1262,13 +1231,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { instantShutdown = instant; queue.shutdown(); shutdownTimestamp = System.currentTimeMillis(); - if (shutdownDone.isNotInInitialState()) { - try { - shutdownDone.checkIfSuccessOrWait(); - } catch (NoResponseException | InterruptedException e) { - LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); - } - } } /** @@ -1407,9 +1369,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } else { LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); } - } finally { - LOGGER.fine("Reporting shutdownDone success in writer thread"); - shutdownDone.reportSuccess(); } // Delay notifyConnectionError after shutdownDone has been reported in the finally block. if (writerException != null) { @@ -1721,7 +1680,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @return true if Stream Management was negotiated. */ public boolean isSmEnabled() { - return smEnabledSyncPoint.wasSuccessful(); + return smEnabledSyncPoint; } /** @@ -1730,7 +1689,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @return true if the stream was resumed. */ public boolean streamWasResumed() { - return smResumedSyncPoint.wasSuccessful(); + return smResumedSyncPoint == SyncPointState.successful; } /** diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java index 74286fdd9..26c74b663 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java @@ -47,17 +47,13 @@ import javax.net.ssl.SSLSession; import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.SmackException.ConnectionException; -import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException; -import org.jivesoftware.smack.SmackException.NoResponseException; -import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; -import org.jivesoftware.smack.SmackException.SmackWrappedException; +import org.jivesoftware.smack.SmackException.SmackCertificateException; import org.jivesoftware.smack.SmackFuture; import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment; -import org.jivesoftware.smack.XMPPException.FailedNonzaException; +import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XmppInputOutputFilter; import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor; import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor; @@ -744,23 +740,14 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM @Override public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) - throws InterruptedException, ConnectionUnexpectedTerminatedException, NotConnectedException, - NoResponseException, IOException { + throws InterruptedException, IOException, SmackException, XMPPException { // The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to // ConnectingToHost. ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints, this); - connectionAttemptState.establishTcpConnection(); - - try { - connectionAttemptState.tcpConnectionEstablishedSyncPoint.checkIfSuccessOrWaitOrThrow(); - } catch (ConnectionException | NoResponseException e) { - // TODO: It is not really elegant that we catch the exception here. Ideally ConnectionAttemptState would - // simply return a StateTranstionResult.FailureCausedByException. - return new StateTransitionResult.FailureCausedByException<>(e); - } catch (SmackWrappedException e) { - // Should never throw SmackWrappedException. - throw new AssertionError(e); + StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection(); + if (failure != null) { + return failure; } socketChannel = connectionAttemptState.socketChannel; @@ -858,8 +845,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM @Override public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) - throws SmackWrappedException, FailedNonzaException, IOException, InterruptedException, - ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException { + throws IOException, InterruptedException, SmackException, XMPPException { connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class); SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext(); @@ -881,7 +867,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM try { tlsState.waitForHandshakeFinished(); } catch (CertificateException e) { - throw new SmackWrappedException(e); + throw new SmackCertificateException(e); } connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established"); @@ -1166,43 +1152,20 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM private void handleSslException(SSLException e) { handshakeException = e; handshakeStatus = TlsHandshakeStatus.failed; - synchronized (this) { - notifyAll(); - } + connectionInternal.notifyWaitingThreads(); } private void onHandshakeFinished() { handshakeStatus = TlsHandshakeStatus.successful; - synchronized (this) { - notifyAll(); - } + connectionInternal.notifyWaitingThreads(); } private boolean isHandshakeFinished() { return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed; } - private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, ConnectionUnexpectedTerminatedException, NoResponseException { - final long deadline = System.currentTimeMillis() + connectionInternal.connection.getReplyTimeout(); - - Exception currentConnectionException = null; - synchronized (this) { - while (!isHandshakeFinished() - && (currentConnectionException = connectionInternal.getCurrentConnectionException()) == null) { - final long now = System.currentTimeMillis(); - if (now >= deadline) - break; - wait(deadline - now); - } - } - - if (currentConnectionException != null) { - throw new SmackException.ConnectionUnexpectedTerminatedException(currentConnectionException); - } - - if (!isHandshakeFinished()) { - throw NoResponseException.newWith(connectionInternal.connection, "TLS handshake to finish"); - } + private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackException, XMPPException { + connectionInternal.waitForCondition(() -> isHandshakeFinished(), "TLS handshake to finish"); if (handshakeStatus == TlsHandshakeStatus.failed) { throw handshakeException; @@ -1235,7 +1198,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM @Override public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException, - ConnectionUnexpectedTerminatedException, NoResponseException { + SmackException, XMPPException { waitForHandshakeFinished(); } diff --git a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java index 95f74bc60..b6a2ab6a9 100644 --- a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java +++ b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java @@ -1,6 +1,6 @@ /** * - * Copyright 2014-2019 Florian Schmaus + * Copyright 2014-2020 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,8 +124,6 @@ public class PacketWriterTest { // Not really cool, but may increases the chances for 't' to block in sendStanza. Thread.sleep(250); - // Set to true for testing purposes, so that shutdown() won't wait packet writer - pw.shutdownDone.reportSuccess(); // Shutdown the packetwriter, this will also interrupt the writer thread, which is what we hope to happen in the // thread created above. pw.shutdown(false);