Merge branch '4.3'

This commit is contained in:
Florian Schmaus 2019-02-09 20:15:09 +01:00
commit 163ca2b009
6 changed files with 124 additions and 46 deletions

View File

@ -76,6 +76,7 @@ import org.jivesoftware.smack.SmackException.NotLoggedInException;
import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
import org.jivesoftware.smack.SmackException.SecurityRequiredException; import org.jivesoftware.smack.SmackException.SecurityRequiredException;
import org.jivesoftware.smack.SmackException.SmackWrappedException;
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
import org.jivesoftware.smack.XMPPException.FailedNonzaException; import org.jivesoftware.smack.XMPPException.FailedNonzaException;
import org.jivesoftware.smack.XMPPException.StreamErrorException; import org.jivesoftware.smack.XMPPException.StreamErrorException;
@ -276,7 +277,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
* stanza is send by the server. This is set to true once the last feature stanza has been * stanza is send by the server. This is set to true once the last feature stanza has been
* parsed. * parsed.
*/ */
protected final SynchronizationPoint<Exception> lastFeaturesReceived = new SynchronizationPoint<Exception>( protected final SynchronizationPoint<SmackException> lastFeaturesReceived = new SynchronizationPoint<>(
AbstractXMPPConnection.this, "last stream features received from server"); AbstractXMPPConnection.this, "last stream features received from server");
/** /**
@ -612,7 +613,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
// - the servers last features stanza has been parsed // - the servers last features stanza has been parsed
// - the timeout occurs // - the timeout occurs
LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); LOGGER.finer("Waiting for last features to be received before continuing with resource binding");
lastFeaturesReceived.checkIfSuccessOrWait(); lastFeaturesReceived.checkIfSuccessOrWaitOrThrow();
if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) {
@ -841,7 +842,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
for (StanzaCollector collector : collectors) { for (StanzaCollector collector : collectors) {
collector.notifyConnectionError(exception); collector.notifyConnectionError(exception);
} }
// TODO: We should also notify things like the SASL authentication machinery about the exception. SmackWrappedException smackWrappedException = new SmackWrappedException(exception);
tlsHandled.reportGenericFailure(smackWrappedException);
saslFeatureReceived.reportGenericFailure(smackWrappedException);
lastFeaturesReceived.reportGenericFailure(smackWrappedException);
// Closes the connection temporary. A if the connection supports stream management, then a reconnection is // Closes the connection temporary. A if the connection supports stream management, then a reconnection is
// possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in

View File

@ -356,4 +356,16 @@ public class SmackException extends Exception {
super("Resource binding was not offered by server"); super("Resource binding was not offered by server");
} }
} }
public static class SmackWrappedException extends SmackException {
/**
*
*/
private static final long serialVersionUID = 1L;
public SmackWrappedException(Exception exception) {
super(exception);
}
}
} }

View File

@ -1,6 +1,6 @@
/** /**
* *
* Copyright © 2014-2018 Florian Schmaus * Copyright © 2014-2019 Florian Schmaus
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock;
import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.SmackWrappedException;
import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Nonza;
import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.TopLevelStreamElement; import org.jivesoftware.smack.packet.TopLevelStreamElement;
@ -37,6 +38,7 @@ public class SynchronizationPoint<E extends Exception> {
// same memory synchronization effects as synchronization block enter and leave. // same memory synchronization effects as synchronization block enter and leave.
private State state; private State state;
private E failureException; private E failureException;
private SmackWrappedException smackWrappedExcpetion;
private volatile long waitStart; private volatile long waitStart;
@ -62,6 +64,7 @@ public class SynchronizationPoint<E extends Exception> {
connectionLock.lock(); connectionLock.lock();
state = State.Initial; state = State.Initial;
failureException = null; failureException = null;
smackWrappedExcpetion = null;
connectionLock.unlock(); connectionLock.unlock();
} }
@ -74,7 +77,7 @@ public class SynchronizationPoint<E extends Exception> {
* @throws InterruptedException if the connection is interrupted. * @throws InterruptedException if the connection is interrupted.
* @return <code>null</code> if synchronization point was successful, or the failure Exception. * @return <code>null</code> if synchronization point was successful, or the failure Exception.
*/ */
public E sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, public Exception sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException,
NotConnectedException, InterruptedException { NotConnectedException, InterruptedException {
assert (state == State.Initial); assert (state == State.Initial);
connectionLock.lock(); connectionLock.lock();
@ -106,15 +109,14 @@ public class SynchronizationPoint<E extends Exception> {
* @throws NoResponseException if no response was received. * @throws NoResponseException if no response was received.
* @throws NotConnectedException if the connection is not connected. * @throws NotConnectedException if the connection is not connected.
* @throws InterruptedException if the connection is interrupted. * @throws InterruptedException if the connection is interrupted.
* @throws SmackWrappedException in case of a wrapped exception;
*/ */
public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException, public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException,
NotConnectedException, InterruptedException { NotConnectedException, InterruptedException, SmackWrappedException {
sendAndWaitForResponse(request); sendAndWaitForResponse(request);
switch (state) { switch (state) {
case Failure: case Failure:
if (failureException != null) { throwException();
throw failureException;
}
break; break;
default: default:
// Success, do nothing // Success, do nothing
@ -126,11 +128,12 @@ public class SynchronizationPoint<E extends Exception> {
* @throws NoResponseException if there was no response marking the synchronization point as success or failed. * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
* @throws E if there was a failure * @throws E if there was a failure
* @throws InterruptedException if the connection is interrupted. * @throws InterruptedException if the connection is interrupted.
* @throws SmackWrappedException in case of a wrapped exception;
*/ */
public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException { public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException, SmackWrappedException {
checkIfSuccessOrWait(); checkIfSuccessOrWait();
if (state == State.Failure) { if (state == State.Failure) {
throw failureException; throwException();
} }
} }
@ -140,7 +143,7 @@ public class SynchronizationPoint<E extends Exception> {
* @throws InterruptedException * @throws InterruptedException
* @return <code>null</code> if synchronization point was successful, or the failure Exception. * @return <code>null</code> if synchronization point was successful, or the failure Exception.
*/ */
public E checkIfSuccessOrWait() throws NoResponseException, InterruptedException { public Exception checkIfSuccessOrWait() throws NoResponseException, InterruptedException {
connectionLock.lock(); connectionLock.lock();
try { try {
switch (state) { switch (state) {
@ -148,7 +151,7 @@ public class SynchronizationPoint<E extends Exception> {
case Success: case Success:
return null; return null;
case Failure: case Failure:
return failureException; return getException();
default: default:
// Do nothing // Do nothing
break; break;
@ -201,6 +204,24 @@ public class SynchronizationPoint<E extends Exception> {
} }
} }
/**
* Report this synchronization point as failed because of the given exception. The {@code failureException} must be set.
*
* @param exception the exception causing this synchronization point to fail.
*/
public void reportGenericFailure(SmackWrappedException exception) {
assert exception != null;
connectionLock.lock();
try {
state = State.Failure;
this.smackWrappedExcpetion = exception;
condition.signalAll();
}
finally {
connectionLock.unlock();
}
}
/** /**
* Check if this synchronization point was successful. * Check if this synchronization point was successful.
* *
@ -216,6 +237,16 @@ public class SynchronizationPoint<E extends Exception> {
} }
} }
public boolean isNotInInitialState() {
connectionLock.lock();
try {
return state != State.Initial;
}
finally {
connectionLock.unlock();
}
}
/** /**
* Check if this synchronization point has its request already sent. * Check if this synchronization point has its request already sent.
* *
@ -273,6 +304,20 @@ public class SynchronizationPoint<E extends Exception> {
} }
} }
private Exception getException() {
if (failureException != null) {
return failureException;
}
return smackWrappedExcpetion;
}
private void throwException() throws E, SmackWrappedException {
if (failureException != null) {
throw failureException;
}
throw smackWrappedExcpetion;
}
/** /**
* Check for a response and throw a {@link NoResponseException} if there was none. * Check for a response and throw a {@link NoResponseException} if there was none.
* <p> * <p>
@ -281,7 +326,7 @@ public class SynchronizationPoint<E extends Exception> {
* @return <code>true</code> if synchronization point was successful, <code>false</code> on failure. * @return <code>true</code> if synchronization point was successful, <code>false</code> on failure.
* @throws NoResponseException * @throws NoResponseException
*/ */
private E checkForResponse() throws NoResponseException { private Exception checkForResponse() throws NoResponseException {
switch (state) { switch (state) {
case Initial: case Initial:
case NoResponse: case NoResponse:
@ -290,7 +335,7 @@ public class SynchronizationPoint<E extends Exception> {
case Success: case Success:
return null; return null;
case Failure: case Failure:
return failureException; return getException();
default: default:
throw new AssertionError("Unknown state " + state); throw new AssertionError("Unknown state " + state);
} }

View File

@ -44,6 +44,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
@ -148,15 +149,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private SSLSocket secureSocket; private SSLSocket secureSocket;
/** private final Semaphore readerWriterSemaphore = new Semaphore(2);
* Protected access level because of unit test purposes
*/
protected PacketWriter packetWriter;
/** /**
* Protected access level because of unit test purposes * Protected access level because of unit test purposes
*/ */
protected PacketReader packetReader; protected final PacketWriter packetWriter = new PacketWriter();
/**
* Protected access level because of unit test purposes
*/
protected final PacketReader packetReader = new PacketReader();
private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>( private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>(
this, "initial open stream element send to server"); this, "initial open stream element send to server");
@ -464,20 +467,16 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// First shutdown the writer, this will result in a closing stream element getting send to // First shutdown the writer, this will result in a closing stream element getting send to
// the server // the server
if (packetWriter != null) { LOGGER.finer("PacketWriter shutdown()");
LOGGER.finer("PacketWriter shutdown()"); packetWriter.shutdown(instant);
packetWriter.shutdown(instant);
}
LOGGER.finer("PacketWriter has been shut down"); LOGGER.finer("PacketWriter has been shut down");
if (!instant) { if (!instant) {
waitForClosingStreamTagFromServer(); waitForClosingStreamTagFromServer();
} }
if (packetReader != null) { LOGGER.finer("PacketReader shutdown()");
LOGGER.finer("PacketReader shutdown()"); packetReader.shutdown();
packetReader.shutdown();
}
LOGGER.finer("PacketReader has been shut down"); LOGGER.finer("PacketReader has been shut down");
CloseableUtil.maybeClose(socket, LOGGER); CloseableUtil.maybeClose(socket, LOGGER);
@ -599,18 +598,23 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* @throws XMPPException if establishing a connection to the server fails. * @throws XMPPException if establishing a connection to the server fails.
* @throws SmackException if the server fails to respond back or if there is anther error. * @throws SmackException if the server fails to respond back or if there is anther error.
* @throws IOException * @throws IOException
* @throws InterruptedException
*/ */
private void initConnection() throws IOException { private void initConnection() throws IOException, InterruptedException {
boolean isFirstInitialization = packetReader == null || packetWriter == null;
compressionHandler = null; compressionHandler = null;
// Set the reader and writer instance variables // Set the reader and writer instance variables
initReaderAndWriter(); initReaderAndWriter();
if (isFirstInitialization) { int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits();
packetWriter = new PacketWriter(); if (availableReaderWriterSemaphorePermits < 2) {
packetReader = new PacketReader(); Object[] logObjects = new Object[] {
this,
availableReaderWriterSemaphorePermits,
};
LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects);
} }
readerWriterSemaphore.acquire(2);
// Start the writer thread. This will open an XMPP stream to the server // Start the writer thread. This will open an XMPP stream to the server
packetWriter.init(); packetWriter.init();
// Start the reader thread. The startup() method will block until we // Start the reader thread. The startup() method will block until we
@ -841,7 +845,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
Async.go(new Runnable() { Async.go(new Runnable() {
@Override @Override
public void run() { public void run() {
parsePackets(); try {
parsePackets();
} finally {
XMPPTCPConnection.this.readerWriterSemaphore.release();
}
} }
}, "Smack Reader (" + getConnectionCounter() + ")"); }, "Smack Reader (" + getConnectionCounter() + ")");
} }
@ -1132,7 +1140,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
Async.go(new Runnable() { Async.go(new Runnable() {
@Override @Override
public void run() { public void run() {
writePackets(); try {
writePackets();
} finally {
XMPPTCPConnection.this.readerWriterSemaphore.release();
}
} }
}, "Smack Writer (" + getConnectionCounter() + ")"); }, "Smack Writer (" + getConnectionCounter() + ")");
} }
@ -1185,11 +1197,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
instantShutdown = instant; instantShutdown = instant;
queue.shutdown(); queue.shutdown();
shutdownTimestamp = System.currentTimeMillis(); shutdownTimestamp = System.currentTimeMillis();
try { if (shutdownDone.isNotInInitialState()) {
shutdownDone.checkIfSuccessOrWait(); try {
} shutdownDone.checkIfSuccessOrWait();
catch (NoResponseException | InterruptedException e) { } catch (NoResponseException | InterruptedException e) {
LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
}
} }
} }

View File

@ -62,6 +62,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
import org.jivesoftware.smack.SmackException.SmackWrappedException;
import org.jivesoftware.smack.SmackReactor.ChannelSelectedCallback; import org.jivesoftware.smack.SmackReactor.ChannelSelectedCallback;
import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment; import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
import org.jivesoftware.smack.SynchronizationPoint; import org.jivesoftware.smack.SynchronizationPoint;
@ -920,7 +921,12 @@ public class XmppNioTcpConnection extends AbstractXmppNioConnection {
failedAddresses, this); failedAddresses, this);
connectionAttemptState.establishTcpConnection(); connectionAttemptState.establishTcpConnection();
connectionAttemptState.tcpConnectionEstablishedSyncPoint.checkIfSuccessOrWaitOrThrow(); try {
connectionAttemptState.tcpConnectionEstablishedSyncPoint.checkIfSuccessOrWaitOrThrow();
} catch (SmackWrappedException e) {
// Should never throw SmackWrappedException.
throw new AssertionError(e);
}
socketChannel = connectionAttemptState.socketChannel; socketChannel = connectionAttemptState.socketChannel;
remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();

View File

@ -1,6 +1,6 @@
/** /**
* *
* Copyright 2014 Florian Schmaus * Copyright 2014-2019 Florian Schmaus
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -49,11 +49,9 @@ public class PacketWriterTest {
@Test @Test
public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException, XmppStringprepException { public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException, XmppStringprepException {
XMPPTCPConnection connection = new XMPPTCPConnection("user", "pass", "example.org"); XMPPTCPConnection connection = new XMPPTCPConnection("user", "pass", "example.org");
final PacketWriter pw = connection.new PacketWriter(); final PacketWriter pw = connection.packetWriter;
connection.packetWriter = pw;
connection.packetReader = connection.new PacketReader();
connection.setWriter(new BlockingStringWriter()); connection.setWriter(new BlockingStringWriter());
pw.init(); connection.packetWriter.init();
for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) { for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) {
pw.sendStreamElement(new Message()); pw.sendStreamElement(new Message());