Reworked (send|recv)Listeners, ListenerWrapper

instead of rely on ListenerWrapper checking the PacketFilter *and*
invoking the PacketListener we now use two for-each loops, where the
first filters the PacketListeners that should get invoked and use the
second for-each loop to actually invoke the PacketListener.

Before, the code was not thread safe if a PacketListener would remove
itself from the (send|recv)Listeners.

Also make packet(Listener|Filter) in ListenerWrapper final.
This commit is contained in:
Florian Schmaus 2014-10-10 00:36:58 +02:00
parent 03686fbaaf
commit 15d59299a2
2 changed files with 61 additions and 54 deletions

View File

@ -21,11 +21,12 @@ import java.io.Reader;
import java.io.Writer; import java.io.Writer;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -107,14 +108,14 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
/** /**
* List of PacketListeners that will be notified when a new packet was received. * List of PacketListeners that will be notified when a new packet was received.
*/ */
protected final Map<PacketListener, ListenerWrapper> recvListeners = private final Map<PacketListener, ListenerWrapper> recvListeners =
new ConcurrentHashMap<PacketListener, ListenerWrapper>(); new HashMap<PacketListener, ListenerWrapper>();
/** /**
* List of PacketListeners that will be notified when a new packet was sent. * List of PacketListeners that will be notified when a new packet was sent.
*/ */
protected final Map<PacketListener, ListenerWrapper> sendListeners = private final Map<PacketListener, ListenerWrapper> sendListeners =
new ConcurrentHashMap<PacketListener, ListenerWrapper>(); new HashMap<PacketListener, ListenerWrapper>();
/** /**
* List of PacketInterceptors that will be notified when a new packet is about to be * List of PacketInterceptors that will be notified when a new packet is about to be
@ -672,12 +673,16 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
throw new NullPointerException("Packet listener is null."); throw new NullPointerException("Packet listener is null.");
} }
ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
recvListeners.put(packetListener, wrapper); synchronized (recvListeners) {
recvListeners.put(packetListener, wrapper);
}
} }
@Override @Override
public boolean removePacketListener(PacketListener packetListener) { public boolean removePacketListener(PacketListener packetListener) {
return recvListeners.remove(packetListener) != null; synchronized (recvListeners) {
return recvListeners.remove(packetListener) != null;
}
} }
@Override @Override
@ -686,34 +691,35 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
throw new NullPointerException("Packet listener is null."); throw new NullPointerException("Packet listener is null.");
} }
ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
sendListeners.put(packetListener, wrapper); synchronized (sendListeners) {
sendListeners.put(packetListener, wrapper);
}
} }
@Override @Override
public void removePacketSendingListener(PacketListener packetListener) { public void removePacketSendingListener(PacketListener packetListener) {
sendListeners.remove(packetListener); synchronized (sendListeners) {
sendListeners.remove(packetListener);
}
} }
/**
* Get a map of all packet listeners for sending packets of this connection.
*
* @return a map of all packet listeners for sent packets.
*/
protected Map<PacketListener, ListenerWrapper> getPacketSendingListeners() {
return sendListeners;
}
/** /**
* Process all packet listeners for sending packets. * Process all packet listeners for sending packets.
* *
* @param packet the packet to process. * @param packet the packet to process.
*/ */
private void firePacketSendingListeners(Packet packet) { private void firePacketSendingListeners(Packet packet) {
// Notify the listeners of the new sent packet List<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
for (ListenerWrapper listenerWrapper : sendListeners.values()) { synchronized (sendListeners) {
for (ListenerWrapper listenerWrapper : sendListeners.values()) {
if (listenerWrapper.filterMatches(packet)) {
listenersToNotify.add(listenerWrapper.getListener());
}
}
}
for (PacketListener listener : listenersToNotify) {
try { try {
listenerWrapper.notifyListener(packet); listener.processPacket(packet);
} }
catch (NotConnectedException e) { catch (NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting"); LOGGER.log(Level.WARNING, "Got not connected exception, aborting");
@ -819,28 +825,41 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
executorService.submit(new ListenerNotification(packet)); executorService.submit(new ListenerNotification(packet));
} }
protected void notifiyReceivedListeners(Packet packet) {
List<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
synchronized (recvListeners) {
for (ListenerWrapper listenerWrapper : recvListeners.values()) {
if (listenerWrapper.filterMatches(packet)) {
listenersToNotify.add(listenerWrapper.getListener());
}
}
}
for (PacketListener listener : listenersToNotify) {
try {
listener.processPacket(packet);
} catch(NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
break;
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
}
/** /**
* A runnable to notify all listeners of a packet. * A runnable to notify all listeners of a packet.
*/ */
private class ListenerNotification implements Runnable { private class ListenerNotification implements Runnable {
private Packet packet; private final Packet packet;
public ListenerNotification(Packet packet) { public ListenerNotification(Packet packet) {
this.packet = packet; this.packet = packet;
} }
public void run() { public void run() {
for (ListenerWrapper listenerWrapper : recvListeners.values()) { notifiyReceivedListeners(packet);
try {
listenerWrapper.notifyListener(packet);
} catch(NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
break;
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
} }
} }
@ -901,8 +920,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
*/ */
protected static class ListenerWrapper { protected static class ListenerWrapper {
private PacketListener packetListener; private final PacketListener packetListener;
private PacketFilter packetFilter; private final PacketFilter packetFilter;
/** /**
* Create a class which associates a packet filter with a listener. * Create a class which associates a packet filter with a listener.
@ -915,16 +934,12 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
this.packetFilter = packetFilter; this.packetFilter = packetFilter;
} }
/** public boolean filterMatches(Packet packet) {
* Notify and process the packet listener if the filter matches the packet. return packetFilter == null || packetFilter.accept(packet);
* }
* @param packet the packet which was sent or received.
* @throws NotConnectedException public PacketListener getListener() {
*/ return packetListener;
public void notifyListener(Packet packet) throws NotConnectedException {
if (packetFilter == null || packetFilter.accept(packet)) {
packetListener.processPacket(packet);
}
} }
} }

View File

@ -23,7 +23,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.packet.Element; import org.jivesoftware.smack.packet.Element;
import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.PlainStreamElement; import org.jivesoftware.smack.packet.PlainStreamElement;
@ -251,13 +250,6 @@ public class DummyConnection extends AbstractXMPPConnection {
} }
// Deliver the incoming packet to listeners. // Deliver the incoming packet to listeners.
for (ListenerWrapper listenerWrapper : recvListeners.values()) { notifiyReceivedListeners(packet);
try {
listenerWrapper.notifyListener(packet);
}
catch (NotConnectedException e) {
e.printStackTrace();
}
}
} }
} }