From d1e9d81769225dcc0a6b6b0cec25e0c1a245e185 Mon Sep 17 00:00:00 2001 From: rcollier Date: Thu, 7 Feb 2013 03:42:33 +0000 Subject: [PATCH] SMACK-341 Updated collectors to use concurrent classes. git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@13452 b35dd754-fafc-0310-a699-88a17e54d16e --- .../jivesoftware/smack/PacketCollector.java | 110 +++------- .../ConnectionDetachedPacketCollector.java | 78 +++---- .../smack/PacketCollectorTest.java | 198 ++++++++++++++++++ ...ConnectionDetachedPacketCollectorTest.java | 179 ++++++++++++++++ 4 files changed, 437 insertions(+), 128 deletions(-) create mode 100644 test-unit/org/jivesoftware/smack/PacketCollectorTest.java create mode 100644 test-unit/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollectorTest.java diff --git a/source/org/jivesoftware/smack/PacketCollector.java b/source/org/jivesoftware/smack/PacketCollector.java index 317e940b2..9b4b4ae4a 100644 --- a/source/org/jivesoftware/smack/PacketCollector.java +++ b/source/org/jivesoftware/smack/PacketCollector.java @@ -20,11 +20,12 @@ package org.jivesoftware.smack; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + import org.jivesoftware.smack.filter.PacketFilter; import org.jivesoftware.smack.packet.Packet; -import java.util.LinkedList; - /** * Provides a mechanism to collect packets into a result queue that pass a * specified filter. The collector lets you perform blocking and polling @@ -41,16 +42,9 @@ import java.util.LinkedList; */ public class PacketCollector { - /** - * Max number of packets that any one collector can hold. After the max is - * reached, older packets will be automatically dropped from the queue as - * new packets are added. - */ - private int maxPackets = SmackConfiguration.getPacketCollectorSize(); - private PacketFilter packetFilter; - private LinkedList resultQueue; - private Connection conection; + private ArrayBlockingQueue resultQueue; + private Connection connection; private boolean cancelled = false; /** @@ -61,9 +55,7 @@ public class PacketCollector { * @param packetFilter determines which packets will be returned by this collector. */ protected PacketCollector(Connection conection, PacketFilter packetFilter) { - this.conection = conection; - this.packetFilter = packetFilter; - this.resultQueue = new LinkedList(); + this(conection, packetFilter, SmackConfiguration.getPacketCollectorSize()); } /** @@ -75,8 +67,9 @@ public class PacketCollector { * @param maxSize the maximum number of packets that will be stored in the collector. */ protected PacketCollector(Connection conection, PacketFilter packetFilter, int maxSize) { - this(conection, packetFilter); - maxPackets = maxSize; + this.connection = conection; + this.packetFilter = packetFilter; + this.resultQueue = new ArrayBlockingQueue(maxSize); } /** @@ -88,7 +81,7 @@ public class PacketCollector { // If the packet collector has already been cancelled, do nothing. if (!cancelled) { cancelled = true; - conection.removePacketCollector(this); + connection.removePacketCollector(this); } } @@ -110,13 +103,8 @@ public class PacketCollector { * @return the next packet result, or null if there are no more * results. */ - public synchronized Packet pollResult() { - if (resultQueue.isEmpty()) { - return null; - } - else { - return resultQueue.removeLast(); - } + public Packet pollResult() { + return resultQueue.poll(); } /** @@ -125,17 +113,13 @@ public class PacketCollector { * * @return the next available packet. */ - public synchronized Packet nextResult() { - // Wait indefinitely until there is a result to return. - while (resultQueue.isEmpty()) { - try { - wait(); - } - catch (InterruptedException ie) { - // Ignore. - } - } - return resultQueue.removeLast(); + public Packet nextResult() { + try { + return resultQueue.take(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** @@ -146,40 +130,13 @@ public class PacketCollector { * @param timeout the amount of time to wait for the next packet (in milleseconds). * @return the next available packet. */ - public synchronized Packet nextResult(long timeout) { - // Wait up to the specified amount of time for a result. - if (resultQueue.isEmpty()) { - long waitTime = timeout; - long start = System.currentTimeMillis(); - try { - // Keep waiting until the specified amount of time has elapsed, or - // a packet is available to return. - while (resultQueue.isEmpty()) { - if (waitTime <= 0) { - break; - } - wait(waitTime); - long now = System.currentTimeMillis(); - waitTime -= (now - start); - start = now; - } - } - catch (InterruptedException ie) { - // Ignore. - } - // Still haven't found a result, so return null. - if (resultQueue.isEmpty()) { - return null; - } - // Return the packet that was found. - else { - return resultQueue.removeLast(); - } - } - // There's already a packet waiting, so return it. - else { - return resultQueue.removeLast(); - } + public Packet nextResult(long timeout) { + try { + return resultQueue.poll(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** @@ -188,19 +145,16 @@ public class PacketCollector { * * @param packet the packet to process. */ - protected synchronized void processPacket(Packet packet) { + protected void processPacket(Packet packet) { if (packet == null) { return; } + if (packetFilter == null || packetFilter.accept(packet)) { - // If the max number of packets has been reached, remove the oldest one. - if (resultQueue.size() == maxPackets) { - resultQueue.removeLast(); - } - // Add the new packet. - resultQueue.addFirst(packet); - // Notify waiting threads a result is available. - notifyAll(); + while (!resultQueue.offer(packet)) { + // Since we know the queue is full, this poll should never actually block. + resultQueue.poll(); + } } } } diff --git a/source/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollector.java b/source/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollector.java index 2a719f629..243c2986a 100644 --- a/source/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollector.java +++ b/source/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollector.java @@ -20,11 +20,12 @@ package org.jivesoftware.smackx.muc; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + import org.jivesoftware.smack.SmackConfiguration; import org.jivesoftware.smack.packet.Packet; -import java.util.LinkedList; - /** * A variant of the {@link org.jivesoftware.smack.PacketCollector} class * that does not force attachment to a Connection @@ -41,14 +42,14 @@ class ConnectionDetachedPacketCollector { */ private int maxPackets = SmackConfiguration.getPacketCollectorSize(); - private LinkedList resultQueue; + private ArrayBlockingQueue resultQueue; /** * Creates a new packet collector. If the packet filter is null, then * all packets will match this collector. */ public ConnectionDetachedPacketCollector() { - this.resultQueue = new LinkedList(); + this(SmackConfiguration.getPacketCollectorSize()); } /** @@ -56,8 +57,7 @@ class ConnectionDetachedPacketCollector { * all packets will match this collector. */ public ConnectionDetachedPacketCollector(int maxSize) { - this.resultQueue = new LinkedList(); - maxPackets = maxSize; + this.resultQueue = new ArrayBlockingQueue(maxSize); } /** @@ -68,13 +68,8 @@ class ConnectionDetachedPacketCollector { * @return the next packet result, or null if there are no more * results. */ - public synchronized Packet pollResult() { - if (resultQueue.isEmpty()) { - return null; - } - else { - return resultQueue.removeLast(); - } + public Packet pollResult() { + return resultQueue.poll(); } /** @@ -83,17 +78,13 @@ class ConnectionDetachedPacketCollector { * * @return the next available packet. */ - public synchronized Packet nextResult() { - // Wait indefinitely until there is a result to return. - while (resultQueue.isEmpty()) { - try { - wait(); - } - catch (InterruptedException ie) { - // Ignore. - } - } - return resultQueue.removeLast(); + public Packet nextResult() { + try { + return resultQueue.take(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** @@ -104,23 +95,13 @@ class ConnectionDetachedPacketCollector { * @param timeout the amount of time to wait for the next packet (in milleseconds). * @return the next available packet. */ - public synchronized Packet nextResult(long timeout) { - // Wait up to the specified amount of time for a result. - if (resultQueue.isEmpty()) { - try { - wait(timeout); - } - catch (InterruptedException ie) { - // Ignore. - } - } - // If still no result, return null. - if (resultQueue.isEmpty()) { - return null; - } - else { - return resultQueue.removeLast(); - } + public Packet nextResult(long timeout) { + try { + return resultQueue.poll(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** @@ -129,17 +110,14 @@ class ConnectionDetachedPacketCollector { * * @param packet the packet to process. */ - protected synchronized void processPacket(Packet packet) { + protected void processPacket(Packet packet) { if (packet == null) { return; } - // If the max number of packets has been reached, remove the oldest one. - if (resultQueue.size() == maxPackets) { - resultQueue.removeLast(); - } - // Add the new packet. - resultQueue.addFirst(packet); - // Notify waiting threads a result is available. - notifyAll(); + + while (!resultQueue.offer(packet)) { + // Since we know the queue is full, this poll should never actually block. + resultQueue.poll(); + } } } diff --git a/test-unit/org/jivesoftware/smack/PacketCollectorTest.java b/test-unit/org/jivesoftware/smack/PacketCollectorTest.java new file mode 100644 index 000000000..d86b3ac82 --- /dev/null +++ b/test-unit/org/jivesoftware/smack/PacketCollectorTest.java @@ -0,0 +1,198 @@ +package org.jivesoftware.smack; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.jivesoftware.smack.filter.PacketFilter; +import org.jivesoftware.smack.packet.Packet; +import org.junit.Test; + +public class PacketCollectorTest +{ + + @Test + public void verifyRollover() + { + TestPacketCollector collector = new TestPacketCollector(null, new OKEverything(), 5); + + for (int i=0; i<6; i++) + { + Packet testPacket = new TestPacket(i); + collector.processPacket(testPacket); + } + + // Assert that '0' has rolled off + assertEquals("1", collector.nextResult().getPacketID()); + assertEquals("2", collector.nextResult().getPacketID()); + assertEquals("3", collector.nextResult().getPacketID()); + assertEquals("4", collector.nextResult().getPacketID()); + assertEquals("5", collector.pollResult().getPacketID()); + assertNull(collector.pollResult()); + + for (int i=10; i<15; i++) + { + Packet testPacket = new TestPacket(i); + collector.processPacket(testPacket); + } + + assertEquals("10", collector.nextResult().getPacketID()); + assertEquals("11", collector.nextResult().getPacketID()); + assertEquals("12", collector.nextResult().getPacketID()); + assertEquals("13", collector.nextResult().getPacketID()); + assertEquals("14", collector.pollResult().getPacketID()); + assertNull(collector.pollResult()); + + assertNull(collector.nextResult(1000)); + } + + /** + * Although this doesn't guarentee anything due to the nature of threading, it can + * potentially catch problems. + */ + @Test + public void verifyThreadSafety() + { + int insertCount = 500; + final TestPacketCollector collector = new TestPacketCollector(null, new OKEverything(), insertCount); + + Thread consumer1 = new Thread(new Runnable() + { + @Override + public void run() + { + try + { + while (true) + { + try + { + Thread.sleep(3); + } + catch (InterruptedException e) + { + } + Packet packet = collector.nextResult(); +// System.out.println(Thread.currentThread().getName() + " packet: " + packet); + } + } + catch (RuntimeException re) + { + if (re.getCause() instanceof InterruptedException) + { +// System.out.println(Thread.currentThread().getName() + " has been interupted"); + } + } + } + }); + consumer1.setName("consumer 1"); + + Thread consumer2 = new Thread(new Runnable() + { + @Override + public void run() + { + Packet p = null; + + do + { + try + { + Thread.sleep(3); + } + catch (InterruptedException e) + { + } + p = collector.nextResult(1); +// System.out.println(Thread.currentThread().getName() + " packet: " + p); + } + while (p != null); + } + }); + consumer2.setName("consumer 2"); + + Thread consumer3 = new Thread(new Runnable() + { + @Override + public void run() + { + Packet p = null; + + do + { + try + { + Thread.sleep(3); + } + catch (InterruptedException e) + { + } + p = collector.pollResult(); +// System.out.println(Thread.currentThread().getName() + " packet: " + p); + } + while (p != null); + } + }); + consumer3.setName("consumer 3"); + + consumer1.start(); + consumer2.start(); + consumer3.start(); + + for(int i=0; i" + getPacketID() + ""; + } + } +} diff --git a/test-unit/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollectorTest.java b/test-unit/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollectorTest.java new file mode 100644 index 000000000..ae274d7c2 --- /dev/null +++ b/test-unit/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollectorTest.java @@ -0,0 +1,179 @@ +package org.jivesoftware.smackx.muc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.jivesoftware.smack.packet.Packet; +import org.junit.Test; + +public class ConnectionDetachedPacketCollectorTest +{ + + @Test + public void verifyRollover() + { + ConnectionDetachedPacketCollector collector = new ConnectionDetachedPacketCollector(5); + + for (int i=0; i<6; i++) + { + Packet testPacket = new TestPacket(i); + collector.processPacket(testPacket); + } + + // Assert that '0' has rolled off + assertEquals("1", collector.nextResult().getPacketID()); + assertEquals("2", collector.nextResult().getPacketID()); + assertEquals("3", collector.nextResult().getPacketID()); + assertEquals("4", collector.nextResult().getPacketID()); + assertEquals("5", collector.pollResult().getPacketID()); + assertNull(collector.pollResult()); + + for (int i=10; i<15; i++) + { + Packet testPacket = new TestPacket(i); + collector.processPacket(testPacket); + } + + assertEquals("10", collector.nextResult().getPacketID()); + assertEquals("11", collector.nextResult().getPacketID()); + assertEquals("12", collector.nextResult().getPacketID()); + assertEquals("13", collector.nextResult().getPacketID()); + assertEquals("14", collector.pollResult().getPacketID()); + assertNull(collector.pollResult()); + + assertNull(collector.nextResult(1000)); + } + + /** + * Although this doesn't guarentee anything due to the nature of threading, it can + * potentially catch problems. + */ + @Test + public void verifyThreadSafety() + { + int insertCount = 500; + final ConnectionDetachedPacketCollector collector = new ConnectionDetachedPacketCollector(insertCount); + + Thread consumer1 = new Thread(new Runnable() + { + @Override + public void run() + { + try + { + while (true) + { + try + { + Thread.sleep(3); + } + catch (InterruptedException e) + { + } + Packet packet = collector.nextResult(); +// System.out.println(Thread.currentThread().getName() + " packet: " + packet); + } + } + catch (RuntimeException re) + { + if (re.getCause() instanceof InterruptedException) + { +// System.out.println(Thread.currentThread().getName() + " has been interupted"); + } + } + } + }); + consumer1.setName("consumer 1"); + + Thread consumer2 = new Thread(new Runnable() + { + @Override + public void run() + { + Packet p = null; + + do + { + try + { + Thread.sleep(3); + } + catch (InterruptedException e) + { + } + p = collector.nextResult(1); +// System.out.println(Thread.currentThread().getName() + " packet: " + p); + } + while (p != null); + } + }); + consumer2.setName("consumer 2"); + + Thread consumer3 = new Thread(new Runnable() + { + @Override + public void run() + { + Packet p = null; + + do + { + try + { + Thread.sleep(3); + } + catch (InterruptedException e) + { + } + p = collector.pollResult(); +// System.out.println(Thread.currentThread().getName() + " packet: " + p); + } + while (p != null); + } + }); + consumer3.setName("consumer 3"); + + consumer1.start(); + consumer2.start(); + consumer3.start(); + + for(int i=0; i" + getPacketID() + ""; + } + } +}