From b3646abecdc882011d7245b37dc72927fab29c22 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Mon, 29 Apr 2019 18:37:36 +0200 Subject: [PATCH] XMPPTCPConnection: Move openStream() call from writer into reader thread and also call notifyConnectionError() on exception thrown by openStream(). In hindsight I wonder why openStream() was ever called in the writer thread, as it only caused unnecessary synchronization overhead, as can be seen by the initialOpenStreamSend synchronization point. --- .../smack/tcp/XMPPTCPConnection.java | 15 ++-- .../smack/tcp/PacketWriterTest.java | 70 +++++++++++++++---- 2 files changed, 62 insertions(+), 23 deletions(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index dd7b7a30a..a44aa7a0a 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -160,9 +160,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { */ protected final PacketReader packetReader = new PacketReader(); - private final SynchronizationPoint initialOpenStreamSend = new SynchronizationPoint<>( - this, "initial open stream element send to server"); - /** * */ @@ -536,7 +533,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { compressSyncPoint.init(); smResumedSyncPoint.init(); smEnabledSyncPoint.init(); - initialOpenStreamSend.init(); } @Override @@ -898,8 +894,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * Parse top-level packets in order to process them further. */ private void parsePackets() { + boolean initialStreamOpenSend = false; try { - initialOpenStreamSend.checkIfSuccessOrWait(); + openStream(); + initialStreamOpenSend = true; int eventType = parser.getEventType(); while (!done) { switch (eventType) { @@ -1119,8 +1117,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { catch (Exception e) { closingStreamReceived.reportFailure(e); // The exception can be ignored if the the connection is 'done' - // or if the it was caused because the socket got closed - if (!(done || packetWriter.queue.isShutdown())) { + // or if the it was caused because the socket got closed. It can not be ignored if it + // happened before (or while) the initial stream opened was send. + if (!(done || packetWriter.queue.isShutdown()) || !initialStreamOpenSend) { // Close the connection and notify connection listeners of the // error. notifyConnectionError(e); @@ -1275,8 +1274,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private void writePackets() { Exception writerException = null; try { - openStream(); - initialOpenStreamSend.reportSuccess(); // Write out packets from the queue. while (!done()) { Element element = nextStreamElement(); diff --git a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java index c28ec85e4..1053f2645 100644 --- a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java +++ b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java @@ -16,13 +16,16 @@ */ package org.jivesoftware.smack.tcp; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import java.io.IOException; import java.io.Writer; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicReference; +import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.tcp.XMPPTCPConnection.PacketWriter; @@ -45,19 +48,24 @@ public class PacketWriterTest { * @throws NotConnectedException * @throws XmppStringprepException */ - @SuppressWarnings("javadoc") @Test public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException, XmppStringprepException { XMPPTCPConnection connection = new XMPPTCPConnection("user", "pass", "example.org"); final PacketWriter pw = connection.packetWriter; - connection.setWriter(new BlockingStringWriter()); + BlockingStringWriter blockingStringWriter = new BlockingStringWriter(); + connection.setWriter(blockingStringWriter); connection.packetWriter.init(); - for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) { + // Now insert QUEUE_SIZE + 1 stanzas into the outgoing queue to make sure that the queue is filled until its + // full capacity. The +1 is because the writer thread will dequeue one stanza and try to write it into the + // blocking writer. + for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE + 1; i++) { pw.sendStreamElement(new Message()); } final CyclicBarrier barrier = new CyclicBarrier(2); + final AtomicReference unexpectedThreadExceptionReference = new AtomicReference<>(); + final AtomicReference expectedThreadExceptionReference = new AtomicReference<>(); shutdown = false; prematureUnblocked = false; Thread t = new Thread(new Runnable() { @@ -71,12 +79,19 @@ public class PacketWriterTest { prematureUnblocked = true; } } - catch (Exception e) { + catch (SmackException.NotConnectedException e) { + // This is the exception we expect. + expectedThreadExceptionReference.set(e); } + catch (BrokenBarrierException | InterruptedException e) { + unexpectedThreadExceptionReference.set(e); + } + try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { + unexpectedThreadExceptionReference.set(e); } } }); @@ -90,29 +105,56 @@ public class PacketWriterTest { // Set to true for testing purposes, so that shutdown() won't wait packet writer pw.shutdownDone.reportSuccess(); - // Shutdown the packetwriter + // Shutdown the packetwriter, this will also interrupt the writer thread, which is what we hope to happen in the + // thread created above. pw.shutdown(false); shutdown = true; barrier.await(); - if (prematureUnblocked) { - fail("Should not unblock before the thread got shutdown"); + + t.join(60000); + + Exception unexpectedThreadException = unexpectedThreadExceptionReference.get(); + try { + if (prematureUnblocked) { + String failureMessage = "Should not unblock before the thread got shutdown."; + if (unexpectedThreadException != null) { + failureMessage += " Unexpected thread exception thrown: " + unexpectedThreadException; + } + fail(failureMessage); + } + else if (unexpectedThreadException != null) { + fail("Unexpected thread exception: " + unexpectedThreadException); + } + + assertNotNull("Did not encounter expected exception on sendStreamElement()", expectedThreadExceptionReference.get()); } - synchronized (t) { - t.notify(); + finally { + blockingStringWriter.unblock(); } } public static class BlockingStringWriter extends Writer { + private boolean blocked = true; + @Override - @SuppressWarnings("WaitNotInLoop") public void write(char[] cbuf, int off, int len) throws IOException { - try { - wait(); - } - catch (InterruptedException e) { + synchronized (this) { + while (blocked) { + try { + wait(); + } + catch (InterruptedException e) { + throw new AssertionError(e); + } + } } } + public synchronized void unblock() { + blocked = false; + notify(); + } + @Override public void flush() throws IOException { }