Synchronize PacketWriter shutdown

try the best to send the queue elements and the closing stream element.
This commit is contained in:
Florian Schmaus 2014-05-05 17:45:40 +02:00
parent 74d188aabb
commit 07d6b9203c
2 changed files with 30 additions and 3 deletions

View File

@ -23,6 +23,8 @@ import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
/** /**
@ -48,6 +50,8 @@ class PacketWriter {
volatile boolean done; volatile boolean done;
AtomicBoolean shutdownDone = new AtomicBoolean(false);
/** /**
* Creates a new packet writer with the specified connection. * Creates a new packet writer with the specified connection.
* *
@ -65,6 +69,7 @@ class PacketWriter {
protected void init() { protected void init() {
this.writer = connection.writer; this.writer = connection.writer;
done = false; done = false;
shutdownDone.set(false);
queue.start(); queue.start();
writerThread = new Thread() { writerThread = new Thread() {
@ -115,6 +120,16 @@ class PacketWriter {
public void shutdown() { public void shutdown() {
done = true; done = true;
queue.shutdown(); queue.shutdown();
synchronized(shutdownDone) {
if (!shutdownDone.get()) {
try {
shutdownDone.wait(connection.getPacketReplyTimeout());
}
catch (InterruptedException e) {
LOGGER.log(Level.WARNING, "shutdown", e);
}
}
}
} }
/** /**
@ -163,7 +178,7 @@ class PacketWriter {
writer.flush(); writer.flush();
} }
catch (Exception e) { catch (Exception e) {
LOGGER.warning("Error flushing queue during shutdown, ignore and continue"); LOGGER.log(Level.WARNING, "Exception flushing queue during shutdown, ignore and continue", e);
} }
// Delete the queue contents (hopefully nothing is left). // Delete the queue contents (hopefully nothing is left).
@ -175,7 +190,8 @@ class PacketWriter {
writer.flush(); writer.flush();
} }
catch (Exception e) { catch (Exception e) {
// Do nothing LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
} }
finally { finally {
try { try {
@ -185,6 +201,11 @@ class PacketWriter {
// Do nothing // Do nothing
} }
} }
shutdownDone.set(true);
synchronized(shutdownDone) {
shutdownDone.notify();
}
} }
catch (IOException ioe) { catch (IOException ioe) {
// The exception can be ignored if the the connection is 'done' // The exception can be ignored if the the connection is 'done'
@ -213,4 +234,5 @@ class PacketWriter {
writer.write(stream.toString()); writer.write(stream.toString());
writer.flush(); writer.flush();
} }
} }

View File

@ -83,6 +83,8 @@ public class PacketWriterTest {
// Not really cool, but may increases the chances for 't' to block in sendPacket. // Not really cool, but may increases the chances for 't' to block in sendPacket.
Thread.sleep(250); Thread.sleep(250);
// Set to true for testing purposes, so that shutdown() won't wait packet writer
pw.shutdownDone.set(true);
// Shutdown the packetwriter // Shutdown the packetwriter
pw.shutdown(); pw.shutdown();
shutdown = true; shutdown = true;
@ -90,13 +92,16 @@ public class PacketWriterTest {
if (prematureUnblocked) { if (prematureUnblocked) {
fail("Should not unblock before the thread got shutdown"); fail("Should not unblock before the thread got shutdown");
} }
synchronized (t) {
t.notify();
}
} }
public class BlockingStringWriter extends Writer { public class BlockingStringWriter extends Writer {
@Override @Override
public void write(char[] cbuf, int off, int len) throws IOException { public void write(char[] cbuf, int off, int len) throws IOException {
try { try {
Thread.sleep(Integer.MAX_VALUE); wait();
} }
catch (InterruptedException e) { catch (InterruptedException e) {
} }