1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2024-12-22 02:27:58 +01:00

Properly sync PacketWriter's queue

by using a custom ArrayBlockingQueueWithShutdown. Fixes a race condition
where nextpacket() would wait for a notification that would never
arrive, because all all put(Packet) calls are still blocking.

SMACK-560
This commit is contained in:
Florian Schmaus 2014-04-30 14:05:52 +02:00
parent ed8b80c2ff
commit 7041e90522
3 changed files with 639 additions and 36 deletions

View file

@ -0,0 +1,495 @@
/**
*
* Copyright 2014 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.util;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will
* throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and
* {@link #poll(long, TimeUnit)}.
* <p>
* Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
* domain).
*
* @param <E> the type of elements held in this collection
*/
public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {
private final E[] items;
private int takeIndex;
private int putIndex;
private int count;
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
private volatile boolean isShutdown = false;
private final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
private final void insert(E e) {
items[putIndex] = e;
putIndex = inc(putIndex);
count++;
notEmpty.signal();
}
private final E extract() {
E e = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
count--;
notFull.signal();
return e;
}
private final void removeAt(int i) {
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
}
else {
while (true) {
int nexti = inc(i);
if (nexti != putIndex) {
items[i] = items[nexti];
i = nexti;
}
else {
items[i] = null;
putIndex = i;
break;
}
}
}
count--;
notFull.signal();
}
private final static void checkNotNull(Object o) {
if (o == null) {
throw new NullPointerException();
}
}
private final void checkNotShutdown() throws InterruptedException {
if (isShutdown) {
throw new InterruptedException();
}
}
private final boolean hasNoElements() {
return count == 0;
}
private final boolean hasElements() {
return !hasNoElements();
}
private final boolean isFull() {
return count == items.length;
}
private final boolean isNotFull() {
return !isFull();
}
public ArrayBlockingQueueWithShutdown(int capacity) {
this(capacity, false);
}
@SuppressWarnings("unchecked")
public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock
* (and usually throw a InterruptedException).
*/
public void shutdown() {
lock.lock();
try {
isShutdown = true;
notEmpty.signalAll();
notFull.signalAll();
}
finally {
lock.unlock();
}
}
/**
* Start the queue. Newly created instances will be started automatically, thus this only needs
* to be called after {@link #shutdown()}.
*/
public void start() {
lock.lock();
try {
isShutdown = false;
}
finally {
lock.unlock();
}
}
/**
* Returns true if the queue is currently shut down.
*
* @return true if the queue is shut down.
*/
public boolean isShutdown() {
lock.lock();
try {
return isShutdown;
} finally {
lock.unlock();
}
}
@Override
public E poll() {
lock.lock();
try {
if (hasNoElements()) {
return null;
}
E e = extract();
return e;
}
finally {
lock.unlock();
}
}
@Override
public E peek() {
lock.lock();
try {
return hasNoElements() ? null : items[takeIndex];
}
finally {
lock.unlock();
}
}
@Override
public boolean offer(E e) {
checkNotNull(e);
lock.lock();
try {
if (isFull() || isShutdown) {
return false;
}
else {
insert(e);
return true;
}
}
finally {
lock.unlock();
}
}
@Override
public void put(E e) throws InterruptedException {
checkNotNull(e);
lock.lockInterruptibly();
try {
while (isFull()) {
try {
notFull.await();
checkNotShutdown();
}
catch (InterruptedException ie) {
notFull.signal();
throw ie;
}
}
insert(e);
}
finally {
lock.unlock();
}
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
while (true) {
if (isNotFull()) {
insert(e);
return true;
}
if (nanos <= 0) {
return false;
}
try {
nanos = notFull.awaitNanos(nanos);
checkNotShutdown();
}
catch (InterruptedException ie) {
notFull.signal();
throw ie;
}
}
}
finally {
lock.unlock();
}
}
@Override
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
checkNotShutdown();
try {
while (hasNoElements()) {
notEmpty.await();
checkNotShutdown();
}
}
catch (InterruptedException ie) {
notEmpty.signal();
throw ie;
}
E e = extract();
return e;
}
finally {
lock.unlock();
}
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
checkNotShutdown();
while (true) {
if (hasElements()) {
E e = extract();
return e;
}
if (nanos <= 0) {
return null;
}
try {
nanos = notEmpty.awaitNanos(nanos);
checkNotShutdown();
}
catch (InterruptedException ie) {
notEmpty.signal();
throw ie;
}
}
}
finally {
lock.unlock();
}
}
@Override
public int remainingCapacity() {
lock.lock();
try {
return items.length - count;
}
finally {
lock.unlock();
}
}
@Override
public int drainTo(Collection<? super E> c) {
checkNotNull(c);
if (c == this) {
throw new IllegalArgumentException();
}
lock.lock();
try {
int i = takeIndex;
int n = 0;
for (; n < count; n++) {
c.add(items[i]);
items[i] = null;
i = inc(i);
}
if (n > 0) {
count = 0;
putIndex = 0;
takeIndex = 0;
notFull.signalAll();
}
return n;
}
finally {
lock.unlock();
}
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this) {
throw new IllegalArgumentException();
}
if (maxElements <= 0) {
return 0;
}
lock.lock();
try {
int i = takeIndex;
int n = 0;
int max = (maxElements < count) ? maxElements : count;
for (; n < max; n++) {
c.add(items[i]);
items[i] = null;
i = inc(i);
}
if (n > 0) {
count -= n;
takeIndex = i;
notFull.signalAll();
}
return n;
}
finally {
lock.unlock();
}
}
@Override
public int size() {
lock.lock();
try {
return count;
}
finally {
lock.unlock();
}
}
@Override
public Iterator<E> iterator() {
lock.lock();
try {
return new Itr();
}
finally {
lock.unlock();
}
}
private class Itr implements Iterator<E> {
private int nextIndex;
private E nextItem;
private int lastRet;
Itr() {
lastRet = -1;
if (count == 0) {
nextIndex = -1;
}
else {
nextIndex = takeIndex;
nextItem = items[takeIndex];
}
}
public boolean hasNext() {
return nextIndex >= 0;
}
private void checkNext() {
if (nextIndex == putIndex) {
nextIndex = -1;
nextItem = null;
}
else {
nextItem = items[nextIndex];
if (nextItem == null) {
nextIndex = -1;
}
}
}
public E next() {
lock.lock();
try {
if (nextIndex < 0) {
throw new NoSuchElementException();
}
lastRet = nextIndex;
E e = nextItem;
nextIndex = inc(nextIndex);
checkNext();
return e;
}
finally {
lock.unlock();
}
}
public void remove() {
lock.lock();
try {
int i = lastRet;
if (i < 0) {
throw new IllegalStateException();
}
lastRet = -1;
int ti = takeIndex;
removeAt(i);
nextIndex = (i == ti) ? takeIndex : i;
checkNext();
}
finally {
lock.unlock();
}
}
}
}

View file

@ -18,11 +18,10 @@
package org.jivesoftware.smack;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -37,12 +36,16 @@ import java.util.logging.Logger;
* @author Matt Tucker
*/
class PacketWriter {
public static final int QUEUE_SIZE = 500;
private static final Logger LOGGER = Logger.getLogger(PacketWriter.class.getName());
private final XMPPTCPConnection connection;
private final ArrayBlockingQueueWithShutdown<Packet> queue = new ArrayBlockingQueueWithShutdown<Packet>(QUEUE_SIZE, true);
private Thread writerThread;
private Writer writer;
private XMPPTCPConnection connection;
private final BlockingQueue<Packet> queue;
volatile boolean done;
/**
@ -51,7 +54,6 @@ class PacketWriter {
* @param connection the connection.
*/
protected PacketWriter(XMPPTCPConnection connection) {
this.queue = new ArrayBlockingQueue<Packet>(500, true);
this.connection = connection;
init();
}
@ -64,6 +66,7 @@ class PacketWriter {
this.writer = connection.writer;
done = false;
queue.start();
writerThread = new Thread() {
public void run() {
writePackets(this);
@ -79,17 +82,17 @@ class PacketWriter {
* @param packet the packet to send.
*/
public void sendPacket(Packet packet) {
if (!done) {
try {
queue.put(packet);
}
catch (InterruptedException ie) {
LOGGER.log(Level.SEVERE, "Failed to queue packet to send to server: " + packet.toString(), ie);
return;
}
synchronized (queue) {
queue.notifyAll();
}
if (done) {
return;
}
try {
queue.put(packet);
}
catch (InterruptedException ie) {
LOGGER.log(Level.SEVERE,
"Failed to queue packet to send to server: " + packet.toString(), ie);
return;
}
}
@ -112,9 +115,7 @@ class PacketWriter {
*/
public void shutdown() {
done = true;
synchronized (queue) {
queue.notifyAll();
}
queue.shutdown();
}
/**
@ -123,17 +124,16 @@ class PacketWriter {
* @return the next packet for writing.
*/
private Packet nextPacket() {
if (done) {
return null;
}
Packet packet = null;
// Wait until there's a packet or we're done.
while (!done && (packet = queue.poll()) == null) {
try {
synchronized (queue) {
queue.wait();
}
}
catch (InterruptedException ie) {
// Do nothing
}
try {
packet = queue.take();
}
catch (InterruptedException e) {
// Do nothing
}
return packet;
}
@ -191,12 +191,8 @@ class PacketWriter {
// The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed
if (!(done || connection.isSocketClosed())) {
done = true;
// packetReader could be set to null by an concurrent disconnect() call.
// Therefore Prevent NPE exceptions by checking packetReader.
if (connection.packetReader != null) {
connection.notifyConnectionError(ioe);
}
shutdown();
connection.notifyConnectionError(ioe);
}
}
}

View file

@ -0,0 +1,112 @@
/**
*
* Copyright 2014 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.jivesoftware.smack.packet.Message;
import org.junit.Test;
import static org.junit.Assert.fail;
public class PacketWriterTest {
volatile boolean shutdown;
volatile boolean prematureUnblocked;
/**
* Make sure that packet writer does block once the queue reaches
* {@link PacketWriter#QUEUE_SIZE} and that
* {@link PacketWriter#sendPacket(org.jivesoftware.smack.packet.Packet)} does unblock after the
* interrupt.
*
* @throws InterruptedException
* @throws BrokenBarrierException
*/
@SuppressWarnings("javadoc")
@Test
public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException {
XMPPTCPConnection connection = new XMPPTCPConnection("foobar.com");
final PacketWriter pw = new PacketWriter(connection);
pw.setWriter(new BlockingStringWriter());
pw.startup();
for (int i = 0; i < PacketWriter.QUEUE_SIZE; i++) {
pw.sendPacket(new Message());
}
final CyclicBarrier barrier = new CyclicBarrier(2);
shutdown = false;
prematureUnblocked = false;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e1) {
}
pw.sendPacket(new Message());
// should only return after the pw was shutdown
if (!shutdown) {
prematureUnblocked = true;
}
try {
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
}
}
});
t.start();
// This barrier is not strictly necessary, but may increases the chances that the threat
// will block before we call shutdown. Otherwise we may get false positives (which is still
// better then false negatives).
barrier.await();
// Not really cool, but may increases the chances for 't' to block in sendPacket.
Thread.sleep(250);
// Shutdown the packetwriter
pw.shutdown();
shutdown = true;
barrier.await();
if (prematureUnblocked) {
fail("Should not unblock before the thread got shutdown");
}
}
public class BlockingStringWriter extends Writer {
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
try {
Thread.sleep(Integer.MAX_VALUE);
}
catch (InterruptedException e) {
}
}
@Override
public void flush() throws IOException {
}
@Override
public void close() throws IOException {
}
}
}