Rework MultiUserChat

remove PacketMultiplexListener, RoomListenerMultiplexor and
ConnectionDetachedPacketCollector(Test), which was a bunch of
(in same cases redundant) code that formed a complex construct that
presumably tried to make MultiUserChat instances easily garbage
collect-able.

Now, MultiUserChat should be eligible for gc if the userHashLeft() is
invoked before the reference to the instance is dropped, which should be
the case in the most scenarios. Otherwise the connection may references
the MultiUserChat instance over Packet(Listener|Interceptor)s preventing
the gc.
This commit is contained in:
Florian Schmaus 2014-10-13 10:59:36 +02:00
parent 98a3c46e9a
commit 365c71db79
6 changed files with 98 additions and 622 deletions

View File

@ -1,113 +0,0 @@
/**
*
* Copyright 2003-2006 Jive Software.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.muc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.packet.Packet;
/**
* A variant of the {@link org.jivesoftware.smack.PacketCollector} class
* that does not force attachment to a <code>XMPPConnection</code>
* on creation and no filter is required. Used to collect message
* packets targeted to a group chat room.
*
* @author Larry Kirschner
*/
class ConnectionDetachedPacketCollector<P extends Packet> {
private ArrayBlockingQueue<P> resultQueue;
/**
* Creates a new packet collector. If the packet filter is <tt>null</tt>, then
* all packets will match this collector.
*/
public ConnectionDetachedPacketCollector() {
this(SmackConfiguration.getPacketCollectorSize());
}
/**
* Creates a new packet collector. If the packet filter is <tt>null</tt>, then
* all packets will match this collector.
*/
public ConnectionDetachedPacketCollector(int maxSize) {
this.resultQueue = new ArrayBlockingQueue<P>(maxSize);
}
/**
* Polls to see if a packet is currently available and returns it, or
* immediately returns <tt>null</tt> if no packets are currently in the
* result queue.
*
* @return the next packet result, or <tt>null</tt> if there are no more
* results.
*/
public P pollResult() {
return resultQueue.poll();
}
/**
* Returns the next available packet. The method call will block (not return)
* until a packet is available.
*
* @return the next available packet.
*/
public P nextResult() {
try {
return resultQueue.take();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Returns the next available packet. The method call will block (not return)
* until a packet is available or the <tt>timeout</tt> has elapased. If the
* timeout elapses without a result, <tt>null</tt> will be returned.
*
* @param timeout the amount of time to wait for the next packet (in milleseconds).
* @return the next available packet.
*/
public P nextResult(long timeout) {
try {
return resultQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Processes a packet to see if it meets the criteria for this packet collector.
* If so, the packet is added to the result queue.
*
* @param packet the packet to process.
*/
protected void processPacket(P packet) {
if (packet == null) {
return;
}
while (!resultQueue.offer(packet)) {
// Since we know the queue is full, this poll should never actually block.
resultQueue.poll();
}
}
}

View File

@ -0,0 +1,36 @@
/**
*
* Copyright © 2014 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.muc;
import org.jivesoftware.smack.SmackException;
/**
* Thrown if the requested operation required the MUC to be joined by the
* client, while the client is currently joined.
*
*/
public class MUCNotJoinedException extends SmackException {
/**
*
*/
private static final long serialVersionUID = -5204934585663465576L;
public MUCNotJoinedException(MultiUserChat multiUserChat) {
super("Client not currently joined " + multiUserChat.getRoom());
}
}

View File

@ -52,6 +52,8 @@ import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.FromMatchesFilter;
import org.jivesoftware.smack.filter.MessageTypeFilter;
import org.jivesoftware.smack.filter.MessageWithSubjectFilter;
import org.jivesoftware.smack.filter.NotFilter;
import org.jivesoftware.smack.filter.PacketExtensionFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
@ -83,6 +85,10 @@ import org.jivesoftware.smackx.xdata.Form;
* are "moderator", "participant", and "visitor". Each role and affiliation guarantees
* different privileges (e.g. Send messages to all occupants, Kick participants and visitors,
* Grant voice, Edit member list, etc.).
* <p>
* <b>Note:</b> Make sure to leave the MUC ({@link #leave()}) before you drop the reference to
* it, or otherwise you may leak the instance.
* </p>
*
* @author Gaston Dombiak, Larry Kirschner
*/
@ -109,17 +115,29 @@ public class MultiUserChat {
private final Set<MessageListener> messageListeners = new CopyOnWriteArraySet<MessageListener>();
private final Set<PresenceListener> presenceListeners = new CopyOnWriteArraySet<PresenceListener>();
private final Set<PresenceListener> presenceInterceptors = new CopyOnWriteArraySet<PresenceListener>();
private final ConnectionDetachedPacketCollector<Message> messageCollector = new ConnectionDetachedPacketCollector<Message>();
/**
* This filter will match all stanzas send from the groupchat or from one if
* the groupchat participants, i.e. it filters only the bare JID of the from
* attribute against the JID of the MUC.
*/
private final PacketFilter fromRoomFilter;
/**
* Same as {@link #fromRoomFilter} together with {@link MessageTypeFilter#GROUPCHAT}.
*/
private final PacketFilter fromRoomGroupchatFilter;
private final PacketListener presenceInterceptor;
private final PacketFilter fromRoomFilter;
private final PacketListener messageListener;
private final PacketListener presenceListener;
private final RoomListenerMultiplexor roomListenerMultiplexor;
private final PacketListener subjectListener;
private final PacketListener declinesListener;
private String subject;
private String nickname = null;
private boolean joined = false;
private PacketCollector messageCollector;
static {
XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
@ -181,6 +199,9 @@ public class MultiUserChat {
this.connection = connection;
this.room = room.toLowerCase(Locale.US);
fromRoomFilter = FromMatchesFilter.create(room);
fromRoomGroupchatFilter = new AndFilter(fromRoomFilter, MessageTypeFilter.GROUPCHAT);
messageListener = new PacketListener() {
@Override
public void processPacket(Packet packet) throws NotConnectedException {
@ -190,19 +211,9 @@ public class MultiUserChat {
}
}
};
presenceListener = new PacketListener() {
@Override
public void processPacket(Packet packet) throws NotConnectedException {
Presence presence = (Presence) packet;
for (PresenceListener listener : presenceListeners) {
listener.processPresence(presence);
}
}
};
fromRoomFilter = FromMatchesFilter.create(room);
// Create a listener for subject updates.
PacketListener subjectListener = new PacketListener() {
subjectListener = new PacketListener() {
public void processPacket(Packet packet) {
Message msg = (Message) packet;
// Update the room subject
@ -216,7 +227,7 @@ public class MultiUserChat {
};
// Create a listener for all presence updates.
PacketListener presenceListener = new PacketListener() {
presenceListener = new PacketListener() {
public void processPacket(Packet packet) {
Presence presence = (Presence) packet;
String from = presence.getFrom();
@ -270,34 +281,27 @@ public class MultiUserChat {
}
}
}
for (PresenceListener listener : presenceListeners) {
listener.processPresence(presence);
}
}
};
// Listens for all messages that include a MUCUser extension and fire the invitation
// rejection listeners if the message includes an invitation rejection.
PacketListener declinesListener = new PacketListener() {
declinesListener = new PacketListener() {
public void processPacket(Packet packet) {
// Get the MUC User extension
MUCUser mucUser = MUCUser.from(packet);
// Check if the MUCUser informs that the invitee has declined the invitation
if (mucUser.getDecline() != null &&
((Message) packet).getType() != Message.Type.error) {
// Fire event for invitation rejection listeners
fireInvitationRejectionListeners(
mucUser.getDecline().getFrom(),
mucUser.getDecline().getReason());
if (mucUser.getDecline() == null) {
return;
}
// Fire event for invitation rejection listeners
fireInvitationRejectionListeners(mucUser.getDecline().getFrom(), mucUser.getDecline().getReason());
}
};
PacketMultiplexListener packetMultiplexor = new PacketMultiplexListener(
messageCollector, presenceListener, subjectListener,
declinesListener);
roomListenerMultiplexor = RoomListenerMultiplexor.getRoomMultiplexor(connection);
roomListenerMultiplexor.addRoom(room, packetMultiplexor);
presenceInterceptor = new PacketListener() {
@Override
public void processPacket(Packet packet) {
@ -476,12 +480,16 @@ public class MultiUserChat {
this.nickname = nickname;
joined = true;
// Setup the messageListeners and presenceListeners
connection.addPacketListener(messageListener, new AndFilter(fromRoomFilter,
MessageTypeFilter.GROUPCHAT));
connection.addPacketListener(messageListener, fromRoomGroupchatFilter);
connection.addPacketListener(presenceListener, new AndFilter(fromRoomFilter,
PacketTypeFilter.PRESENCE));
connection.addPacketListener(subjectListener, new AndFilter(fromRoomFilter,
MessageWithSubjectFilter.INSTANCE));
connection.addPacketListener(declinesListener, new AndFilter(new PacketExtensionFilter(MUCUser.ELEMENT,
MUCUser.NAMESPACE), new NotFilter(MessageTypeFilter.ERROR)));
connection.addPacketInterceptor(presenceInterceptor, new AndFilter(new ToFilter(room),
PacketTypeFilter.PRESENCE));
messageCollector = connection.createPacketCollector(fromRoomGroupchatFilter);
// Update the list of joined rooms through this connection
List<String> rooms = joinedRooms.get(connection);
if (rooms == null) {
@ -1821,8 +1829,12 @@ public class MultiUserChat {
*
* @return the next message if one is immediately available and
* <tt>null</tt> otherwise.
* @throws MUCNotJoinedException
*/
public Message pollMessage() {
public Message pollMessage() throws MUCNotJoinedException {
if (messageCollector == null) {
throw new MUCNotJoinedException(this);
}
return messageCollector.pollResult();
}
@ -1831,8 +1843,12 @@ public class MultiUserChat {
* (not return) until a message is available.
*
* @return the next message.
* @throws MUCNotJoinedException
*/
public Message nextMessage() {
public Message nextMessage() throws MUCNotJoinedException {
if (messageCollector == null) {
throw new MUCNotJoinedException(this);
}
return messageCollector.nextResult();
}
@ -1844,8 +1860,12 @@ public class MultiUserChat {
* @param timeout the maximum amount of time to wait for the next message.
* @return the next message, or <tt>null</tt> if the timeout elapses without a
* message becoming available.
* @throws MUCNotJoinedException
*/
public Message nextMessage(long timeout) {
public Message nextMessage(long timeout) throws MUCNotJoinedException {
if (messageCollector == null) {
throw new MUCNotJoinedException(this);
}
return messageCollector.nextResult(timeout);
}
@ -1891,11 +1911,8 @@ public class MultiUserChat {
Message message = new Message(room, Message.Type.groupchat);
message.setSubject(subject);
// Wait for an error or confirmation message back from the server.
PacketFilter responseFilter =
new AndFilter(
FromMatchesFilter.create(room),
new PacketTypeFilter(Message.class));
responseFilter = new AndFilter(responseFilter, new PacketFilter() {
PacketFilter responseFilter = new AndFilter(fromRoomGroupchatFilter, new PacketFilter() {
@Override
public boolean accept(Packet packet) {
Message msg = (Message) packet;
return subject.equals(msg.getSubject());
@ -1917,9 +1934,13 @@ public class MultiUserChat {
}
connection.removePacketListener(messageListener);
connection.removePacketListener(presenceListener);
connection.removePacketListener(declinesListener);
connection.removePacketInterceptor(presenceInterceptor);
if (messageCollector != null) {
messageCollector.cancel();
messageCollector = null;
}
rooms.remove(room);
cleanup();
}
/**
@ -2334,21 +2355,6 @@ public class MultiUserChat {
}
}
private void cleanup() {
try {
if (connection != null) {
roomListenerMultiplexor.removeRoom(room);
}
} catch (Exception e) {
// Do nothing
}
}
protected void finalize() throws Throwable {
cleanup();
super.finalize();
}
/**
* An InvitationsMonitor monitors a given connection to detect room invitations. Every
* time the InvitationsMonitor detects a new invitation it will fire the invitation listeners.

View File

@ -1,87 +0,0 @@
/**
*
* Copyright 2003-2006 Jive Software.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.muc;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.filter.MessageTypeFilter;
import org.jivesoftware.smack.filter.MessageWithSubjectFilter;
import org.jivesoftware.smack.filter.PacketExtensionFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smackx.muc.packet.MUCUser;
/**
* The single <code>PacketListener</code> used by each {@link MultiUserChat}
* for all basic processing of presence, and message packets targeted to that chat.
*
* @author Larry Kirschner
*/
class PacketMultiplexListener implements PacketListener {
private static final PacketFilter PRESENCE_FILTER = new PacketTypeFilter(Presence.class);
private static final PacketFilter DECLINES_FILTER = new PacketExtensionFilter(MUCUser.ELEMENT,
MUCUser.NAMESPACE);
private ConnectionDetachedPacketCollector<Message> messageCollector;
private PacketListener presenceListener;
private PacketListener subjectListener;
private PacketListener declinesListener;
public PacketMultiplexListener(
ConnectionDetachedPacketCollector<Message> messageCollector,
PacketListener presenceListener,
PacketListener subjectListener, PacketListener declinesListener) {
if (messageCollector == null) {
throw new IllegalArgumentException("MessageCollector is null");
}
if (presenceListener == null) {
throw new IllegalArgumentException("Presence listener is null");
}
if (subjectListener == null) {
throw new IllegalArgumentException("Subject listener is null");
}
if (declinesListener == null) {
throw new IllegalArgumentException("Declines listener is null");
}
this.messageCollector = messageCollector;
this.presenceListener = presenceListener;
this.subjectListener = subjectListener;
this.declinesListener = declinesListener;
}
public void processPacket(Packet p) throws NotConnectedException {
if (PRESENCE_FILTER.accept(p)) {
presenceListener.processPacket(p);
}
else if (MessageTypeFilter.GROUPCHAT.accept(p)) {
messageCollector.processPacket((Message) p);
if (MessageWithSubjectFilter.INSTANCE.accept(p)) {
subjectListener.processPacket(p);
}
}
else if (DECLINES_FILTER.accept(p)) {
declinesListener.processPacket(p);
}
}
}

View File

@ -1,170 +0,0 @@
/**
*
* Copyright 2003-2006 Jive Software.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.muc;
import org.jivesoftware.smack.Manager;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet;
import org.jxmpp.util.XmppStringUtils;
import java.util.Locale;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
/**
* A <code>RoomListenerMultiplexor</code> multiplexes incoming packets on
* a <code>XMPPConnection</code> using a single listener/filter pair.
* A single <code>RoomListenerMultiplexor</code> is created for each
* {@link org.jivesoftware.smack.XMPPConnection} that has joined MUC rooms
* within its session.
*
* @author Larry Kirschner
*/
class RoomListenerMultiplexor extends Manager {
// We use a WeakHashMap so that the GC can collect the monitor when the
// connection is no longer referenced by any object.
private static final Map<XMPPConnection, RoomListenerMultiplexor> monitors = new WeakHashMap<XMPPConnection, RoomListenerMultiplexor>();
private final RoomMultiplexFilter filter;
private final RoomMultiplexListener listener;
/**
* Returns a new or existing RoomListenerMultiplexor for a given connection.
*
* @param conn the connection to monitor for room invitations.
* @return a new or existing RoomListenerMultiplexor for a given connection.
*/
public static synchronized RoomListenerMultiplexor getRoomMultiplexor(XMPPConnection conn) {
RoomListenerMultiplexor rlm = monitors.get(conn);
if (rlm == null) {
rlm = new RoomListenerMultiplexor(conn, new RoomMultiplexFilter(),
new RoomMultiplexListener());
}
// Return the InvitationsMonitor that monitors the connection
return rlm;
}
/**
* All access should be through
* the static method {@link #getRoomMultiplexor(XMPPConnection)}.
*/
private RoomListenerMultiplexor(XMPPConnection connection, RoomMultiplexFilter filter,
RoomMultiplexListener listener) {
super(connection);
connection.addPacketListener(listener, filter);
this.filter = filter;
this.listener = listener;
monitors.put(connection, this);
}
public void addRoom(String address, PacketMultiplexListener roomListener) {
filter.addRoom(address);
listener.addRoom(address, roomListener);
}
public void removeRoom(String address) {
filter.removeRoom(address);
listener.removeRoom(address);
}
/**
* The single <code>XMPPConnection</code>-level <code>PacketFilter</code> used by a {@link RoomListenerMultiplexor}
* for all muc chat rooms on an <code>XMPPConnection</code>.
* Each time a muc chat room is added to/removed from an
* <code>XMPPConnection</code> the address for that chat room
* is added to/removed from that <code>XMPPConnection</code>'s
* <code>RoomMultiplexFilter</code>.
*/
private static class RoomMultiplexFilter implements PacketFilter {
private Map<String, String> roomAddressTable = new ConcurrentHashMap<String, String>();
public boolean accept(Packet p) {
String from = p.getFrom();
if (from == null) {
return false;
}
return roomAddressTable.containsKey(XmppStringUtils.parseBareAddress(from).toLowerCase(Locale.US));
}
public void addRoom(String address) {
if (address == null) {
return;
}
roomAddressTable.put(address.toLowerCase(Locale.US), address);
}
public void removeRoom(String address) {
if (address == null) {
return;
}
roomAddressTable.remove(address.toLowerCase(Locale.US));
}
}
/**
* The single <code>XMPPConnection</code>-level <code>PacketListener</code>
* used by a {@link RoomListenerMultiplexor}
* for all muc chat rooms on an <code>XMPPConnection</code>.
* Each time a muc chat room is added to/removed from an
* <code>XMPPConnection</code> the address and listener for that chat room
* are added to/removed from that <code>XMPPConnection</code>'s
* <code>RoomMultiplexListener</code>.
*
* @author Larry Kirschner
*/
private static class RoomMultiplexListener implements PacketListener {
private Map<String, PacketMultiplexListener> roomListenersByAddress =
new ConcurrentHashMap<String, PacketMultiplexListener>();
public void processPacket(Packet p) throws NotConnectedException {
String from = p.getFrom();
if (from == null) {
return;
}
PacketMultiplexListener listener =
roomListenersByAddress.get(XmppStringUtils.parseBareAddress(from).toLowerCase(Locale.US));
if (listener != null) {
listener.processPacket(p);
}
}
public void addRoom(String address, PacketMultiplexListener listener) {
if (address == null) {
return;
}
roomListenersByAddress.put(address.toLowerCase(Locale.US), listener);
}
public void removeRoom(String address) {
if (address == null) {
return;
}
roomListenersByAddress.remove(address.toLowerCase(Locale.US));
}
}
}

View File

@ -1,196 +0,0 @@
/**
*
* Copyright the original author or authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.muc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.jivesoftware.smack.packet.Packet;
import org.junit.Test;
public class ConnectionDetachedPacketCollectorTest
{
@Test
public void verifyRollover()
{
ConnectionDetachedPacketCollector<Packet> collector = new ConnectionDetachedPacketCollector<Packet>(5);
for (int i=0; i<6; i++)
{
Packet testPacket = new TestPacket(i);
collector.processPacket(testPacket);
}
// Assert that '0' has rolled off
assertEquals("1", collector.nextResult().getPacketID());
assertEquals("2", collector.nextResult().getPacketID());
assertEquals("3", collector.nextResult().getPacketID());
assertEquals("4", collector.nextResult().getPacketID());
assertEquals("5", collector.pollResult().getPacketID());
assertNull(collector.pollResult());
for (int i=10; i<15; i++)
{
Packet testPacket = new TestPacket(i);
collector.processPacket(testPacket);
}
assertEquals("10", collector.nextResult().getPacketID());
assertEquals("11", collector.nextResult().getPacketID());
assertEquals("12", collector.nextResult().getPacketID());
assertEquals("13", collector.nextResult().getPacketID());
assertEquals("14", collector.pollResult().getPacketID());
assertNull(collector.pollResult());
assertNull(collector.nextResult(1000));
}
/**
* Although this doesn't guarentee anything due to the nature of threading, it can
* potentially catch problems.
*/
@Test
public void verifyThreadSafety()
{
int insertCount = 500;
final ConnectionDetachedPacketCollector<Packet> collector = new ConnectionDetachedPacketCollector<Packet>(insertCount);
Thread consumer1 = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
while (true)
{
try
{
Thread.sleep(3);
}
catch (InterruptedException e)
{
}
@SuppressWarnings("unused")
Packet packet = collector.nextResult();
// System.out.println(Thread.currentThread().getName() + " packet: " + packet);
}
}
catch (RuntimeException re)
{
if (re.getCause() instanceof InterruptedException)
{
// System.out.println(Thread.currentThread().getName() + " has been interupted");
}
}
}
});
consumer1.setName("consumer 1");
Thread consumer2 = new Thread(new Runnable()
{
@Override
public void run()
{
Packet p = null;
do
{
try
{
Thread.sleep(3);
}
catch (InterruptedException e)
{
}
p = collector.nextResult(1);
// System.out.println(Thread.currentThread().getName() + " packet: " + p);
}
while (p != null);
}
});
consumer2.setName("consumer 2");
Thread consumer3 = new Thread(new Runnable()
{
@Override
public void run()
{
Packet p = null;
do
{
try
{
Thread.sleep(3);
}
catch (InterruptedException e)
{
}
p = collector.pollResult();
// System.out.println(Thread.currentThread().getName() + " packet: " + p);
}
while (p != null);
}
});
consumer3.setName("consumer 3");
consumer1.start();
consumer2.start();
consumer3.start();
for(int i=0; i<insertCount; i++)
{
collector.processPacket(new TestPacket(i));
}
try
{
Thread.sleep(5000);
consumer3.join();
consumer2.join();
consumer1.interrupt();
}
catch (InterruptedException e)
{
}
//We cannot guarantee that this is going to pass due to the possible issue of timing between consumer 1
// and main, but the probability is extremely remote.
assertNull(collector.pollResult());
}
class TestPacket extends Packet
{
public TestPacket(int i)
{
setPacketID(String.valueOf(i));
}
@Override
public String toString()
{
return toXML();
}
@Override
public String toXML()
{
return "<packetId>" + getPacketID() + "</packetId>";
}
}
}