From 15d59299a2c28d9a40d543f179c00803b7f6a2ec Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Fri, 10 Oct 2014 00:36:58 +0200 Subject: [PATCH] Reworked (send|recv)Listeners, ListenerWrapper instead of rely on ListenerWrapper checking the PacketFilter *and* invoking the PacketListener we now use two for-each loops, where the first filters the PacketListeners that should get invoked and use the second for-each loop to actually invoke the PacketListener. Before, the code was not thread safe if a PacketListener would remove itself from the (send|recv)Listeners. Also make packet(Listener|Filter) in ListenerWrapper final. --- .../smack/AbstractXMPPConnection.java | 105 ++++++++++-------- .../jivesoftware/smack/DummyConnection.java | 10 +- 2 files changed, 61 insertions(+), 54 deletions(-) diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index ab2884181..e7b91404d 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -21,11 +21,12 @@ import java.io.Reader; import java.io.Writer; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -107,14 +108,14 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { /** * List of PacketListeners that will be notified when a new packet was received. */ - protected final Map recvListeners = - new ConcurrentHashMap(); + private final Map recvListeners = + new HashMap(); /** * List of PacketListeners that will be notified when a new packet was sent. */ - protected final Map sendListeners = - new ConcurrentHashMap(); + private final Map sendListeners = + new HashMap(); /** * List of PacketInterceptors that will be notified when a new packet is about to be @@ -672,12 +673,16 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); - recvListeners.put(packetListener, wrapper); + synchronized (recvListeners) { + recvListeners.put(packetListener, wrapper); + } } @Override public boolean removePacketListener(PacketListener packetListener) { - return recvListeners.remove(packetListener) != null; + synchronized (recvListeners) { + return recvListeners.remove(packetListener) != null; + } } @Override @@ -686,34 +691,35 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); - sendListeners.put(packetListener, wrapper); + synchronized (sendListeners) { + sendListeners.put(packetListener, wrapper); + } } @Override public void removePacketSendingListener(PacketListener packetListener) { - sendListeners.remove(packetListener); + synchronized (sendListeners) { + sendListeners.remove(packetListener); + } } - /** - * Get a map of all packet listeners for sending packets of this connection. - * - * @return a map of all packet listeners for sent packets. - */ - protected Map getPacketSendingListeners() { - return sendListeners; - } - - /** * Process all packet listeners for sending packets. * * @param packet the packet to process. */ private void firePacketSendingListeners(Packet packet) { - // Notify the listeners of the new sent packet - for (ListenerWrapper listenerWrapper : sendListeners.values()) { + List listenersToNotify = new LinkedList(); + synchronized (sendListeners) { + for (ListenerWrapper listenerWrapper : sendListeners.values()) { + if (listenerWrapper.filterMatches(packet)) { + listenersToNotify.add(listenerWrapper.getListener()); + } + } + } + for (PacketListener listener : listenersToNotify) { try { - listenerWrapper.notifyListener(packet); + listener.processPacket(packet); } catch (NotConnectedException e) { LOGGER.log(Level.WARNING, "Got not connected exception, aborting"); @@ -819,28 +825,41 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { executorService.submit(new ListenerNotification(packet)); } + protected void notifiyReceivedListeners(Packet packet) { + List listenersToNotify = new LinkedList(); + synchronized (recvListeners) { + for (ListenerWrapper listenerWrapper : recvListeners.values()) { + if (listenerWrapper.filterMatches(packet)) { + listenersToNotify.add(listenerWrapper.getListener()); + } + } + } + + for (PacketListener listener : listenersToNotify) { + try { + listener.processPacket(packet); + } catch(NotConnectedException e) { + LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); + break; + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Exception in packet listener", e); + } + } + } + /** * A runnable to notify all listeners of a packet. */ private class ListenerNotification implements Runnable { - private Packet packet; + private final Packet packet; public ListenerNotification(Packet packet) { this.packet = packet; } public void run() { - for (ListenerWrapper listenerWrapper : recvListeners.values()) { - try { - listenerWrapper.notifyListener(packet); - } catch(NotConnectedException e) { - LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); - break; - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Exception in packet listener", e); - } - } + notifiyReceivedListeners(packet); } } @@ -901,8 +920,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { */ protected static class ListenerWrapper { - private PacketListener packetListener; - private PacketFilter packetFilter; + private final PacketListener packetListener; + private final PacketFilter packetFilter; /** * Create a class which associates a packet filter with a listener. @@ -915,16 +934,12 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { this.packetFilter = packetFilter; } - /** - * Notify and process the packet listener if the filter matches the packet. - * - * @param packet the packet which was sent or received. - * @throws NotConnectedException - */ - public void notifyListener(Packet packet) throws NotConnectedException { - if (packetFilter == null || packetFilter.accept(packet)) { - packetListener.processPacket(packet); - } + public boolean filterMatches(Packet packet) { + return packetFilter == null || packetFilter.accept(packet); + } + + public PacketListener getListener() { + return packetListener; } } diff --git a/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java b/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java index 92bbcbe91..deb02e445 100644 --- a/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java +++ b/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java @@ -23,7 +23,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.packet.Element; import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.PlainStreamElement; @@ -251,13 +250,6 @@ public class DummyConnection extends AbstractXMPPConnection { } // Deliver the incoming packet to listeners. - for (ListenerWrapper listenerWrapper : recvListeners.values()) { - try { - listenerWrapper.notifyListener(packet); - } - catch (NotConnectedException e) { - e.printStackTrace(); - } - } + notifiyReceivedListeners(packet); } }