mirror of
https://github.com/vanitasvitae/Smack.git
synced 2024-11-19 10:32:05 +01:00
Remove SynchronizationPoint
This continues the design started with e98d42790
("SmackReactor/NIO,
Java8/Android19, Pretty print XML, FSM connections"), where the
exceptions that caused an operation to fail, are not recorded within
SynchronizationPoint but within the connection instance itself.
This commit is contained in:
parent
b1a4ccfae8
commit
57961a8cc1
15 changed files with 352 additions and 684 deletions
|
@ -111,6 +111,7 @@ import org.jivesoftware.smack.util.PacketParserUtils;
|
|||
import org.jivesoftware.smack.util.ParserUtils;
|
||||
import org.jivesoftware.smack.util.Predicate;
|
||||
import org.jivesoftware.smack.util.StringUtils;
|
||||
import org.jivesoftware.smack.util.Supplier;
|
||||
import org.jivesoftware.smack.xml.XmlPullParser;
|
||||
import org.jivesoftware.smack.xml.XmlPullParserException;
|
||||
|
||||
|
@ -174,6 +175,12 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
SmackConfiguration.getVersion();
|
||||
}
|
||||
|
||||
protected enum SyncPointState {
|
||||
initial,
|
||||
request_sent,
|
||||
successful,
|
||||
}
|
||||
|
||||
/**
|
||||
* A collection of ConnectionListeners which listen for connection closing
|
||||
* and reconnection events.
|
||||
|
@ -271,30 +278,29 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
*/
|
||||
protected Writer writer;
|
||||
|
||||
protected final SynchronizationPoint<SmackException> tlsHandled = new SynchronizationPoint<>(this, "establishing TLS");
|
||||
protected SmackException currentSmackException;
|
||||
protected XMPPException currentXmppException;
|
||||
|
||||
protected boolean tlsHandled;
|
||||
|
||||
/**
|
||||
* Set to success if the last features stanza from the server has been parsed. A XMPP connection
|
||||
* Set to <code>true</code> if the last features stanza from the server has been parsed. A XMPP connection
|
||||
* handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature
|
||||
* stanza is send by the server. This is set to true once the last feature stanza has been
|
||||
* parsed.
|
||||
*/
|
||||
protected final SynchronizationPoint<SmackException> lastFeaturesReceived = new SynchronizationPoint<>(
|
||||
AbstractXMPPConnection.this, "last stream features received from server");
|
||||
protected boolean lastFeaturesReceived;
|
||||
|
||||
/**
|
||||
* Set to success if the SASL feature has been received.
|
||||
* Set to <code>true</code> if the SASL feature has been received.
|
||||
*/
|
||||
protected final SynchronizationPoint<XMPPException> saslFeatureReceived = new SynchronizationPoint<>(
|
||||
AbstractXMPPConnection.this, "SASL mechanisms stream feature from server");
|
||||
|
||||
protected boolean saslFeatureReceived;
|
||||
|
||||
/**
|
||||
* A synchronization point which is successful if this connection has received the closing
|
||||
* stream element from the remote end-point, i.e. the server.
|
||||
*/
|
||||
protected final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
|
||||
this, "stream closing element received");
|
||||
protected boolean closingStreamReceived;
|
||||
|
||||
/**
|
||||
* The SASLAuthentication manager that is responsible for authenticating with the server.
|
||||
|
@ -369,8 +375,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
*/
|
||||
protected boolean wasAuthenticated = false;
|
||||
|
||||
protected Exception currentConnectionException;
|
||||
|
||||
private final Map<QName, IQRequestHandler> setIqRequestHandler = new HashMap<>();
|
||||
private final Map<QName, IQRequestHandler> getIqRequestHandler = new HashMap<>();
|
||||
|
||||
|
@ -486,10 +490,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
public abstract boolean isUsingCompression();
|
||||
|
||||
protected void initState() {
|
||||
saslFeatureReceived.init();
|
||||
lastFeaturesReceived.init();
|
||||
tlsHandled.init();
|
||||
// TODO: We do not init() closingStreamReceived here, as the integration tests use it to check if we waited for
|
||||
currentSmackException = null;
|
||||
currentXmppException = null;
|
||||
saslFeatureReceived = lastFeaturesReceived = tlsHandled = false;
|
||||
// TODO: We do not init closingStreamReceived here, as the integration tests use it to check if we waited for
|
||||
// it.
|
||||
}
|
||||
|
||||
|
@ -512,7 +516,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
|
||||
// Reset the connection state
|
||||
initState();
|
||||
closingStreamReceived.init();
|
||||
closingStreamReceived = false;
|
||||
streamId = null;
|
||||
|
||||
try {
|
||||
|
@ -657,15 +661,82 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
return streamId;
|
||||
}
|
||||
|
||||
protected Resourcepart bindResourceAndEstablishSession(Resourcepart resource) throws XMPPErrorException,
|
||||
SmackException, InterruptedException {
|
||||
protected final void throwCurrentConnectionException() throws SmackException, XMPPException {
|
||||
if (currentSmackException != null) {
|
||||
throw currentSmackException;
|
||||
} else if (currentXmppException != null) {
|
||||
throw currentXmppException;
|
||||
}
|
||||
|
||||
throw new AssertionError("No current connection exception set, although throwCurrentException() was called");
|
||||
}
|
||||
|
||||
protected final boolean hasCurrentConnectionException() {
|
||||
return currentSmackException != null || currentXmppException != null;
|
||||
}
|
||||
|
||||
protected final void setCurrentConnectionExceptionAndNotify(Exception exception) {
|
||||
if (exception instanceof SmackException) {
|
||||
currentSmackException = (SmackException) exception;
|
||||
} else if (exception instanceof XMPPException) {
|
||||
currentXmppException = (XMPPException) exception;
|
||||
} else {
|
||||
currentSmackException = new SmackException.SmackWrappedException(exception);
|
||||
}
|
||||
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* We use an extra object for {@link #notifyWaitingThreads()} and {@link #waitForCondition(Supplier)}, because all state
|
||||
* changing methods of the connection are synchronized using the connection instance as monitor. If we now would
|
||||
* also use the connection instance for the internal process to wait for a condition, the {@link Object#wait()}
|
||||
* would leave the monitor when it waites, which would allow for another potential call to a state changing function
|
||||
* to proceed.
|
||||
*/
|
||||
private final Object internalMonitor = new Object();
|
||||
|
||||
protected final void notifyWaitingThreads() {
|
||||
synchronized (internalMonitor) {
|
||||
internalMonitor.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
protected final boolean waitForCondition(Supplier<Boolean> condition) throws InterruptedException {
|
||||
final long deadline = System.currentTimeMillis() + getReplyTimeout();
|
||||
synchronized (internalMonitor) {
|
||||
while (!condition.get().booleanValue() && !hasCurrentConnectionException()) {
|
||||
final long now = System.currentTimeMillis();
|
||||
if (now >= deadline) {
|
||||
return false;
|
||||
}
|
||||
internalMonitor.wait(deadline - now);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected final void waitForCondition(Supplier<Boolean> condition, String waitFor) throws InterruptedException, NoResponseException {
|
||||
boolean success = waitForCondition(condition);
|
||||
if (!success) {
|
||||
throw NoResponseException.newWith(this, waitFor);
|
||||
}
|
||||
}
|
||||
|
||||
protected final void waitForConditionOrThrowConnectionException(Supplier<Boolean> condition, String waitFor) throws InterruptedException, SmackException, XMPPException {
|
||||
waitForCondition(condition, waitFor);
|
||||
if (hasCurrentConnectionException()) {
|
||||
throwCurrentConnectionException();
|
||||
}
|
||||
}
|
||||
|
||||
protected Resourcepart bindResourceAndEstablishSession(Resourcepart resource)
|
||||
throws SmackException, InterruptedException, XMPPException {
|
||||
// Wait until either:
|
||||
// - the servers last features stanza has been parsed
|
||||
// - the timeout occurs
|
||||
LOGGER.finer("Waiting for last features to be received before continuing with resource binding");
|
||||
lastFeaturesReceived.checkIfSuccessOrWaitOrThrow();
|
||||
|
||||
waitForConditionOrThrowConnectionException(() -> lastFeaturesReceived, "last stream features received from server");
|
||||
|
||||
if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) {
|
||||
// Server never offered resource binding, which is REQUIRED in XMPP client and
|
||||
|
@ -905,29 +976,20 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
return;
|
||||
}
|
||||
|
||||
// TODO: Remove this async but ordered?
|
||||
ASYNC_BUT_ORDERED.performAsyncButOrdered(this, () -> {
|
||||
currentConnectionException = exception;
|
||||
// Note that we first have to set the current connection exception and notify waiting threads, as one of them
|
||||
// could hold the instance lock, which we also need later when calling instantShutdown().
|
||||
setCurrentConnectionExceptionAndNotify(exception);
|
||||
|
||||
// Closes the connection temporary. A if the connection supports stream management, then a reconnection is
|
||||
// possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in
|
||||
// case the Exception is a StreamErrorException.
|
||||
instantShutdown();
|
||||
|
||||
for (StanzaCollector collector : collectors) {
|
||||
collector.notifyConnectionError(exception);
|
||||
}
|
||||
SmackWrappedException smackWrappedException = new SmackWrappedException(exception);
|
||||
tlsHandled.reportGenericFailure(smackWrappedException);
|
||||
saslFeatureReceived.reportGenericFailure(smackWrappedException);
|
||||
lastFeaturesReceived.reportGenericFailure(smackWrappedException);
|
||||
closingStreamReceived.reportFailure(smackWrappedException);
|
||||
// TODO From XMPPTCPConnection. Was called in Smack 4.3 where notifyConnectionError() was part of
|
||||
// XMPPTCPConnection. Create delegation method?
|
||||
// maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException);
|
||||
|
||||
synchronized (AbstractXMPPConnection.this) {
|
||||
notifyAll();
|
||||
|
||||
// Closes the connection temporary. A if the connection supports stream management, then a reconnection is
|
||||
// possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in
|
||||
// case the Exception is a StreamErrorException.
|
||||
instantShutdown();
|
||||
}
|
||||
|
||||
Async.go(() -> {
|
||||
// Notify connection listeners of the error.
|
||||
|
@ -947,19 +1009,13 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
protected abstract void shutdown();
|
||||
|
||||
protected final boolean waitForClosingStreamTagFromServer() {
|
||||
Exception exception;
|
||||
try {
|
||||
// After we send the closing stream element, check if there was already a
|
||||
// closing stream element sent by the server or wait with a timeout for a
|
||||
// closing stream element to be received from the server.
|
||||
exception = closingStreamReceived.checkIfSuccessOrWait();
|
||||
} catch (InterruptedException | NoResponseException e) {
|
||||
exception = e;
|
||||
waitForConditionOrThrowConnectionException(() -> closingStreamReceived, "closing stream tag from the server");
|
||||
} catch (InterruptedException | SmackException | XMPPException e) {
|
||||
LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
|
||||
return false;
|
||||
}
|
||||
if (exception != null) {
|
||||
LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, exception);
|
||||
}
|
||||
return exception == null;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1817,8 +1873,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
// Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it
|
||||
if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE)
|
||||
|| config.getSecurityMode() == SecurityMode.disabled) {
|
||||
tlsHandled.reportSuccess();
|
||||
saslFeatureReceived.reportSuccess();
|
||||
tlsHandled = saslFeatureReceived = true;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1829,7 +1885,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
|| !config.isCompressionEnabled()) {
|
||||
// This was was last features from the server is either it did not contain
|
||||
// compression or if we disabled it
|
||||
lastFeaturesReceived.reportSuccess();
|
||||
lastFeaturesReceived = true;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
}
|
||||
afterFeaturesReceived();
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.jivesoftware.smack;
|
||||
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -371,15 +372,6 @@ public abstract class SmackException extends Exception {
|
|||
}
|
||||
}
|
||||
|
||||
public static class ConnectionUnexpectedTerminatedException extends SmackException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ConnectionUnexpectedTerminatedException(Throwable wrappedThrowable) {
|
||||
super(wrappedThrowable);
|
||||
}
|
||||
}
|
||||
|
||||
public static class FeatureNotSupportedException extends SmackException {
|
||||
|
||||
/**
|
||||
|
@ -487,4 +479,19 @@ public abstract class SmackException extends Exception {
|
|||
super(message, exception);
|
||||
}
|
||||
}
|
||||
|
||||
public static class SmackCertificateException extends SmackException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final CertificateException certificateException;
|
||||
|
||||
public SmackCertificateException(CertificateException certificateException) {
|
||||
this.certificateException = certificateException;
|
||||
}
|
||||
|
||||
public CertificateException getCertificateException() {
|
||||
return certificateException;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,352 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Copyright © 2014-2019 Florian Schmaus
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jivesoftware.smack;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
import org.jivesoftware.smack.SmackException.NoResponseException;
|
||||
import org.jivesoftware.smack.SmackException.NotConnectedException;
|
||||
import org.jivesoftware.smack.SmackException.SmackWrappedException;
|
||||
import org.jivesoftware.smack.packet.Nonza;
|
||||
import org.jivesoftware.smack.packet.Stanza;
|
||||
import org.jivesoftware.smack.packet.TopLevelStreamElement;
|
||||
|
||||
public class SynchronizationPoint<E extends Exception> {
|
||||
|
||||
private final AbstractXMPPConnection connection;
|
||||
private final Lock connectionLock;
|
||||
private final Condition condition;
|
||||
private final String waitFor;
|
||||
|
||||
// Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the
|
||||
// same memory synchronization effects as synchronization block enter and leave.
|
||||
private State state;
|
||||
private E failureException;
|
||||
private SmackWrappedException smackWrappedExcpetion;
|
||||
|
||||
private volatile long waitStart;
|
||||
|
||||
/**
|
||||
* Construct a new synchronization point for the given connection.
|
||||
*
|
||||
* @param connection the connection of this synchronization point.
|
||||
* @param waitFor a description of the event this synchronization point handles.
|
||||
*/
|
||||
public SynchronizationPoint(AbstractXMPPConnection connection, String waitFor) {
|
||||
this.connection = connection;
|
||||
this.connectionLock = connection.getConnectionLock();
|
||||
this.condition = connection.getConnectionLock().newCondition();
|
||||
this.waitFor = waitFor;
|
||||
init();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize (or reset) this synchronization point.
|
||||
*/
|
||||
@SuppressWarnings("LockNotBeforeTry")
|
||||
public void init() {
|
||||
connectionLock.lock();
|
||||
state = State.Initial;
|
||||
failureException = null;
|
||||
smackWrappedExcpetion = null;
|
||||
connectionLock.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the given top level stream element and wait for a response.
|
||||
*
|
||||
* @param request the plain stream element to send.
|
||||
* @throws NoResponseException if no response was received.
|
||||
* @throws NotConnectedException if the connection is not connected.
|
||||
* @throws InterruptedException if the connection is interrupted.
|
||||
* @return <code>null</code> if synchronization point was successful, or the failure Exception.
|
||||
*/
|
||||
public Exception sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException,
|
||||
NotConnectedException, InterruptedException {
|
||||
assert state == State.Initial;
|
||||
connectionLock.lock();
|
||||
try {
|
||||
if (request != null) {
|
||||
if (request instanceof Stanza) {
|
||||
connection.sendStanza((Stanza) request);
|
||||
}
|
||||
else if (request instanceof Nonza) {
|
||||
connection.sendNonza((Nonza) request);
|
||||
} else {
|
||||
throw new IllegalStateException("Unsupported element type");
|
||||
}
|
||||
state = State.RequestSent;
|
||||
}
|
||||
waitForConditionOrTimeout();
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
return checkForResponse();
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the given plain stream element and wait for a response.
|
||||
*
|
||||
* @param request the plain stream element to send.
|
||||
* @throws E if an failure was reported.
|
||||
* @throws NoResponseException if no response was received.
|
||||
* @throws NotConnectedException if the connection is not connected.
|
||||
* @throws InterruptedException if the connection is interrupted.
|
||||
* @throws SmackWrappedException in case of a wrapped exception;
|
||||
*/
|
||||
public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException,
|
||||
NotConnectedException, InterruptedException, SmackWrappedException {
|
||||
sendAndWaitForResponse(request);
|
||||
switch (state) {
|
||||
case Failure:
|
||||
throwException();
|
||||
break;
|
||||
default:
|
||||
// Success, do nothing
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this synchronization point is successful or wait the connections reply timeout.
|
||||
* @throws NoResponseException if there was no response marking the synchronization point as success or failed.
|
||||
* @throws E if there was a failure
|
||||
* @throws InterruptedException if the connection is interrupted.
|
||||
* @throws SmackWrappedException in case of a wrapped exception;
|
||||
*/
|
||||
public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException, SmackWrappedException {
|
||||
checkIfSuccessOrWait();
|
||||
if (state == State.Failure) {
|
||||
throwException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this synchronization point is successful or wait the connections reply timeout.
|
||||
* @throws NoResponseException if there was no response marking the synchronization point as success or failed.
|
||||
* @throws InterruptedException if the calling thread was interrupted.
|
||||
* @return <code>null</code> if synchronization point was successful, or the failure Exception.
|
||||
*/
|
||||
public Exception checkIfSuccessOrWait() throws NoResponseException, InterruptedException {
|
||||
connectionLock.lock();
|
||||
try {
|
||||
switch (state) {
|
||||
// Return immediately on success or failure
|
||||
case Success:
|
||||
return null;
|
||||
case Failure:
|
||||
return getException();
|
||||
default:
|
||||
// Do nothing
|
||||
break;
|
||||
}
|
||||
waitForConditionOrTimeout();
|
||||
} finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
return checkForResponse();
|
||||
}
|
||||
|
||||
/**
|
||||
* Report this synchronization point as successful.
|
||||
*/
|
||||
public void reportSuccess() {
|
||||
connectionLock.lock();
|
||||
try {
|
||||
state = State.Success;
|
||||
condition.signalAll();
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deprecated.
|
||||
* @deprecated use {@link #reportFailure(Exception)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public void reportFailure() {
|
||||
reportFailure(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Report this synchronization point as failed because of the given exception. The {@code failureException} must be set.
|
||||
*
|
||||
* @param failureException the exception causing this synchronization point to fail.
|
||||
*/
|
||||
public void reportFailure(E failureException) {
|
||||
assert failureException != null;
|
||||
connectionLock.lock();
|
||||
try {
|
||||
state = State.Failure;
|
||||
this.failureException = failureException;
|
||||
condition.signalAll();
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Report this synchronization point as failed because of the given exception. The {@code failureException} must be set.
|
||||
*
|
||||
* @param exception the exception causing this synchronization point to fail.
|
||||
*/
|
||||
public void reportGenericFailure(SmackWrappedException exception) {
|
||||
assert exception != null;
|
||||
connectionLock.lock();
|
||||
try {
|
||||
state = State.Failure;
|
||||
this.smackWrappedExcpetion = exception;
|
||||
condition.signalAll();
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this synchronization point was successful.
|
||||
*
|
||||
* @return true if the synchronization point was successful, false otherwise.
|
||||
*/
|
||||
public boolean wasSuccessful() {
|
||||
connectionLock.lock();
|
||||
try {
|
||||
return state == State.Success;
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isNotInInitialState() {
|
||||
connectionLock.lock();
|
||||
try {
|
||||
return state != State.Initial;
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this synchronization point has its request already sent.
|
||||
*
|
||||
* @return true if the request was already sent, false otherwise.
|
||||
*/
|
||||
public boolean requestSent() {
|
||||
connectionLock.lock();
|
||||
try {
|
||||
return state == State.RequestSent;
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public E getFailureException() {
|
||||
connectionLock.lock();
|
||||
try {
|
||||
return failureException;
|
||||
}
|
||||
finally {
|
||||
connectionLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void resetTimeout() {
|
||||
waitStart = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}.
|
||||
* {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this
|
||||
* synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the
|
||||
* connections reply timeout, this method will set the state of {@link State#NoResponse}.
|
||||
* @throws InterruptedException if the calling thread was interrupted.
|
||||
*/
|
||||
private void waitForConditionOrTimeout() throws InterruptedException {
|
||||
waitStart = System.currentTimeMillis();
|
||||
while (state == State.RequestSent || state == State.Initial) {
|
||||
long timeout = connection.getReplyTimeout();
|
||||
long remainingWaitMillis = timeout - (System.currentTimeMillis() - waitStart);
|
||||
long remainingWait = TimeUnit.MILLISECONDS.toNanos(remainingWaitMillis);
|
||||
|
||||
if (remainingWait <= 0) {
|
||||
state = State.NoResponse;
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
condition.awaitNanos(remainingWait);
|
||||
} catch (InterruptedException e) {
|
||||
state = State.Interrupted;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Exception getException() {
|
||||
if (failureException != null) {
|
||||
return failureException;
|
||||
}
|
||||
return smackWrappedExcpetion;
|
||||
}
|
||||
|
||||
private void throwException() throws E, SmackWrappedException {
|
||||
if (failureException != null) {
|
||||
throw failureException;
|
||||
}
|
||||
throw smackWrappedExcpetion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check for a response and throw a {@link NoResponseException} if there was none.
|
||||
* <p>
|
||||
* The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent'
|
||||
* </p>
|
||||
* @return <code>true</code> if synchronization point was successful, <code>false</code> on failure.
|
||||
* @throws NoResponseException if there was no response from the remote entity.
|
||||
*/
|
||||
private Exception checkForResponse() throws NoResponseException {
|
||||
switch (state) {
|
||||
case Initial:
|
||||
case NoResponse:
|
||||
case RequestSent:
|
||||
throw NoResponseException.newWith(connection, waitFor);
|
||||
case Success:
|
||||
return null;
|
||||
case Failure:
|
||||
return getException();
|
||||
default:
|
||||
throw new AssertionError("Unknown state " + state);
|
||||
}
|
||||
}
|
||||
|
||||
private enum State {
|
||||
Initial,
|
||||
RequestSent,
|
||||
NoResponse,
|
||||
Success,
|
||||
Failure,
|
||||
Interrupted,
|
||||
}
|
||||
}
|
|
@ -67,7 +67,8 @@ public interface XmppInputOutputFilter {
|
|||
default void closeInputOutput() {
|
||||
}
|
||||
|
||||
default void waitUntilInputOutputClosed() throws IOException, NoResponseException, CertificateException, InterruptedException, SmackException {
|
||||
default void waitUntilInputOutputClosed() throws IOException, NoResponseException, CertificateException,
|
||||
InterruptedException, SmackException, XMPPException {
|
||||
}
|
||||
|
||||
Object getStats();
|
||||
|
|
|
@ -35,7 +35,6 @@ import javax.net.ssl.SSLSession;
|
|||
|
||||
import org.jivesoftware.smack.AbstractXMPPConnection;
|
||||
import org.jivesoftware.smack.SmackException;
|
||||
import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException;
|
||||
import org.jivesoftware.smack.SmackException.NoResponseException;
|
||||
import org.jivesoftware.smack.SmackException.NotConnectedException;
|
||||
import org.jivesoftware.smack.SmackFuture;
|
||||
|
@ -78,6 +77,7 @@ import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
|
|||
import org.jivesoftware.smack.util.ExtendedAppendable;
|
||||
import org.jivesoftware.smack.util.PacketParserUtils;
|
||||
import org.jivesoftware.smack.util.StringUtils;
|
||||
import org.jivesoftware.smack.util.Supplier;
|
||||
import org.jivesoftware.smack.xml.XmlPullParser;
|
||||
import org.jivesoftware.smack.xml.XmlPullParserException;
|
||||
|
||||
|
@ -142,7 +142,8 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
|
||||
@Override
|
||||
public void onStreamClosed() {
|
||||
ModularXmppClientToServerConnection.this.closingStreamReceived.reportSuccess();
|
||||
ModularXmppClientToServerConnection.this.closingStreamReceived = true;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,7 +178,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
|
||||
@Override
|
||||
public void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException,
|
||||
ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException {
|
||||
SmackException, XMPPException {
|
||||
ModularXmppClientToServerConnection.this.newStreamOpenWaitForFeaturesSequence(waitFor);
|
||||
}
|
||||
|
||||
|
@ -198,8 +199,14 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
}
|
||||
|
||||
@Override
|
||||
public Exception getCurrentConnectionException() {
|
||||
return ModularXmppClientToServerConnection.this.currentConnectionException;
|
||||
public void waitForCondition(Supplier<Boolean> condition, String waitFor)
|
||||
throws InterruptedException, SmackException, XMPPException {
|
||||
ModularXmppClientToServerConnection.this.waitForConditionOrThrowConnectionException(condition, waitFor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyWaitingThreads() {
|
||||
ModularXmppClientToServerConnection.this.notifyWaitingThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -263,14 +270,13 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
revertedState.resetState();
|
||||
}
|
||||
|
||||
protected void walkStateGraph(WalkStateGraphContext walkStateGraphContext) throws XMPPErrorException,
|
||||
SASLErrorException, FailedNonzaException, IOException, SmackException, InterruptedException {
|
||||
protected void walkStateGraph(WalkStateGraphContext walkStateGraphContext)
|
||||
throws XMPPException, IOException, SmackException, InterruptedException {
|
||||
// Save a copy of the current state
|
||||
GraphVertex<State> previousStateVertex = currentStateVertex;
|
||||
try {
|
||||
walkStateGraphInternal(walkStateGraphContext);
|
||||
} catch (XMPPErrorException | SASLErrorException | FailedNonzaException | IOException | SmackException
|
||||
| InterruptedException e) {
|
||||
} catch (IOException | SmackException | InterruptedException | XMPPException e) {
|
||||
currentStateVertex = previousStateVertex;
|
||||
// Unwind the state.
|
||||
State revertedState = currentStateVertex.getElement();
|
||||
|
@ -279,8 +285,8 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
}
|
||||
}
|
||||
|
||||
private void walkStateGraphInternal(WalkStateGraphContext walkStateGraphContext) throws XMPPErrorException,
|
||||
SASLErrorException, IOException, SmackException, InterruptedException, FailedNonzaException {
|
||||
private void walkStateGraphInternal(WalkStateGraphContext walkStateGraphContext)
|
||||
throws IOException, SmackException, InterruptedException, XMPPException {
|
||||
// Save a copy of the current state
|
||||
final GraphVertex<State> initialStateVertex = currentStateVertex;
|
||||
final State initialState = initialStateVertex.getElement();
|
||||
|
@ -359,15 +365,13 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
* @param walkStateGraphContext the "walk state graph" context.
|
||||
* @return A state transition result or <code>null</code> if this state can be ignored.
|
||||
* @throws SmackException if Smack detected an exceptional situation.
|
||||
* @throws XMPPErrorException if an XMPP protocol error was received.
|
||||
* @throws SASLErrorException if a SASL protocol error was returned.
|
||||
* @throws XMPPException if an XMPP protocol error was received.
|
||||
* @throws IOException if an I/O error occurred.
|
||||
* @throws InterruptedException if the calling thread was interrupted.
|
||||
* @throws FailedNonzaException if an XMPP protocol failure was received.
|
||||
*/
|
||||
private StateTransitionResult attemptEnterState(GraphVertex<State> successorStateVertex,
|
||||
WalkStateGraphContext walkStateGraphContext) throws SmackException, XMPPErrorException,
|
||||
SASLErrorException, IOException, InterruptedException, FailedNonzaException {
|
||||
WalkStateGraphContext walkStateGraphContext) throws SmackException, XMPPException,
|
||||
IOException, InterruptedException {
|
||||
final GraphVertex<State> initialStateVertex = currentStateVertex;
|
||||
final State initialState = initialStateVertex.getElement();
|
||||
final State successorState = successorStateVertex.getElement();
|
||||
|
@ -400,8 +404,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
|
||||
invokeConnectionStateMachineListener(new ConnectionStateEvent.AboutToTransitionInto(initialState, successorState));
|
||||
transitionAttemptResult = successorState.transitionInto(walkStateGraphContext);
|
||||
} catch (SmackException | XMPPErrorException | SASLErrorException | IOException | InterruptedException
|
||||
| FailedNonzaException e) {
|
||||
} catch (SmackException | IOException | InterruptedException | XMPPException e) {
|
||||
// Unwind the state here too, since this state will not be unwound by walkStateGraph(), as it will not
|
||||
// become a predecessor state in the walk.
|
||||
unwindState(successorState);
|
||||
|
@ -474,8 +477,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
|
||||
try {
|
||||
walkStateGraph(context);
|
||||
} catch (XMPPErrorException | SASLErrorException | IOException | SmackException | InterruptedException
|
||||
| FailedNonzaException e) {
|
||||
} catch (IOException | SmackException | InterruptedException | XMPPException e) {
|
||||
throw new IllegalStateException("A walk to disconnected state should never throw", e);
|
||||
}
|
||||
}
|
||||
|
@ -491,9 +493,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
@Override
|
||||
protected void afterFeaturesReceived() {
|
||||
featuresReceived = true;
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
}
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
|
||||
protected void parseAndProcessElement(String element) {
|
||||
|
@ -522,8 +522,10 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
break;
|
||||
case "error":
|
||||
StreamError streamError = PacketParserUtils.parseStreamError(parser, null);
|
||||
saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
|
||||
throw new StreamErrorException(streamError);
|
||||
StreamErrorException streamErrorException = new StreamErrorException(streamError);
|
||||
currentXmppException = streamErrorException;
|
||||
notifyWaitingThreads();
|
||||
throw streamErrorException;
|
||||
case "features":
|
||||
parseFeatures(parser);
|
||||
afterFeaturesReceived();
|
||||
|
@ -550,25 +552,12 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
}
|
||||
|
||||
protected void waitForFeaturesReceived(String waitFor)
|
||||
throws InterruptedException, ConnectionUnexpectedTerminatedException, NoResponseException {
|
||||
long waitStartMs = System.currentTimeMillis();
|
||||
long timeoutMs = getReplyTimeout();
|
||||
synchronized (this) {
|
||||
while (!featuresReceived && currentConnectionException == null) {
|
||||
long remainingWaitMs = timeoutMs - (System.currentTimeMillis() - waitStartMs);
|
||||
if (remainingWaitMs <= 0) {
|
||||
throw NoResponseException.newWith(this, waitFor);
|
||||
}
|
||||
wait(remainingWaitMs);
|
||||
}
|
||||
if (currentConnectionException != null) {
|
||||
throw new SmackException.ConnectionUnexpectedTerminatedException(currentConnectionException);
|
||||
}
|
||||
}
|
||||
throws InterruptedException, SmackException, XMPPException {
|
||||
waitForConditionOrThrowConnectionException(() -> featuresReceived, waitFor);
|
||||
}
|
||||
|
||||
protected void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException,
|
||||
ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException {
|
||||
SmackException, XMPPException {
|
||||
prepareToWaitForFeaturesReceived();
|
||||
sendStreamOpen();
|
||||
waitForFeaturesReceived(waitFor);
|
||||
|
@ -763,8 +752,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
|
||||
@Override
|
||||
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
|
||||
throws XMPPErrorException, SASLErrorException, IOException, SmackException,
|
||||
InterruptedException {
|
||||
throws IOException, SmackException, InterruptedException, XMPPException {
|
||||
prepareToWaitForFeaturesReceived();
|
||||
|
||||
LoginContext loginContext = walkStateGraphContext.getLoginContext();
|
||||
|
@ -813,12 +801,12 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
|
||||
@Override
|
||||
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
|
||||
throws XMPPErrorException, SASLErrorException, IOException, SmackException,
|
||||
InterruptedException {
|
||||
throws IOException, SmackException, InterruptedException, XMPPException {
|
||||
// Calling bindResourceAndEstablishSession() below requires the lastFeaturesReceived sync point to be signaled.
|
||||
// Since we entered this state, the FSM has decided that the last features have been received, hence signal
|
||||
// the sync point.
|
||||
lastFeaturesReceived.reportSuccess();
|
||||
lastFeaturesReceived = true;
|
||||
notifyWaitingThreads();
|
||||
|
||||
LoginContext loginContext = walkStateGraphContext.getLoginContext();
|
||||
Resourcepart resource = bindResourceAndEstablishSession(loginContext.resource);
|
||||
|
@ -914,7 +902,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
|
||||
@Override
|
||||
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext) {
|
||||
closingStreamReceived.init();
|
||||
closingStreamReceived = false;
|
||||
|
||||
boolean streamCloseIssued = outgoingElementsQueue.offerAndShutdown(StreamClose.INSTANCE);
|
||||
|
||||
|
@ -936,7 +924,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne
|
|||
XmppInputOutputFilter filter = it.next();
|
||||
try {
|
||||
filter.waitUntilInputOutputClosed();
|
||||
} catch (IOException | CertificateException | InterruptedException | SmackException e) {
|
||||
} catch (IOException | CertificateException | InterruptedException | SmackException | XMPPException e) {
|
||||
LOGGER.log(Level.WARNING, "waitUntilInputOutputClosed() threw", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,11 +22,12 @@ import java.nio.channels.SelectionKey;
|
|||
import java.util.ListIterator;
|
||||
import java.util.Queue;
|
||||
|
||||
import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException;
|
||||
import org.jivesoftware.smack.SmackException;
|
||||
import org.jivesoftware.smack.SmackException.NoResponseException;
|
||||
import org.jivesoftware.smack.SmackException.NotConnectedException;
|
||||
import org.jivesoftware.smack.SmackReactor;
|
||||
import org.jivesoftware.smack.SmackReactor.ChannelSelectedCallback;
|
||||
import org.jivesoftware.smack.XMPPException;
|
||||
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
|
||||
import org.jivesoftware.smack.XmppInputOutputFilter;
|
||||
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection;
|
||||
|
@ -38,6 +39,7 @@ import org.jivesoftware.smack.packet.Nonza;
|
|||
import org.jivesoftware.smack.packet.TopLevelStreamElement;
|
||||
import org.jivesoftware.smack.packet.XmlEnvironment;
|
||||
import org.jivesoftware.smack.util.Consumer;
|
||||
import org.jivesoftware.smack.util.Supplier;
|
||||
import org.jivesoftware.smack.xml.XmlPullParser;
|
||||
|
||||
public abstract class ModularXmppClientToServerConnectionInternal {
|
||||
|
@ -98,7 +100,7 @@ public abstract class ModularXmppClientToServerConnectionInternal {
|
|||
public abstract ListIterator<XmppInputOutputFilter> getXmppInputOutputFilterEndIterator();
|
||||
|
||||
public abstract void newStreamOpenWaitForFeaturesSequence(String waitFor) throws InterruptedException,
|
||||
ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException;
|
||||
NoResponseException, NotConnectedException, SmackException, XMPPException;
|
||||
|
||||
public abstract SmackTlsContext getSmackTlsContext();
|
||||
|
||||
|
@ -108,7 +110,9 @@ public abstract class ModularXmppClientToServerConnectionInternal {
|
|||
|
||||
public abstract void asyncGo(Runnable runnable);
|
||||
|
||||
public abstract Exception getCurrentConnectionException();
|
||||
public abstract void waitForCondition(Supplier<Boolean> condition, String waitFor) throws InterruptedException, SmackException, XMPPException;
|
||||
|
||||
public abstract void notifyWaitingThreads();
|
||||
|
||||
public abstract void setCompressionEnabled(boolean compressionEnabled);
|
||||
|
||||
|
|
|
@ -17,10 +17,8 @@
|
|||
package org.jivesoftware.smack.compression;
|
||||
|
||||
import org.jivesoftware.smack.ConnectionConfiguration;
|
||||
import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException;
|
||||
import org.jivesoftware.smack.SmackException.NoResponseException;
|
||||
import org.jivesoftware.smack.SmackException.NotConnectedException;
|
||||
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
|
||||
import org.jivesoftware.smack.SmackException;
|
||||
import org.jivesoftware.smack.XMPPException;
|
||||
import org.jivesoftware.smack.XmppInputOutputFilter;
|
||||
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.AuthenticatedButUnboundStateDescriptor;
|
||||
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ResourceBindingStateDescriptor;
|
||||
|
@ -90,8 +88,7 @@ public class CompressionModule extends ModularXmppClientToServerConnectionModule
|
|||
|
||||
@Override
|
||||
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
|
||||
throws NoResponseException, NotConnectedException, FailedNonzaException, InterruptedException,
|
||||
ConnectionUnexpectedTerminatedException {
|
||||
throws InterruptedException, SmackException, XMPPException {
|
||||
final String compressionMethod = selectedCompressionFactory.getCompressionMethod();
|
||||
connectionInternal.sendAndWaitForResponse(new Compress(compressionMethod), Compressed.class, Failure.class);
|
||||
|
||||
|
|
|
@ -19,11 +19,9 @@ package org.jivesoftware.smack.fsm;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.jivesoftware.smack.SmackException;
|
||||
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
|
||||
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
|
||||
import org.jivesoftware.smack.XMPPException;
|
||||
import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
|
||||
import org.jivesoftware.smack.c2s.internal.WalkStateGraphContext;
|
||||
import org.jivesoftware.smack.sasl.SASLErrorException;
|
||||
|
||||
/**
|
||||
* Note that this is an non-static inner class of XmppClientToServerConnection so that states can inspect and modify
|
||||
|
@ -53,8 +51,7 @@ public abstract class State {
|
|||
}
|
||||
|
||||
public abstract StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
|
||||
throws XMPPErrorException, SASLErrorException, IOException, SmackException,
|
||||
InterruptedException, FailedNonzaException;
|
||||
throws IOException, SmackException, InterruptedException, XMPPException;
|
||||
|
||||
public StateDescriptor getStateDescriptor() {
|
||||
return stateDescriptor;
|
||||
|
|
|
@ -67,6 +67,14 @@ public abstract class StateTransitionResult {
|
|||
}
|
||||
}
|
||||
|
||||
public static final class FailureCausedByTimeout extends Failure {
|
||||
|
||||
public FailureCausedByTimeout(String failureMessage) {
|
||||
super(failureMessage);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public abstract static class TransitionImpossible extends StateTransitionResult {
|
||||
protected TransitionImpossible(String message) {
|
||||
super(message);
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2020 Florian Schmaus
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jivesoftware.smack.util;
|
||||
|
||||
// TODO: Replace with java.util.function.Supplier once Smack's minimum Android SDK level is 24 or higher.
|
||||
public interface Supplier<T> {
|
||||
|
||||
T get();
|
||||
|
||||
}
|
|
@ -38,12 +38,7 @@ public class WaitForClosingStreamElementTest extends AbstractSmackLowLevelIntegr
|
|||
|
||||
Field closingStreamReceivedField = AbstractXMPPConnection.class.getDeclaredField("closingStreamReceived");
|
||||
closingStreamReceivedField.setAccessible(true);
|
||||
SynchronizationPoint<?> closingStreamReceived = (SynchronizationPoint<?>) closingStreamReceivedField.get(connection);
|
||||
Exception failureException = closingStreamReceived.getFailureException();
|
||||
if (failureException != null) {
|
||||
throw new AssertionError("Sync poing yielded failure exception", failureException);
|
||||
}
|
||||
boolean closingStreamReceivedSuccessful = closingStreamReceived.wasSuccessful();
|
||||
assertTrue(closingStreamReceivedSuccessful);
|
||||
boolean closingStreamReceived = (boolean) closingStreamReceivedField.get(connection);
|
||||
assertTrue(closingStreamReceived);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,10 +26,9 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.jivesoftware.smack.SmackException.ConnectionException;
|
||||
import org.jivesoftware.smack.SmackException.EndpointConnectionException;
|
||||
import org.jivesoftware.smack.SynchronizationPoint;
|
||||
import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
|
||||
import org.jivesoftware.smack.fsm.StateTransitionResult;
|
||||
import org.jivesoftware.smack.tcp.XmppTcpTransportModule.EstablishingTcpConnectionState;
|
||||
import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
|
||||
import org.jivesoftware.smack.util.Async;
|
||||
|
@ -48,7 +47,10 @@ public final class ConnectionAttemptState {
|
|||
final SocketChannel socketChannel;
|
||||
|
||||
final List<RemoteConnectionException<?>> connectionExceptions;
|
||||
final SynchronizationPoint<ConnectionException> tcpConnectionEstablishedSyncPoint;
|
||||
|
||||
EndpointConnectionException connectionException;
|
||||
boolean connected;
|
||||
long deadline;
|
||||
|
||||
final Iterator<Rfc6120TcpRemoteConnectionEndpoint> connectionEndpointIterator;
|
||||
/** The current connection endpoint we are trying */
|
||||
|
@ -65,17 +67,32 @@ public final class ConnectionAttemptState {
|
|||
socketChannel = SocketChannel.open();
|
||||
socketChannel.configureBlocking(false);
|
||||
|
||||
connectionEndpointIterator = discoveredEndpoints.result.discoveredRemoteConnectionEndpoints.iterator();
|
||||
List<Rfc6120TcpRemoteConnectionEndpoint> endpoints = discoveredEndpoints.result.discoveredRemoteConnectionEndpoints;
|
||||
connectionEndpointIterator = endpoints.iterator();
|
||||
connectionEndpoint = connectionEndpointIterator.next();
|
||||
connectionExceptions = new ArrayList<>(discoveredEndpoints.result.discoveredRemoteConnectionEndpoints.size());
|
||||
|
||||
tcpConnectionEstablishedSyncPoint = new SynchronizationPoint<>(connectionInternal.connection,
|
||||
"TCP connection establishment");
|
||||
connectionExceptions = new ArrayList<>(endpoints.size());
|
||||
}
|
||||
|
||||
void establishTcpConnection() {
|
||||
StateTransitionResult.Failure establishTcpConnection() throws InterruptedException {
|
||||
RemoteConnectionEndpoint.InetSocketAddressCoupling<Rfc6120TcpRemoteConnectionEndpoint> address = nextAddress();
|
||||
establishTcpConnection(address);
|
||||
|
||||
synchronized (this) {
|
||||
while (!connected && connectionException == null) {
|
||||
final long now = System.currentTimeMillis();
|
||||
if (now >= deadline) {
|
||||
return new StateTransitionResult.FailureCausedByTimeout("Timeout waiting to establish connection");
|
||||
}
|
||||
wait (deadline - now);
|
||||
}
|
||||
}
|
||||
if (connected) {
|
||||
assert connectionException == null;
|
||||
// Success case: we have been able to establish a connection to one remote endpoint.
|
||||
return null;
|
||||
}
|
||||
|
||||
return new StateTransitionResult.FailureCausedByException<Exception>(connectionException);
|
||||
}
|
||||
|
||||
private void establishTcpConnection(
|
||||
|
@ -84,8 +101,10 @@ public final class ConnectionAttemptState {
|
|||
establishingTcpConnectionState, address);
|
||||
connectionInternal.invokeConnectionStateMachineListener(connectingToHostEvent);
|
||||
|
||||
final boolean connected;
|
||||
final InetSocketAddress inetSocketAddress = address.getInetSocketAddress();
|
||||
// TODO: Should use "connect timeout" instead of reply timeout. But first connect timeout needs to be moved from
|
||||
// XMPPTCPConnectionConfiguration. into XMPPConnectionConfiguration.
|
||||
deadline = System.currentTimeMillis() + connectionInternal.connection.getReplyTimeout();
|
||||
try {
|
||||
connected = socketChannel.connect(inetSocketAddress);
|
||||
} catch (IOException e) {
|
||||
|
@ -98,7 +117,9 @@ public final class ConnectionAttemptState {
|
|||
establishingTcpConnectionState, address, true);
|
||||
connectionInternal.invokeConnectionStateMachineListener(connectedToHostEvent);
|
||||
|
||||
tcpConnectionEstablishedSyncPoint.reportSuccess();
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -124,9 +145,10 @@ public final class ConnectionAttemptState {
|
|||
establishingTcpConnectionState, address, false);
|
||||
connectionInternal.invokeConnectionStateMachineListener(connectedToHostEvent);
|
||||
|
||||
// Do not set 'state' here, since this is processed by a reactor thread, which doesn't hold
|
||||
// the objects lock.
|
||||
tcpConnectionEstablishedSyncPoint.reportSuccess();
|
||||
connected = true;
|
||||
synchronized (ConnectionAttemptState.this) {
|
||||
notifyAll();
|
||||
}
|
||||
});
|
||||
} catch (ClosedChannelException e) {
|
||||
onIOExceptionWhenEstablishingTcpConnection(e, address);
|
||||
|
@ -137,14 +159,14 @@ public final class ConnectionAttemptState {
|
|||
RemoteConnectionEndpoint.InetSocketAddressCoupling<Rfc6120TcpRemoteConnectionEndpoint> failedAddress) {
|
||||
RemoteConnectionEndpoint.InetSocketAddressCoupling<Rfc6120TcpRemoteConnectionEndpoint> nextInetSocketAddress = nextAddress();
|
||||
if (nextInetSocketAddress == null) {
|
||||
EndpointConnectionException connectionException = EndpointConnectionException.from(
|
||||
connectionException = EndpointConnectionException.from(
|
||||
discoveredEndpoints.result.lookupFailures, connectionExceptions);
|
||||
tcpConnectionEstablishedSyncPoint.reportFailure(connectionException);
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
tcpConnectionEstablishedSyncPoint.resetTimeout();
|
||||
|
||||
RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
|
||||
failedAddress, exception);
|
||||
connectionExceptions.add(rce);
|
||||
|
|
|
@ -26,11 +26,6 @@ import java.io.Writer;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.NoSuchProviderException;
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -44,7 +39,6 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.logging.Level;
|
||||
|
@ -65,13 +59,12 @@ import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
|
|||
import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
|
||||
import org.jivesoftware.smack.SmackException.ConnectionException;
|
||||
import org.jivesoftware.smack.SmackException.EndpointConnectionException;
|
||||
import org.jivesoftware.smack.SmackException.NoResponseException;
|
||||
import org.jivesoftware.smack.SmackException.NotConnectedException;
|
||||
import org.jivesoftware.smack.SmackException.NotLoggedInException;
|
||||
import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
|
||||
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
|
||||
import org.jivesoftware.smack.SmackFuture;
|
||||
import org.jivesoftware.smack.StanzaListener;
|
||||
import org.jivesoftware.smack.SynchronizationPoint;
|
||||
import org.jivesoftware.smack.XMPPConnection;
|
||||
import org.jivesoftware.smack.XMPPException;
|
||||
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
|
||||
|
@ -151,8 +144,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
|
||||
private SSLSocket secureSocket;
|
||||
|
||||
private final Semaphore readerWriterSemaphore = new Semaphore(2);
|
||||
|
||||
/**
|
||||
* Protected access level because of unit test purposes
|
||||
*/
|
||||
|
@ -166,14 +157,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
/**
|
||||
*
|
||||
*/
|
||||
private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
|
||||
this, "stream compression feature");
|
||||
private boolean streamFeaturesAfterAuthenticationReceived;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
|
||||
this, "stream compression");
|
||||
private boolean compressSyncPoint;
|
||||
|
||||
/**
|
||||
* The default bundle and defer callback, used for new connections.
|
||||
|
@ -197,15 +186,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
/**
|
||||
* The stream ID of the stream that is currently resumable, ie. the stream we hold the state
|
||||
* for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
|
||||
* {@link #unacknowledgedStanzas}.
|
||||
* {@link #unFailedNonzaExceptionacknowledgedStanzas}.
|
||||
*/
|
||||
private String smSessionId;
|
||||
|
||||
private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
|
||||
this, "stream resumed element");
|
||||
private SyncPointState smResumedSyncPoint;
|
||||
private Failed smResumptionFailed;
|
||||
|
||||
private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
|
||||
this, "stream enabled element");
|
||||
private boolean smEnabledSyncPoint;
|
||||
|
||||
/**
|
||||
* The client's preferred maximum resumption time in seconds.
|
||||
|
@ -381,15 +369,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// Wait for stream features after the authentication.
|
||||
// TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
|
||||
// renamed to "streamFeaturesAfterAuthenticationReceived".
|
||||
maybeCompressFeaturesReceived.checkIfSuccessOrWait();
|
||||
waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
|
||||
|
||||
// If compression is enabled then request the server to use stream compression. XEP-170
|
||||
// recommends to perform stream compression before resource binding.
|
||||
maybeEnableCompression();
|
||||
|
||||
if (isSmResumptionPossible()) {
|
||||
smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
|
||||
if (smResumedSyncPoint.wasSuccessful()) {
|
||||
smResumedSyncPoint = SyncPointState.request_sent;
|
||||
sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
|
||||
waitForCondition(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
|
||||
if (smResumedSyncPoint == SyncPointState.successful) {
|
||||
// We successfully resumed the stream, be done here
|
||||
afterSuccessfulLogin(true);
|
||||
return;
|
||||
|
@ -397,7 +387,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// SM resumption failed, what Smack does here is to report success of
|
||||
// lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
|
||||
// normal resource binding can be tried.
|
||||
LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
|
||||
assert smResumptionFailed != null;
|
||||
LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
|
||||
}
|
||||
|
||||
List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
|
||||
|
@ -421,9 +412,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
if (isSmAvailable() && useSm) {
|
||||
// Remove what is maybe left from previously stream managed sessions
|
||||
serverHandledStanzasCount = 0;
|
||||
sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
|
||||
// XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
|
||||
// then this is a non recoverable error and we therefore throw an exception.
|
||||
smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
|
||||
waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
|
||||
synchronized (requestAckPredicates) {
|
||||
if (requestAckPredicates.isEmpty()) {
|
||||
// Assure that we have at lest one predicate set up that so that we request acks
|
||||
|
@ -503,9 +495,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
|
||||
setWasAuthenticated();
|
||||
|
||||
// Wait for reader and writer threads to be terminated.
|
||||
readerWriterSemaphore.acquireUninterruptibly(2);
|
||||
readerWriterSemaphore.release(2);
|
||||
try {
|
||||
boolean readerAndWriterThreadsTermianted = waitForCondition(() -> !packetWriter.running && !packetReader.running);
|
||||
if (!readerAndWriterThreadsTermianted) {
|
||||
LOGGER.severe("Reader and writer threads did not terminate");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
|
||||
}
|
||||
|
||||
if (disconnectedButResumeable) {
|
||||
return;
|
||||
|
@ -536,10 +533,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
@Override
|
||||
protected void initState() {
|
||||
super.initState();
|
||||
maybeCompressFeaturesReceived.init();
|
||||
compressSyncPoint.init();
|
||||
smResumedSyncPoint.init();
|
||||
smEnabledSyncPoint.init();
|
||||
streamFeaturesAfterAuthenticationReceived = compressSyncPoint = false;
|
||||
smResumedSyncPoint = SyncPointState.initial;
|
||||
smResumptionFailed = null;
|
||||
smEnabledSyncPoint = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -652,15 +649,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// Set the reader and writer instance variables
|
||||
initReaderAndWriter();
|
||||
|
||||
int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits();
|
||||
if (availableReaderWriterSemaphorePermits < 2) {
|
||||
Object[] logObjects = new Object[] {
|
||||
this,
|
||||
availableReaderWriterSemaphorePermits,
|
||||
};
|
||||
LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects);
|
||||
}
|
||||
readerWriterSemaphore.acquire(2);
|
||||
// Start the writer thread. This will open an XMPP stream to the server
|
||||
packetWriter.init();
|
||||
// Start the reader thread. The startup() method will block until we
|
||||
|
@ -688,17 +676,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
* existing plain connection and perform a handshake. This method won't return until the
|
||||
* connection has finished the handshake or an error occurred while securing the connection.
|
||||
* @throws IOException if an I/O error occurred.
|
||||
* @throws CertificateException
|
||||
* @throws NoSuchAlgorithmException if no such algorithm is available.
|
||||
* @throws NoSuchProviderException
|
||||
* @throws KeyStoreException
|
||||
* @throws UnrecoverableKeyException
|
||||
* @throws KeyManagementException if there was a key mangement error.
|
||||
* @throws SmackException if Smack detected an exceptional situation.
|
||||
* @throws Exception if an exception occurs.
|
||||
* @throws SecurityNotPossibleException if TLS is not possible.
|
||||
* @throws CertificateException if there is an issue with the certificate.
|
||||
*/
|
||||
@SuppressWarnings("LiteralClassName")
|
||||
private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
|
||||
private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
|
||||
SmackTlsContext smackTlsContext = getSmackTlsContext();
|
||||
|
||||
Socket plain = socket;
|
||||
|
@ -773,7 +755,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
|
||||
@Override
|
||||
public boolean isUsingCompression() {
|
||||
return compressionHandler != null && compressSyncPoint.wasSuccessful();
|
||||
return compressionHandler != null && compressSyncPoint;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -791,10 +773,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
*
|
||||
* @throws NotConnectedException if the XMPP connection is not connected.
|
||||
* @throws SmackException if Smack detected an exceptional situation.
|
||||
* @throws NoResponseException if there was no response from the remote entity.
|
||||
* @throws InterruptedException if the calling thread was interrupted.
|
||||
* @throws XMPPException if an XMPP protocol error was received.
|
||||
*/
|
||||
private void maybeEnableCompression() throws SmackException, InterruptedException {
|
||||
private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
|
||||
if (!config.isCompressionEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
@ -807,7 +789,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// If stream compression was offered by the server and we want to use
|
||||
// compression then send compression request to the server
|
||||
if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
|
||||
compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
|
||||
sendNonza(new Compress(compressionHandler.getCompressionMethod()));
|
||||
waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
|
||||
} else {
|
||||
LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
|
||||
}
|
||||
|
@ -835,11 +818,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// We connected successfully to the servers TCP port
|
||||
initConnection();
|
||||
|
||||
// TLS handled will be successful either if TLS was established, or if it was not mandatory.
|
||||
tlsHandled.checkIfSuccessOrWaitOrThrow();
|
||||
// TLS handled will be true either if TLS was established, or if it was not mandatory.
|
||||
waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
|
||||
|
||||
// Wait with SASL auth until the SASL mechanisms have been received
|
||||
saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
|
||||
waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -857,24 +840,28 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
if (startTlsFeature != null) {
|
||||
if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
|
||||
SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
|
||||
tlsHandled.reportFailure(smackException);
|
||||
currentSmackException = smackException;
|
||||
notifyWaitingThreads();
|
||||
throw smackException;
|
||||
}
|
||||
|
||||
if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
|
||||
sendNonza(new StartTls());
|
||||
} else {
|
||||
tlsHandled.reportSuccess();
|
||||
tlsHandled = true;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
} else {
|
||||
tlsHandled.reportSuccess();
|
||||
tlsHandled = true;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
|
||||
if (isSaslAuthenticated()) {
|
||||
// If we have received features after the SASL has been successfully completed, then we
|
||||
// have also *maybe* received, as it is an optional feature, the compression feature
|
||||
// from the server.
|
||||
maybeCompressFeaturesReceived.reportSuccess();
|
||||
streamFeaturesAfterAuthenticationReceived = true;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -899,6 +886,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
|
||||
private volatile boolean done;
|
||||
|
||||
private boolean running;
|
||||
|
||||
/**
|
||||
* Initializes the reader in order to be used. The reader is initialized during the
|
||||
* first connection and when reconnecting due to an abruptly disconnection.
|
||||
|
@ -910,11 +899,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
@Override
|
||||
public void run() {
|
||||
LOGGER.finer(threadName + " start");
|
||||
running = true;
|
||||
try {
|
||||
parsePackets();
|
||||
} finally {
|
||||
LOGGER.finer(threadName + " exit");
|
||||
XMPPTCPConnection.this.readerWriterSemaphore.release();
|
||||
running = false;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
}
|
||||
}, threadName);
|
||||
|
@ -931,10 +922,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
* Parse top-level packets in order to process them further.
|
||||
*/
|
||||
private void parsePackets() {
|
||||
boolean initialStreamOpenSend = false;
|
||||
try {
|
||||
openStreamAndResetParser();
|
||||
initialStreamOpenSend = true;
|
||||
XmlPullParser.Event eventType = parser.getEventType();
|
||||
while (!done) {
|
||||
switch (eventType) {
|
||||
|
@ -955,27 +944,21 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
break;
|
||||
case "error":
|
||||
StreamError streamError = PacketParserUtils.parseStreamError(parser);
|
||||
saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
|
||||
currentXmppException = new StreamErrorException(streamError);
|
||||
// Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
|
||||
// point to report the error, which is checked immediately after tlsHandled in
|
||||
// connectInternal().
|
||||
tlsHandled.reportSuccess();
|
||||
tlsHandled = true;
|
||||
notifyWaiters();
|
||||
throw new StreamErrorException(streamError);
|
||||
case "features":
|
||||
parseFeaturesAndNotify(parser);
|
||||
break;
|
||||
case "proceed":
|
||||
try {
|
||||
// Secure the connection by negotiating TLS
|
||||
proceedTLSReceived();
|
||||
// Send a new opening stream to the server
|
||||
openStreamAndResetParser();
|
||||
}
|
||||
catch (Exception e) {
|
||||
SmackException.SmackWrappedException smackException = new SmackException.SmackWrappedException(e);
|
||||
tlsHandled.reportFailure(smackException);
|
||||
throw e;
|
||||
}
|
||||
// Secure the connection by negotiating TLS
|
||||
proceedTLSReceived();
|
||||
// Send a new opening stream to the server
|
||||
openStreamAndResetParser();
|
||||
break;
|
||||
case "failure":
|
||||
String namespace = parser.getNamespace(null);
|
||||
|
@ -989,8 +972,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// situation. It is still possible to authenticate and
|
||||
// use the connection but using an uncompressed connection
|
||||
// TODO Parse failure stanza
|
||||
compressSyncPoint.reportFailure(new SmackException.SmackMessageException(
|
||||
"Could not establish compression"));
|
||||
currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
|
||||
notifyWaitingThreads();
|
||||
break;
|
||||
default:
|
||||
parseAndProcessNonza(parser);
|
||||
|
@ -1004,7 +987,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// Send a new opening stream to the server
|
||||
openStreamAndResetParser();
|
||||
// Notify that compression is being used
|
||||
compressSyncPoint.reportSuccess();
|
||||
compressSyncPoint = true;
|
||||
notifyWaitingThreads();
|
||||
break;
|
||||
case Enabled.ELEMENT:
|
||||
Enabled enabled = ParseStreamManagement.enabled(parser);
|
||||
|
@ -1012,7 +996,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
smSessionId = enabled.getId();
|
||||
if (StringUtils.isNullOrEmpty(smSessionId)) {
|
||||
SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
|
||||
smEnabledSyncPoint.reportFailure(xmppException);
|
||||
setCurrentConnectionExceptionAndNotify(xmppException);
|
||||
throw xmppException;
|
||||
}
|
||||
smServerMaxResumptionTime = enabled.getMaxResumptionTime();
|
||||
|
@ -1022,28 +1006,18 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
}
|
||||
clientHandledStanzasCount = 0;
|
||||
smWasEnabledAtLeastOnce = true;
|
||||
smEnabledSyncPoint.reportSuccess();
|
||||
LOGGER.fine("Stream Management (XEP-198): successfully enabled");
|
||||
smEnabledSyncPoint = true;
|
||||
notifyWaitingThreads();
|
||||
break;
|
||||
case Failed.ELEMENT:
|
||||
Failed failed = ParseStreamManagement.failed(parser);
|
||||
FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
|
||||
// If only XEP-198 would specify different failure elements for the SM
|
||||
// enable and SM resume failure case. But this is not the case, so we
|
||||
// need to determine if this is a 'Failed' response for either 'Enable'
|
||||
// or 'Resume'.
|
||||
if (smResumedSyncPoint.requestSent()) {
|
||||
smResumedSyncPoint.reportFailure(xmppException);
|
||||
}
|
||||
else {
|
||||
if (!smEnabledSyncPoint.requestSent()) {
|
||||
throw new IllegalStateException("Failed element received but SM was not previously enabled");
|
||||
}
|
||||
smEnabledSyncPoint.reportFailure(new SmackException.SmackWrappedException(xmppException));
|
||||
// Report success for last lastFeaturesReceived so that in case a
|
||||
// failed resumption, we can continue with normal resource binding.
|
||||
// See text of XEP-198 5. below Example 11.
|
||||
lastFeaturesReceived.reportSuccess();
|
||||
if (smResumedSyncPoint == SyncPointState.request_sent) {
|
||||
// This is a <failed/> nonza in a response to resuming a previous stream, failure to do
|
||||
// so is non-fatal as we can simply continue with resource binding in this case.
|
||||
smResumptionFailed = failed;
|
||||
} else {
|
||||
FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
|
||||
setCurrentConnectionExceptionAndNotify(xmppException);
|
||||
}
|
||||
break;
|
||||
case Resumed.ELEMENT:
|
||||
|
@ -1052,7 +1026,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
|
||||
}
|
||||
// Mark SM as enabled
|
||||
smEnabledSyncPoint.reportSuccess();
|
||||
smEnabledSyncPoint = true;
|
||||
// First, drop the stanzas already handled by the server
|
||||
processHandledCount(resumed.getHandledCount());
|
||||
// Then re-send what is left in the unacknowledged queue
|
||||
|
@ -1068,8 +1042,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
requestSmAcknowledgementInternal();
|
||||
}
|
||||
// Mark SM resumption as successful
|
||||
smResumedSyncPoint.reportSuccess();
|
||||
LOGGER.fine("Stream Management (XEP-198): Stream resumed");
|
||||
smResumedSyncPoint = SyncPointState.successful;
|
||||
notifyWaitingThreads();
|
||||
break;
|
||||
case AckAnswer.ELEMENT:
|
||||
AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
|
||||
|
@ -1077,7 +1051,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
break;
|
||||
case AckRequest.ELEMENT:
|
||||
ParseStreamManagement.ackRequest(parser);
|
||||
if (smEnabledSyncPoint.wasSuccessful()) {
|
||||
if (smEnabledSyncPoint) {
|
||||
sendSmAcknowledgementInternal();
|
||||
} else {
|
||||
LOGGER.warning("SM Ack Request received while SM is not enabled");
|
||||
|
@ -1101,7 +1075,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
// did re-start the queue again, causing this writer to assume that the queue is not
|
||||
// shutdown, which results in a call to disconnect().
|
||||
final boolean queueWasShutdown = packetWriter.queue.isShutdown();
|
||||
closingStreamReceived.reportSuccess();
|
||||
closingStreamReceived = true;
|
||||
notifyWaitingThreads();
|
||||
|
||||
if (queueWasShutdown) {
|
||||
// We received a closing stream element *after* we initiated the
|
||||
|
@ -1137,12 +1112,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// TODO: Move the call closingStreamReceived.reportFailure(e) into notifyConnectionError?
|
||||
closingStreamReceived.reportFailure(e);
|
||||
// The exception can be ignored if the the connection is 'done'
|
||||
// or if the it was caused because the socket got closed. It can not be ignored if it
|
||||
// happened before (or while) the initial stream opened was send.
|
||||
if (!(done || packetWriter.queue.isShutdown()) || !initialStreamOpenSend) {
|
||||
// or if the it was caused because the socket got closed.
|
||||
if (!done) {
|
||||
// Close the connection and notify connection listeners of the
|
||||
// error.
|
||||
notifyConnectionError(e);
|
||||
|
@ -1161,12 +1133,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
|
||||
QUEUE_SIZE, true);
|
||||
|
||||
/**
|
||||
* Needs to be protected for unit testing purposes.
|
||||
*/
|
||||
protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>(
|
||||
XMPPTCPConnection.this, "shutdown completed");
|
||||
|
||||
/**
|
||||
* If set, the stanza writer is shut down
|
||||
*/
|
||||
|
@ -1184,12 +1150,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
*/
|
||||
private boolean shouldBundleAndDefer;
|
||||
|
||||
private boolean running;
|
||||
|
||||
/**
|
||||
* Initializes the writer in order to be used. It is called at the first connection and also
|
||||
* is invoked if the connection is disconnected by an error.
|
||||
*/
|
||||
void init() {
|
||||
shutdownDone.init();
|
||||
shutdownTimestamp = null;
|
||||
|
||||
if (unacknowledgedStanzas != null) {
|
||||
|
@ -1204,11 +1171,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
@Override
|
||||
public void run() {
|
||||
LOGGER.finer(threadName + " start");
|
||||
running = true;
|
||||
try {
|
||||
writePackets();
|
||||
} finally {
|
||||
LOGGER.finer(threadName + " exit");
|
||||
XMPPTCPConnection.this.readerWriterSemaphore.release();
|
||||
running = false;
|
||||
notifyWaitingThreads();
|
||||
}
|
||||
}
|
||||
}, threadName);
|
||||
|
@ -1262,13 +1231,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
instantShutdown = instant;
|
||||
queue.shutdown();
|
||||
shutdownTimestamp = System.currentTimeMillis();
|
||||
if (shutdownDone.isNotInInitialState()) {
|
||||
try {
|
||||
shutdownDone.checkIfSuccessOrWait();
|
||||
} catch (NoResponseException | InterruptedException e) {
|
||||
LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1407,9 +1369,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
} else {
|
||||
LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
|
||||
}
|
||||
} finally {
|
||||
LOGGER.fine("Reporting shutdownDone success in writer thread");
|
||||
shutdownDone.reportSuccess();
|
||||
}
|
||||
// Delay notifyConnectionError after shutdownDone has been reported in the finally block.
|
||||
if (writerException != null) {
|
||||
|
@ -1721,7 +1680,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
* @return true if Stream Management was negotiated.
|
||||
*/
|
||||
public boolean isSmEnabled() {
|
||||
return smEnabledSyncPoint.wasSuccessful();
|
||||
return smEnabledSyncPoint;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1730,7 +1689,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
* @return true if the stream was resumed.
|
||||
*/
|
||||
public boolean streamWasResumed() {
|
||||
return smResumedSyncPoint.wasSuccessful();
|
||||
return smResumedSyncPoint == SyncPointState.successful;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -47,17 +47,13 @@ import javax.net.ssl.SSLSession;
|
|||
|
||||
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
|
||||
import org.jivesoftware.smack.SmackException;
|
||||
import org.jivesoftware.smack.SmackException.ConnectionException;
|
||||
import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException;
|
||||
import org.jivesoftware.smack.SmackException.NoResponseException;
|
||||
import org.jivesoftware.smack.SmackException.NotConnectedException;
|
||||
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
|
||||
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
|
||||
import org.jivesoftware.smack.SmackException.SmackWrappedException;
|
||||
import org.jivesoftware.smack.SmackException.SmackCertificateException;
|
||||
import org.jivesoftware.smack.SmackFuture;
|
||||
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
|
||||
import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
|
||||
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
|
||||
import org.jivesoftware.smack.XMPPException;
|
||||
import org.jivesoftware.smack.XmppInputOutputFilter;
|
||||
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor;
|
||||
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor;
|
||||
|
@ -744,23 +740,14 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
|
||||
@Override
|
||||
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
|
||||
throws InterruptedException, ConnectionUnexpectedTerminatedException, NotConnectedException,
|
||||
NoResponseException, IOException {
|
||||
throws InterruptedException, IOException, SmackException, XMPPException {
|
||||
// The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to
|
||||
// ConnectingToHost.
|
||||
ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints,
|
||||
this);
|
||||
connectionAttemptState.establishTcpConnection();
|
||||
|
||||
try {
|
||||
connectionAttemptState.tcpConnectionEstablishedSyncPoint.checkIfSuccessOrWaitOrThrow();
|
||||
} catch (ConnectionException | NoResponseException e) {
|
||||
// TODO: It is not really elegant that we catch the exception here. Ideally ConnectionAttemptState would
|
||||
// simply return a StateTranstionResult.FailureCausedByException.
|
||||
return new StateTransitionResult.FailureCausedByException<>(e);
|
||||
} catch (SmackWrappedException e) {
|
||||
// Should never throw SmackWrappedException.
|
||||
throw new AssertionError(e);
|
||||
StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection();
|
||||
if (failure != null) {
|
||||
return failure;
|
||||
}
|
||||
|
||||
socketChannel = connectionAttemptState.socketChannel;
|
||||
|
@ -858,8 +845,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
|
||||
@Override
|
||||
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
|
||||
throws SmackWrappedException, FailedNonzaException, IOException, InterruptedException,
|
||||
ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException {
|
||||
throws IOException, InterruptedException, SmackException, XMPPException {
|
||||
connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class);
|
||||
|
||||
SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext();
|
||||
|
@ -881,7 +867,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
try {
|
||||
tlsState.waitForHandshakeFinished();
|
||||
} catch (CertificateException e) {
|
||||
throw new SmackWrappedException(e);
|
||||
throw new SmackCertificateException(e);
|
||||
}
|
||||
|
||||
connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established");
|
||||
|
@ -1166,43 +1152,20 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
private void handleSslException(SSLException e) {
|
||||
handshakeException = e;
|
||||
handshakeStatus = TlsHandshakeStatus.failed;
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
}
|
||||
connectionInternal.notifyWaitingThreads();
|
||||
}
|
||||
|
||||
private void onHandshakeFinished() {
|
||||
handshakeStatus = TlsHandshakeStatus.successful;
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
}
|
||||
connectionInternal.notifyWaitingThreads();
|
||||
}
|
||||
|
||||
private boolean isHandshakeFinished() {
|
||||
return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed;
|
||||
}
|
||||
|
||||
private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, ConnectionUnexpectedTerminatedException, NoResponseException {
|
||||
final long deadline = System.currentTimeMillis() + connectionInternal.connection.getReplyTimeout();
|
||||
|
||||
Exception currentConnectionException = null;
|
||||
synchronized (this) {
|
||||
while (!isHandshakeFinished()
|
||||
&& (currentConnectionException = connectionInternal.getCurrentConnectionException()) == null) {
|
||||
final long now = System.currentTimeMillis();
|
||||
if (now >= deadline)
|
||||
break;
|
||||
wait(deadline - now);
|
||||
}
|
||||
}
|
||||
|
||||
if (currentConnectionException != null) {
|
||||
throw new SmackException.ConnectionUnexpectedTerminatedException(currentConnectionException);
|
||||
}
|
||||
|
||||
if (!isHandshakeFinished()) {
|
||||
throw NoResponseException.newWith(connectionInternal.connection, "TLS handshake to finish");
|
||||
}
|
||||
private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackException, XMPPException {
|
||||
connectionInternal.waitForCondition(() -> isHandshakeFinished(), "TLS handshake to finish");
|
||||
|
||||
if (handshakeStatus == TlsHandshakeStatus.failed) {
|
||||
throw handshakeException;
|
||||
|
@ -1235,7 +1198,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
|
||||
@Override
|
||||
public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException,
|
||||
ConnectionUnexpectedTerminatedException, NoResponseException {
|
||||
SmackException, XMPPException {
|
||||
waitForHandshakeFinished();
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2014-2019 Florian Schmaus
|
||||
* Copyright 2014-2020 Florian Schmaus
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -124,8 +124,6 @@ public class PacketWriterTest {
|
|||
// Not really cool, but may increases the chances for 't' to block in sendStanza.
|
||||
Thread.sleep(250);
|
||||
|
||||
// Set to true for testing purposes, so that shutdown() won't wait packet writer
|
||||
pw.shutdownDone.reportSuccess();
|
||||
// Shutdown the packetwriter, this will also interrupt the writer thread, which is what we hope to happen in the
|
||||
// thread created above.
|
||||
pw.shutdown(false);
|
||||
|
|
Loading…
Reference in a new issue