From 7041e90522760acb9aff5c738b8227deb5dffec6 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 30 Apr 2014 14:05:52 +0200 Subject: [PATCH] Properly sync PacketWriter's queue by using a custom ArrayBlockingQueueWithShutdown. Fixes a race condition where nextpacket() would wait for a notification that would never arrive, because all all put(Packet) calls are still blocking. SMACK-560 --- .../util/ArrayBlockingQueueWithShutdown.java | 495 ++++++++++++++++++ .../org/jivesoftware/smack/PacketWriter.java | 68 ++- .../jivesoftware/smack/PacketWriterTest.java | 112 ++++ 3 files changed, 639 insertions(+), 36 deletions(-) create mode 100644 smack-core/src/main/java/org/jivesoftware/smack/util/ArrayBlockingQueueWithShutdown.java create mode 100644 smack-tcp/src/test/java/org/jivesoftware/smack/PacketWriterTest.java diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/ArrayBlockingQueueWithShutdown.java b/smack-core/src/main/java/org/jivesoftware/smack/util/ArrayBlockingQueueWithShutdown.java new file mode 100644 index 000000000..99aa50292 --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/ArrayBlockingQueueWithShutdown.java @@ -0,0 +1,495 @@ +/** + * + * Copyright 2014 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack.util; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will + * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and + * {@link #poll(long, TimeUnit)}. + *

+ * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public + * domain). + * + * @param the type of elements held in this collection + */ +public class ArrayBlockingQueueWithShutdown extends AbstractQueue implements BlockingQueue { + + private final E[] items; + + private int takeIndex; + + private int putIndex; + + private int count; + + private final ReentrantLock lock; + + private final Condition notEmpty; + + private final Condition notFull; + + private volatile boolean isShutdown = false; + + private final int inc(int i) { + return (++i == items.length) ? 0 : i; + } + + private final void insert(E e) { + items[putIndex] = e; + putIndex = inc(putIndex); + count++; + notEmpty.signal(); + } + + private final E extract() { + E e = items[takeIndex]; + items[takeIndex] = null; + takeIndex = inc(takeIndex); + count--; + notFull.signal(); + return e; + } + + private final void removeAt(int i) { + if (i == takeIndex) { + items[takeIndex] = null; + takeIndex = inc(takeIndex); + } + else { + while (true) { + int nexti = inc(i); + if (nexti != putIndex) { + items[i] = items[nexti]; + i = nexti; + } + else { + items[i] = null; + putIndex = i; + break; + } + } + } + count--; + notFull.signal(); + } + + private final static void checkNotNull(Object o) { + if (o == null) { + throw new NullPointerException(); + } + } + + private final void checkNotShutdown() throws InterruptedException { + if (isShutdown) { + throw new InterruptedException(); + } + } + + private final boolean hasNoElements() { + return count == 0; + } + + private final boolean hasElements() { + return !hasNoElements(); + } + + private final boolean isFull() { + return count == items.length; + } + + private final boolean isNotFull() { + return !isFull(); + } + + public ArrayBlockingQueueWithShutdown(int capacity) { + this(capacity, false); + } + + @SuppressWarnings("unchecked") + public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + items = (E[]) new Object[capacity]; + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } + + /** + * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock + * (and usually throw a InterruptedException). + */ + public void shutdown() { + lock.lock(); + try { + isShutdown = true; + notEmpty.signalAll(); + notFull.signalAll(); + } + finally { + lock.unlock(); + } + } + + /** + * Start the queue. Newly created instances will be started automatically, thus this only needs + * to be called after {@link #shutdown()}. + */ + public void start() { + lock.lock(); + try { + isShutdown = false; + } + finally { + lock.unlock(); + } + } + + /** + * Returns true if the queue is currently shut down. + * + * @return true if the queue is shut down. + */ + public boolean isShutdown() { + lock.lock(); + try { + return isShutdown; + } finally { + lock.unlock(); + } + } + + @Override + public E poll() { + lock.lock(); + try { + if (hasNoElements()) { + return null; + } + E e = extract(); + return e; + } + finally { + lock.unlock(); + } + } + + @Override + public E peek() { + lock.lock(); + try { + return hasNoElements() ? null : items[takeIndex]; + } + finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E e) { + checkNotNull(e); + lock.lock(); + try { + if (isFull() || isShutdown) { + return false; + } + else { + insert(e); + return true; + } + } + finally { + lock.unlock(); + } + } + + @Override + public void put(E e) throws InterruptedException { + checkNotNull(e); + lock.lockInterruptibly(); + + try { + while (isFull()) { + try { + notFull.await(); + checkNotShutdown(); + } + catch (InterruptedException ie) { + notFull.signal(); + throw ie; + } + } + insert(e); + } + finally { + lock.unlock(); + } + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + checkNotNull(e); + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + while (true) { + if (isNotFull()) { + insert(e); + return true; + } + if (nanos <= 0) { + return false; + } + try { + nanos = notFull.awaitNanos(nanos); + checkNotShutdown(); + } + catch (InterruptedException ie) { + notFull.signal(); + throw ie; + } + } + } + finally { + lock.unlock(); + } + + } + + @Override + public E take() throws InterruptedException { + lock.lockInterruptibly(); + try { + checkNotShutdown(); + try { + while (hasNoElements()) { + notEmpty.await(); + checkNotShutdown(); + } + } + catch (InterruptedException ie) { + notEmpty.signal(); + throw ie; + } + E e = extract(); + return e; + } + finally { + lock.unlock(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + checkNotShutdown(); + while (true) { + if (hasElements()) { + E e = extract(); + return e; + } + if (nanos <= 0) { + return null; + } + try { + nanos = notEmpty.awaitNanos(nanos); + checkNotShutdown(); + } + catch (InterruptedException ie) { + notEmpty.signal(); + throw ie; + } + } + } + finally { + lock.unlock(); + } + } + + @Override + public int remainingCapacity() { + lock.lock(); + try { + return items.length - count; + } + finally { + lock.unlock(); + } + } + + @Override + public int drainTo(Collection c) { + checkNotNull(c); + if (c == this) { + throw new IllegalArgumentException(); + } + lock.lock(); + try { + int i = takeIndex; + int n = 0; + for (; n < count; n++) { + c.add(items[i]); + items[i] = null; + i = inc(i); + } + if (n > 0) { + count = 0; + putIndex = 0; + takeIndex = 0; + notFull.signalAll(); + } + return n; + } + finally { + lock.unlock(); + } + } + + @Override + public int drainTo(Collection c, int maxElements) { + checkNotNull(c); + if (c == this) { + throw new IllegalArgumentException(); + } + if (maxElements <= 0) { + return 0; + } + lock.lock(); + try { + int i = takeIndex; + int n = 0; + int max = (maxElements < count) ? maxElements : count; + for (; n < max; n++) { + c.add(items[i]); + items[i] = null; + i = inc(i); + } + if (n > 0) { + count -= n; + takeIndex = i; + notFull.signalAll(); + } + return n; + } + finally { + lock.unlock(); + } + } + + @Override + public int size() { + lock.lock(); + try { + return count; + } + finally { + lock.unlock(); + } + } + + @Override + public Iterator iterator() { + lock.lock(); + try { + return new Itr(); + } + finally { + lock.unlock(); + } + } + + private class Itr implements Iterator { + private int nextIndex; + private E nextItem; + private int lastRet; + + Itr() { + lastRet = -1; + if (count == 0) { + nextIndex = -1; + } + else { + nextIndex = takeIndex; + nextItem = items[takeIndex]; + } + } + + public boolean hasNext() { + return nextIndex >= 0; + } + + private void checkNext() { + if (nextIndex == putIndex) { + nextIndex = -1; + nextItem = null; + } + else { + nextItem = items[nextIndex]; + if (nextItem == null) { + nextIndex = -1; + } + } + } + + public E next() { + lock.lock(); + try { + if (nextIndex < 0) { + throw new NoSuchElementException(); + } + lastRet = nextIndex; + E e = nextItem; + nextIndex = inc(nextIndex); + checkNext(); + return e; + } + finally { + lock.unlock(); + } + } + + public void remove() { + lock.lock(); + try { + int i = lastRet; + if (i < 0) { + throw new IllegalStateException(); + } + lastRet = -1; + int ti = takeIndex; + removeAt(i); + nextIndex = (i == ti) ? takeIndex : i; + checkNext(); + } + finally { + lock.unlock(); + } + } + } + +} diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/PacketWriter.java b/smack-tcp/src/main/java/org/jivesoftware/smack/PacketWriter.java index d27d68dcd..a620d8e72 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/PacketWriter.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/PacketWriter.java @@ -18,11 +18,10 @@ package org.jivesoftware.smack; import org.jivesoftware.smack.packet.Packet; +import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; import java.io.IOException; import java.io.Writer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,12 +36,16 @@ import java.util.logging.Logger; * @author Matt Tucker */ class PacketWriter { + public static final int QUEUE_SIZE = 500; + private static final Logger LOGGER = Logger.getLogger(PacketWriter.class.getName()); - + + private final XMPPTCPConnection connection; + private final ArrayBlockingQueueWithShutdown queue = new ArrayBlockingQueueWithShutdown(QUEUE_SIZE, true); + private Thread writerThread; private Writer writer; - private XMPPTCPConnection connection; - private final BlockingQueue queue; + volatile boolean done; /** @@ -51,7 +54,6 @@ class PacketWriter { * @param connection the connection. */ protected PacketWriter(XMPPTCPConnection connection) { - this.queue = new ArrayBlockingQueue(500, true); this.connection = connection; init(); } @@ -64,6 +66,7 @@ class PacketWriter { this.writer = connection.writer; done = false; + queue.start(); writerThread = new Thread() { public void run() { writePackets(this); @@ -79,17 +82,17 @@ class PacketWriter { * @param packet the packet to send. */ public void sendPacket(Packet packet) { - if (!done) { - try { - queue.put(packet); - } - catch (InterruptedException ie) { - LOGGER.log(Level.SEVERE, "Failed to queue packet to send to server: " + packet.toString(), ie); - return; - } - synchronized (queue) { - queue.notifyAll(); - } + if (done) { + return; + } + + try { + queue.put(packet); + } + catch (InterruptedException ie) { + LOGGER.log(Level.SEVERE, + "Failed to queue packet to send to server: " + packet.toString(), ie); + return; } } @@ -112,9 +115,7 @@ class PacketWriter { */ public void shutdown() { done = true; - synchronized (queue) { - queue.notifyAll(); - } + queue.shutdown(); } /** @@ -123,17 +124,16 @@ class PacketWriter { * @return the next packet for writing. */ private Packet nextPacket() { + if (done) { + return null; + } + Packet packet = null; - // Wait until there's a packet or we're done. - while (!done && (packet = queue.poll()) == null) { - try { - synchronized (queue) { - queue.wait(); - } - } - catch (InterruptedException ie) { - // Do nothing - } + try { + packet = queue.take(); + } + catch (InterruptedException e) { + // Do nothing } return packet; } @@ -191,12 +191,8 @@ class PacketWriter { // The exception can be ignored if the the connection is 'done' // or if the it was caused because the socket got closed if (!(done || connection.isSocketClosed())) { - done = true; - // packetReader could be set to null by an concurrent disconnect() call. - // Therefore Prevent NPE exceptions by checking packetReader. - if (connection.packetReader != null) { - connection.notifyConnectionError(ioe); - } + shutdown(); + connection.notifyConnectionError(ioe); } } } diff --git a/smack-tcp/src/test/java/org/jivesoftware/smack/PacketWriterTest.java b/smack-tcp/src/test/java/org/jivesoftware/smack/PacketWriterTest.java new file mode 100644 index 000000000..ee196a756 --- /dev/null +++ b/smack-tcp/src/test/java/org/jivesoftware/smack/PacketWriterTest.java @@ -0,0 +1,112 @@ +/** + * + * Copyright 2014 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack; + +import java.io.IOException; +import java.io.Writer; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +import org.jivesoftware.smack.packet.Message; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class PacketWriterTest { + volatile boolean shutdown; + volatile boolean prematureUnblocked; + + /** + * Make sure that packet writer does block once the queue reaches + * {@link PacketWriter#QUEUE_SIZE} and that + * {@link PacketWriter#sendPacket(org.jivesoftware.smack.packet.Packet)} does unblock after the + * interrupt. + * + * @throws InterruptedException + * @throws BrokenBarrierException + */ + @SuppressWarnings("javadoc") + @Test + public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException { + XMPPTCPConnection connection = new XMPPTCPConnection("foobar.com"); + final PacketWriter pw = new PacketWriter(connection); + pw.setWriter(new BlockingStringWriter()); + pw.startup(); + + for (int i = 0; i < PacketWriter.QUEUE_SIZE; i++) { + pw.sendPacket(new Message()); + } + + final CyclicBarrier barrier = new CyclicBarrier(2); + shutdown = false; + prematureUnblocked = false; + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } + catch (InterruptedException | BrokenBarrierException e1) { + } + pw.sendPacket(new Message()); + // should only return after the pw was shutdown + if (!shutdown) { + prematureUnblocked = true; + } + try { + barrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + } + } + }); + t.start(); + // This barrier is not strictly necessary, but may increases the chances that the threat + // will block before we call shutdown. Otherwise we may get false positives (which is still + // better then false negatives). + barrier.await(); + // Not really cool, but may increases the chances for 't' to block in sendPacket. + Thread.sleep(250); + + // Shutdown the packetwriter + pw.shutdown(); + shutdown = true; + barrier.await(); + if (prematureUnblocked) { + fail("Should not unblock before the thread got shutdown"); + } + } + + public class BlockingStringWriter extends Writer { + @Override + public void write(char[] cbuf, int off, int len) throws IOException { + try { + Thread.sleep(Integer.MAX_VALUE); + } + catch (InterruptedException e) { + } + } + + @Override + public void flush() throws IOException { + } + + @Override + public void close() throws IOException { + } + } +}