XMPPTCPConnection: Move openStream() call from writer into reader thread

and also call notifyConnectionError() on exception thrown by
openStream().

In hindsight I wonder why openStream() was ever called in the writer
thread, as it only caused unnecessary synchronization overhead, as can
be seen by the initialOpenStreamSend synchronization point.
This commit is contained in:
Florian Schmaus 2019-04-29 18:37:36 +02:00
parent ae2c57f56b
commit b3646abecd
2 changed files with 62 additions and 23 deletions

View File

@ -160,9 +160,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*/
protected final PacketReader packetReader = new PacketReader();
private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>(
this, "initial open stream element send to server");
/**
*
*/
@ -536,7 +533,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
compressSyncPoint.init();
smResumedSyncPoint.init();
smEnabledSyncPoint.init();
initialOpenStreamSend.init();
}
@Override
@ -898,8 +894,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* Parse top-level packets in order to process them further.
*/
private void parsePackets() {
boolean initialStreamOpenSend = false;
try {
initialOpenStreamSend.checkIfSuccessOrWait();
openStream();
initialStreamOpenSend = true;
int eventType = parser.getEventType();
while (!done) {
switch (eventType) {
@ -1119,8 +1117,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
catch (Exception e) {
closingStreamReceived.reportFailure(e);
// The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed
if (!(done || packetWriter.queue.isShutdown())) {
// or if the it was caused because the socket got closed. It can not be ignored if it
// happened before (or while) the initial stream opened was send.
if (!(done || packetWriter.queue.isShutdown()) || !initialStreamOpenSend) {
// Close the connection and notify connection listeners of the
// error.
notifyConnectionError(e);
@ -1275,8 +1274,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private void writePackets() {
Exception writerException = null;
try {
openStream();
initialOpenStreamSend.reportSuccess();
// Write out packets from the queue.
while (!done()) {
Element element = nextStreamElement();

View File

@ -16,13 +16,16 @@
*/
package org.jivesoftware.smack.tcp;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.tcp.XMPPTCPConnection.PacketWriter;
@ -45,19 +48,24 @@ public class PacketWriterTest {
* @throws NotConnectedException
* @throws XmppStringprepException
*/
@SuppressWarnings("javadoc")
@Test
public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException, XmppStringprepException {
XMPPTCPConnection connection = new XMPPTCPConnection("user", "pass", "example.org");
final PacketWriter pw = connection.packetWriter;
connection.setWriter(new BlockingStringWriter());
BlockingStringWriter blockingStringWriter = new BlockingStringWriter();
connection.setWriter(blockingStringWriter);
connection.packetWriter.init();
for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) {
// Now insert QUEUE_SIZE + 1 stanzas into the outgoing queue to make sure that the queue is filled until its
// full capacity. The +1 is because the writer thread will dequeue one stanza and try to write it into the
// blocking writer.
for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE + 1; i++) {
pw.sendStreamElement(new Message());
}
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicReference<Exception> unexpectedThreadExceptionReference = new AtomicReference<>();
final AtomicReference<Exception> expectedThreadExceptionReference = new AtomicReference<>();
shutdown = false;
prematureUnblocked = false;
Thread t = new Thread(new Runnable() {
@ -71,12 +79,19 @@ public class PacketWriterTest {
prematureUnblocked = true;
}
}
catch (Exception e) {
catch (SmackException.NotConnectedException e) {
// This is the exception we expect.
expectedThreadExceptionReference.set(e);
}
catch (BrokenBarrierException | InterruptedException e) {
unexpectedThreadExceptionReference.set(e);
}
try {
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
unexpectedThreadExceptionReference.set(e);
}
}
});
@ -90,29 +105,56 @@ public class PacketWriterTest {
// Set to true for testing purposes, so that shutdown() won't wait packet writer
pw.shutdownDone.reportSuccess();
// Shutdown the packetwriter
// Shutdown the packetwriter, this will also interrupt the writer thread, which is what we hope to happen in the
// thread created above.
pw.shutdown(false);
shutdown = true;
barrier.await();
if (prematureUnblocked) {
fail("Should not unblock before the thread got shutdown");
t.join(60000);
Exception unexpectedThreadException = unexpectedThreadExceptionReference.get();
try {
if (prematureUnblocked) {
String failureMessage = "Should not unblock before the thread got shutdown.";
if (unexpectedThreadException != null) {
failureMessage += " Unexpected thread exception thrown: " + unexpectedThreadException;
}
fail(failureMessage);
}
else if (unexpectedThreadException != null) {
fail("Unexpected thread exception: " + unexpectedThreadException);
}
assertNotNull("Did not encounter expected exception on sendStreamElement()", expectedThreadExceptionReference.get());
}
synchronized (t) {
t.notify();
finally {
blockingStringWriter.unblock();
}
}
public static class BlockingStringWriter extends Writer {
private boolean blocked = true;
@Override
@SuppressWarnings("WaitNotInLoop")
public void write(char[] cbuf, int off, int len) throws IOException {
try {
wait();
}
catch (InterruptedException e) {
synchronized (this) {
while (blocked) {
try {
wait();
}
catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
}
public synchronized void unblock() {
blocked = false;
notify();
}
@Override
public void flush() throws IOException {
}