diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollector.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollector.java deleted file mode 100644 index 16f0a182e..000000000 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollector.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * - * Copyright 2003-2006 Jive Software. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.jivesoftware.smackx.muc; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.jivesoftware.smack.SmackConfiguration; -import org.jivesoftware.smack.packet.Packet; - -/** - * A variant of the {@link org.jivesoftware.smack.PacketCollector} class - * that does not force attachment to a XMPPConnection - * on creation and no filter is required. Used to collect message - * packets targeted to a group chat room. - * - * @author Larry Kirschner - */ -class ConnectionDetachedPacketCollector

{ - private ArrayBlockingQueue

resultQueue; - - /** - * Creates a new packet collector. If the packet filter is null, then - * all packets will match this collector. - */ - public ConnectionDetachedPacketCollector() { - this(SmackConfiguration.getPacketCollectorSize()); - } - - /** - * Creates a new packet collector. If the packet filter is null, then - * all packets will match this collector. - */ - public ConnectionDetachedPacketCollector(int maxSize) { - this.resultQueue = new ArrayBlockingQueue

(maxSize); - } - - /** - * Polls to see if a packet is currently available and returns it, or - * immediately returns null if no packets are currently in the - * result queue. - * - * @return the next packet result, or null if there are no more - * results. - */ - public P pollResult() { - return resultQueue.poll(); - } - - /** - * Returns the next available packet. The method call will block (not return) - * until a packet is available. - * - * @return the next available packet. - */ - public P nextResult() { - try { - return resultQueue.take(); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - /** - * Returns the next available packet. The method call will block (not return) - * until a packet is available or the timeout has elapased. If the - * timeout elapses without a result, null will be returned. - * - * @param timeout the amount of time to wait for the next packet (in milleseconds). - * @return the next available packet. - */ - public P nextResult(long timeout) { - try { - return resultQueue.poll(timeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - /** - * Processes a packet to see if it meets the criteria for this packet collector. - * If so, the packet is added to the result queue. - * - * @param packet the packet to process. - */ - protected void processPacket(P packet) { - if (packet == null) { - return; - } - - while (!resultQueue.offer(packet)) { - // Since we know the queue is full, this poll should never actually block. - resultQueue.poll(); - } - } -} diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MUCNotJoinedException.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MUCNotJoinedException.java new file mode 100644 index 000000000..0b647546a --- /dev/null +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MUCNotJoinedException.java @@ -0,0 +1,36 @@ +/** + * + * Copyright © 2014 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smackx.muc; + +import org.jivesoftware.smack.SmackException; + +/** + * Thrown if the requested operation required the MUC to be joined by the + * client, while the client is currently joined. + * + */ +public class MUCNotJoinedException extends SmackException { + + /** + * + */ + private static final long serialVersionUID = -5204934585663465576L; + + public MUCNotJoinedException(MultiUserChat multiUserChat) { + super("Client not currently joined " + multiUserChat.getRoom()); + } +} diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java index ac707e635..0b3a40e52 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java @@ -52,6 +52,8 @@ import org.jivesoftware.smack.XMPPException.XMPPErrorException; import org.jivesoftware.smack.filter.AndFilter; import org.jivesoftware.smack.filter.FromMatchesFilter; import org.jivesoftware.smack.filter.MessageTypeFilter; +import org.jivesoftware.smack.filter.MessageWithSubjectFilter; +import org.jivesoftware.smack.filter.NotFilter; import org.jivesoftware.smack.filter.PacketExtensionFilter; import org.jivesoftware.smack.filter.PacketFilter; import org.jivesoftware.smack.filter.PacketTypeFilter; @@ -83,6 +85,10 @@ import org.jivesoftware.smackx.xdata.Form; * are "moderator", "participant", and "visitor". Each role and affiliation guarantees * different privileges (e.g. Send messages to all occupants, Kick participants and visitors, * Grant voice, Edit member list, etc.). + *

+ * Note: Make sure to leave the MUC ({@link #leave()}) before you drop the reference to + * it, or otherwise you may leak the instance. + *

* * @author Gaston Dombiak, Larry Kirschner */ @@ -109,17 +115,29 @@ public class MultiUserChat { private final Set messageListeners = new CopyOnWriteArraySet(); private final Set presenceListeners = new CopyOnWriteArraySet(); private final Set presenceInterceptors = new CopyOnWriteArraySet(); - private final ConnectionDetachedPacketCollector messageCollector = new ConnectionDetachedPacketCollector(); + + /** + * This filter will match all stanzas send from the groupchat or from one if + * the groupchat participants, i.e. it filters only the bare JID of the from + * attribute against the JID of the MUC. + */ + private final PacketFilter fromRoomFilter; + + /** + * Same as {@link #fromRoomFilter} together with {@link MessageTypeFilter#GROUPCHAT}. + */ + private final PacketFilter fromRoomGroupchatFilter; private final PacketListener presenceInterceptor; - private final PacketFilter fromRoomFilter; private final PacketListener messageListener; private final PacketListener presenceListener; - private final RoomListenerMultiplexor roomListenerMultiplexor; + private final PacketListener subjectListener; + private final PacketListener declinesListener; private String subject; private String nickname = null; private boolean joined = false; + private PacketCollector messageCollector; static { XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() { @@ -181,6 +199,9 @@ public class MultiUserChat { this.connection = connection; this.room = room.toLowerCase(Locale.US); + fromRoomFilter = FromMatchesFilter.create(room); + fromRoomGroupchatFilter = new AndFilter(fromRoomFilter, MessageTypeFilter.GROUPCHAT); + messageListener = new PacketListener() { @Override public void processPacket(Packet packet) throws NotConnectedException { @@ -190,19 +211,9 @@ public class MultiUserChat { } } }; - presenceListener = new PacketListener() { - @Override - public void processPacket(Packet packet) throws NotConnectedException { - Presence presence = (Presence) packet; - for (PresenceListener listener : presenceListeners) { - listener.processPresence(presence); - } - } - }; - fromRoomFilter = FromMatchesFilter.create(room); // Create a listener for subject updates. - PacketListener subjectListener = new PacketListener() { + subjectListener = new PacketListener() { public void processPacket(Packet packet) { Message msg = (Message) packet; // Update the room subject @@ -216,7 +227,7 @@ public class MultiUserChat { }; // Create a listener for all presence updates. - PacketListener presenceListener = new PacketListener() { + presenceListener = new PacketListener() { public void processPacket(Packet packet) { Presence presence = (Presence) packet; String from = presence.getFrom(); @@ -270,34 +281,27 @@ public class MultiUserChat { } } } + for (PresenceListener listener : presenceListeners) { + listener.processPresence(presence); + } } }; // Listens for all messages that include a MUCUser extension and fire the invitation // rejection listeners if the message includes an invitation rejection. - PacketListener declinesListener = new PacketListener() { + declinesListener = new PacketListener() { public void processPacket(Packet packet) { // Get the MUC User extension MUCUser mucUser = MUCUser.from(packet); // Check if the MUCUser informs that the invitee has declined the invitation - if (mucUser.getDecline() != null && - ((Message) packet).getType() != Message.Type.error) { - // Fire event for invitation rejection listeners - fireInvitationRejectionListeners( - mucUser.getDecline().getFrom(), - mucUser.getDecline().getReason()); + if (mucUser.getDecline() == null) { + return; } + // Fire event for invitation rejection listeners + fireInvitationRejectionListeners(mucUser.getDecline().getFrom(), mucUser.getDecline().getReason()); } }; - PacketMultiplexListener packetMultiplexor = new PacketMultiplexListener( - messageCollector, presenceListener, subjectListener, - declinesListener); - - roomListenerMultiplexor = RoomListenerMultiplexor.getRoomMultiplexor(connection); - - roomListenerMultiplexor.addRoom(room, packetMultiplexor); - presenceInterceptor = new PacketListener() { @Override public void processPacket(Packet packet) { @@ -476,12 +480,16 @@ public class MultiUserChat { this.nickname = nickname; joined = true; // Setup the messageListeners and presenceListeners - connection.addPacketListener(messageListener, new AndFilter(fromRoomFilter, - MessageTypeFilter.GROUPCHAT)); + connection.addPacketListener(messageListener, fromRoomGroupchatFilter); connection.addPacketListener(presenceListener, new AndFilter(fromRoomFilter, PacketTypeFilter.PRESENCE)); + connection.addPacketListener(subjectListener, new AndFilter(fromRoomFilter, + MessageWithSubjectFilter.INSTANCE)); + connection.addPacketListener(declinesListener, new AndFilter(new PacketExtensionFilter(MUCUser.ELEMENT, + MUCUser.NAMESPACE), new NotFilter(MessageTypeFilter.ERROR))); connection.addPacketInterceptor(presenceInterceptor, new AndFilter(new ToFilter(room), PacketTypeFilter.PRESENCE)); + messageCollector = connection.createPacketCollector(fromRoomGroupchatFilter); // Update the list of joined rooms through this connection List rooms = joinedRooms.get(connection); if (rooms == null) { @@ -1821,8 +1829,12 @@ public class MultiUserChat { * * @return the next message if one is immediately available and * null otherwise. + * @throws MUCNotJoinedException */ - public Message pollMessage() { + public Message pollMessage() throws MUCNotJoinedException { + if (messageCollector == null) { + throw new MUCNotJoinedException(this); + } return messageCollector.pollResult(); } @@ -1831,8 +1843,12 @@ public class MultiUserChat { * (not return) until a message is available. * * @return the next message. + * @throws MUCNotJoinedException */ - public Message nextMessage() { + public Message nextMessage() throws MUCNotJoinedException { + if (messageCollector == null) { + throw new MUCNotJoinedException(this); + } return messageCollector.nextResult(); } @@ -1844,8 +1860,12 @@ public class MultiUserChat { * @param timeout the maximum amount of time to wait for the next message. * @return the next message, or null if the timeout elapses without a * message becoming available. + * @throws MUCNotJoinedException */ - public Message nextMessage(long timeout) { + public Message nextMessage(long timeout) throws MUCNotJoinedException { + if (messageCollector == null) { + throw new MUCNotJoinedException(this); + } return messageCollector.nextResult(timeout); } @@ -1891,11 +1911,8 @@ public class MultiUserChat { Message message = new Message(room, Message.Type.groupchat); message.setSubject(subject); // Wait for an error or confirmation message back from the server. - PacketFilter responseFilter = - new AndFilter( - FromMatchesFilter.create(room), - new PacketTypeFilter(Message.class)); - responseFilter = new AndFilter(responseFilter, new PacketFilter() { + PacketFilter responseFilter = new AndFilter(fromRoomGroupchatFilter, new PacketFilter() { + @Override public boolean accept(Packet packet) { Message msg = (Message) packet; return subject.equals(msg.getSubject()); @@ -1917,9 +1934,13 @@ public class MultiUserChat { } connection.removePacketListener(messageListener); connection.removePacketListener(presenceListener); + connection.removePacketListener(declinesListener); connection.removePacketInterceptor(presenceInterceptor); + if (messageCollector != null) { + messageCollector.cancel(); + messageCollector = null; + } rooms.remove(room); - cleanup(); } /** @@ -2334,21 +2355,6 @@ public class MultiUserChat { } } - private void cleanup() { - try { - if (connection != null) { - roomListenerMultiplexor.removeRoom(room); - } - } catch (Exception e) { - // Do nothing - } - } - - protected void finalize() throws Throwable { - cleanup(); - super.finalize(); - } - /** * An InvitationsMonitor monitors a given connection to detect room invitations. Every * time the InvitationsMonitor detects a new invitation it will fire the invitation listeners. diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/PacketMultiplexListener.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/PacketMultiplexListener.java deleted file mode 100644 index b64915a19..000000000 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/PacketMultiplexListener.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * - * Copyright 2003-2006 Jive Software. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.jivesoftware.smackx.muc; - -import org.jivesoftware.smack.PacketListener; -import org.jivesoftware.smack.SmackException.NotConnectedException; -import org.jivesoftware.smack.filter.MessageTypeFilter; -import org.jivesoftware.smack.filter.MessageWithSubjectFilter; -import org.jivesoftware.smack.filter.PacketExtensionFilter; -import org.jivesoftware.smack.filter.PacketFilter; -import org.jivesoftware.smack.filter.PacketTypeFilter; -import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.Packet; -import org.jivesoftware.smack.packet.Presence; -import org.jivesoftware.smackx.muc.packet.MUCUser; - -/** - * The single PacketListener used by each {@link MultiUserChat} - * for all basic processing of presence, and message packets targeted to that chat. - * - * @author Larry Kirschner - */ -class PacketMultiplexListener implements PacketListener { - - private static final PacketFilter PRESENCE_FILTER = new PacketTypeFilter(Presence.class); - private static final PacketFilter DECLINES_FILTER = new PacketExtensionFilter(MUCUser.ELEMENT, - MUCUser.NAMESPACE); - - private ConnectionDetachedPacketCollector messageCollector; - private PacketListener presenceListener; - private PacketListener subjectListener; - private PacketListener declinesListener; - - public PacketMultiplexListener( - ConnectionDetachedPacketCollector messageCollector, - PacketListener presenceListener, - PacketListener subjectListener, PacketListener declinesListener) { - if (messageCollector == null) { - throw new IllegalArgumentException("MessageCollector is null"); - } - if (presenceListener == null) { - throw new IllegalArgumentException("Presence listener is null"); - } - if (subjectListener == null) { - throw new IllegalArgumentException("Subject listener is null"); - } - if (declinesListener == null) { - throw new IllegalArgumentException("Declines listener is null"); - } - this.messageCollector = messageCollector; - this.presenceListener = presenceListener; - this.subjectListener = subjectListener; - this.declinesListener = declinesListener; - } - - public void processPacket(Packet p) throws NotConnectedException { - if (PRESENCE_FILTER.accept(p)) { - presenceListener.processPacket(p); - } - else if (MessageTypeFilter.GROUPCHAT.accept(p)) { - messageCollector.processPacket((Message) p); - - if (MessageWithSubjectFilter.INSTANCE.accept(p)) { - subjectListener.processPacket(p); - } - } - else if (DECLINES_FILTER.accept(p)) { - declinesListener.processPacket(p); - } - } - -} diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/RoomListenerMultiplexor.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/RoomListenerMultiplexor.java deleted file mode 100644 index 901f0b235..000000000 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/RoomListenerMultiplexor.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * - * Copyright 2003-2006 Jive Software. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.jivesoftware.smackx.muc; - -import org.jivesoftware.smack.Manager; -import org.jivesoftware.smack.PacketListener; -import org.jivesoftware.smack.SmackException.NotConnectedException; -import org.jivesoftware.smack.XMPPConnection; -import org.jivesoftware.smack.filter.PacketFilter; -import org.jivesoftware.smack.packet.Packet; -import org.jxmpp.util.XmppStringUtils; - -import java.util.Locale; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.ConcurrentHashMap; - -/** - * A RoomListenerMultiplexor multiplexes incoming packets on - * a XMPPConnection using a single listener/filter pair. - * A single RoomListenerMultiplexor is created for each - * {@link org.jivesoftware.smack.XMPPConnection} that has joined MUC rooms - * within its session. - * - * @author Larry Kirschner - */ -class RoomListenerMultiplexor extends Manager { - - // We use a WeakHashMap so that the GC can collect the monitor when the - // connection is no longer referenced by any object. - private static final Map monitors = new WeakHashMap(); - - private final RoomMultiplexFilter filter; - private final RoomMultiplexListener listener; - - /** - * Returns a new or existing RoomListenerMultiplexor for a given connection. - * - * @param conn the connection to monitor for room invitations. - * @return a new or existing RoomListenerMultiplexor for a given connection. - */ - public static synchronized RoomListenerMultiplexor getRoomMultiplexor(XMPPConnection conn) { - RoomListenerMultiplexor rlm = monitors.get(conn); - if (rlm == null) { - rlm = new RoomListenerMultiplexor(conn, new RoomMultiplexFilter(), - new RoomMultiplexListener()); - } - // Return the InvitationsMonitor that monitors the connection - return rlm; - } - - /** - * All access should be through - * the static method {@link #getRoomMultiplexor(XMPPConnection)}. - */ - private RoomListenerMultiplexor(XMPPConnection connection, RoomMultiplexFilter filter, - RoomMultiplexListener listener) { - super(connection); - connection.addPacketListener(listener, filter); - - this.filter = filter; - this.listener = listener; - monitors.put(connection, this); - } - - public void addRoom(String address, PacketMultiplexListener roomListener) { - filter.addRoom(address); - listener.addRoom(address, roomListener); - } - - public void removeRoom(String address) { - filter.removeRoom(address); - listener.removeRoom(address); - } - - /** - * The single XMPPConnection-level PacketFilter used by a {@link RoomListenerMultiplexor} - * for all muc chat rooms on an XMPPConnection. - * Each time a muc chat room is added to/removed from an - * XMPPConnection the address for that chat room - * is added to/removed from that XMPPConnection's - * RoomMultiplexFilter. - */ - private static class RoomMultiplexFilter implements PacketFilter { - - private Map roomAddressTable = new ConcurrentHashMap(); - - public boolean accept(Packet p) { - String from = p.getFrom(); - if (from == null) { - return false; - } - return roomAddressTable.containsKey(XmppStringUtils.parseBareAddress(from).toLowerCase(Locale.US)); - } - - public void addRoom(String address) { - if (address == null) { - return; - } - roomAddressTable.put(address.toLowerCase(Locale.US), address); - } - - public void removeRoom(String address) { - if (address == null) { - return; - } - roomAddressTable.remove(address.toLowerCase(Locale.US)); - } - } - - /** - * The single XMPPConnection-level PacketListener - * used by a {@link RoomListenerMultiplexor} - * for all muc chat rooms on an XMPPConnection. - * Each time a muc chat room is added to/removed from an - * XMPPConnection the address and listener for that chat room - * are added to/removed from that XMPPConnection's - * RoomMultiplexListener. - * - * @author Larry Kirschner - */ - private static class RoomMultiplexListener implements PacketListener { - - private Map roomListenersByAddress = - new ConcurrentHashMap(); - - public void processPacket(Packet p) throws NotConnectedException { - String from = p.getFrom(); - if (from == null) { - return; - } - - PacketMultiplexListener listener = - roomListenersByAddress.get(XmppStringUtils.parseBareAddress(from).toLowerCase(Locale.US)); - - if (listener != null) { - listener.processPacket(p); - } - } - - public void addRoom(String address, PacketMultiplexListener listener) { - if (address == null) { - return; - } - roomListenersByAddress.put(address.toLowerCase(Locale.US), listener); - } - - public void removeRoom(String address) { - if (address == null) { - return; - } - roomListenersByAddress.remove(address.toLowerCase(Locale.US)); - } - } -} diff --git a/smack-extensions/src/test/java/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollectorTest.java b/smack-extensions/src/test/java/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollectorTest.java deleted file mode 100644 index 61f05f22b..000000000 --- a/smack-extensions/src/test/java/org/jivesoftware/smackx/muc/ConnectionDetachedPacketCollectorTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * - * Copyright the original author or authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jivesoftware.smackx.muc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import org.jivesoftware.smack.packet.Packet; -import org.junit.Test; - -public class ConnectionDetachedPacketCollectorTest -{ - - @Test - public void verifyRollover() - { - ConnectionDetachedPacketCollector collector = new ConnectionDetachedPacketCollector(5); - - for (int i=0; i<6; i++) - { - Packet testPacket = new TestPacket(i); - collector.processPacket(testPacket); - } - - // Assert that '0' has rolled off - assertEquals("1", collector.nextResult().getPacketID()); - assertEquals("2", collector.nextResult().getPacketID()); - assertEquals("3", collector.nextResult().getPacketID()); - assertEquals("4", collector.nextResult().getPacketID()); - assertEquals("5", collector.pollResult().getPacketID()); - assertNull(collector.pollResult()); - - for (int i=10; i<15; i++) - { - Packet testPacket = new TestPacket(i); - collector.processPacket(testPacket); - } - - assertEquals("10", collector.nextResult().getPacketID()); - assertEquals("11", collector.nextResult().getPacketID()); - assertEquals("12", collector.nextResult().getPacketID()); - assertEquals("13", collector.nextResult().getPacketID()); - assertEquals("14", collector.pollResult().getPacketID()); - assertNull(collector.pollResult()); - - assertNull(collector.nextResult(1000)); - } - - /** - * Although this doesn't guarentee anything due to the nature of threading, it can - * potentially catch problems. - */ - @Test - public void verifyThreadSafety() - { - int insertCount = 500; - final ConnectionDetachedPacketCollector collector = new ConnectionDetachedPacketCollector(insertCount); - - Thread consumer1 = new Thread(new Runnable() - { - @Override - public void run() - { - try - { - while (true) - { - try - { - Thread.sleep(3); - } - catch (InterruptedException e) - { - } - @SuppressWarnings("unused") - Packet packet = collector.nextResult(); -// System.out.println(Thread.currentThread().getName() + " packet: " + packet); - } - } - catch (RuntimeException re) - { - if (re.getCause() instanceof InterruptedException) - { -// System.out.println(Thread.currentThread().getName() + " has been interupted"); - } - } - } - }); - consumer1.setName("consumer 1"); - - Thread consumer2 = new Thread(new Runnable() - { - @Override - public void run() - { - Packet p = null; - - do - { - try - { - Thread.sleep(3); - } - catch (InterruptedException e) - { - } - p = collector.nextResult(1); -// System.out.println(Thread.currentThread().getName() + " packet: " + p); - } - while (p != null); - } - }); - consumer2.setName("consumer 2"); - - Thread consumer3 = new Thread(new Runnable() - { - @Override - public void run() - { - Packet p = null; - - do - { - try - { - Thread.sleep(3); - } - catch (InterruptedException e) - { - } - p = collector.pollResult(); -// System.out.println(Thread.currentThread().getName() + " packet: " + p); - } - while (p != null); - } - }); - consumer3.setName("consumer 3"); - - consumer1.start(); - consumer2.start(); - consumer3.start(); - - for(int i=0; i" + getPacketID() + ""; - } - } -}