From f029b576a558f9a8d484eb8cf52c5e42a150e1c5 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Tue, 30 Dec 2014 20:45:15 +0100 Subject: [PATCH] Add XMPPConnection.addAsyncPacketListener(PacketListener, PacketFilter) and use this method in packet listeners that previously used Async.go(). --- .../smack/AbstractXMPPConnection.java | 49 ++++++++++++++++++- .../jivesoftware/smack/XMPPConnection.java | 23 +++++++++ .../filetransfer/FileTransferManager.java | 12 +---- .../smackx/muc/MultiUserChatManager.java | 16 ++---- 4 files changed, 75 insertions(+), 25 deletions(-) diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index b95e1d1ad..1633e5526 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -131,6 +131,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { private final Map recvListeners = new HashMap(); + private final Map asyncRecvListeners = new HashMap<>(); + /** * List of PacketListeners that will be notified when a new packet was sent. */ @@ -741,6 +743,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } } + @Override + public void addAsyncPacketListener(PacketListener packetListener, PacketFilter packetFilter) { + if (packetListener == null) { + throw new NullPointerException("Packet listener is null."); + } + ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); + synchronized (asyncRecvListeners) { + asyncRecvListeners.put(packetListener, wrapper); + } + } + + @Override + public boolean removeAsyncPacketListener(PacketListener packetListener) { + synchronized (asyncRecvListeners) { + return asyncRecvListeners.remove(packetListener) != null; + } + } + @Override public void addPacketSendingListener(PacketListener packetListener, PacketFilter packetFilter) { if (packetListener == null) { @@ -937,14 +957,39 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * * @param packet the packet to notify the PacketCollectors and receive listeners about. */ - protected void invokePacketCollectorsAndNotifyRecvListeners(Packet packet) { + protected void invokePacketCollectorsAndNotifyRecvListeners(final Packet packet) { + // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below, + // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in + // their own thread. + Collection listenersToNotify = new LinkedList(); + synchronized (asyncRecvListeners) { + for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) { + if (listenerWrapper.filterMatches(packet)) { + listenersToNotify.add(listenerWrapper.getListener()); + } + } + } + + for (final PacketListener listener : listenersToNotify) { + asyncGo(new Runnable() { + @Override + public void run() { + try { + listener.processPacket(packet); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Exception in async packet listener", e); + } + } + }); + } + // Loop through all collectors and notify the appropriate ones. for (PacketCollector collector: collectors) { collector.processPacket(packet); } // Notify the receive listeners interested in the packet - List listenersToNotify = new LinkedList(); + listenersToNotify = new LinkedList(); synchronized (recvListeners) { for (ListenerWrapper listenerWrapper : recvListeners.values()) { if (listenerWrapper.filterMatches(packet)) { diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java index d3f2c303f..5a3eb9d12 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java @@ -266,6 +266,29 @@ public interface XMPPConnection { */ public boolean removePacketListener(PacketListener packetListener); + /** + * Registers an asynchronous packet listener with this connection. A packet listener will be invoked only + * when an incoming packet is received. A packet filter determines which packets will be delivered to the listener. + * If the same packet listener is added again with a different filter, only the new filter will be used. + *

+ * Unlike {@link #addPacketListener(PacketListener, PacketFilter)} packet listeners added with this method will be + * invoked asynchronously in their own thread. Use this method if the order of the packet listeners must not depend + * on the order how the stanzas where received. + *

+ * + * @param packetListener the packet listener to notify of new received packets. + * @param packetFilter the packet filter to use. + */ + public void addAsyncPacketListener(PacketListener packetListener, PacketFilter packetFilter); + + /** + * Removes an asynchronous packet listener for received packets from this connection. + * + * @param packetListener the packet listener to remove. + * @return true if the packet listener was removed + */ + public boolean removeAsyncPacketListener(PacketListener packetListener); + /** * Registers a packet listener with this connection. The listener will be * notified of every packet that this connection sends. A packet filter determines diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FileTransferManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FileTransferManager.java index 04550d612..b0775b58b 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FileTransferManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FileTransferManager.java @@ -26,7 +26,6 @@ import org.jivesoftware.smack.filter.PacketTypeFilter; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.XMPPError; -import org.jivesoftware.smack.util.Async; import org.jivesoftware.smackx.si.packet.StreamInitiation; import org.jxmpp.util.XmppStringUtils; @@ -74,21 +73,12 @@ public class FileTransferManager extends Manager { super(connection); this.fileTransferNegotiator = FileTransferNegotiator .getInstanceFor(connection); - connection.addPacketListener(new PacketListener() { + connection.addAsyncPacketListener(new PacketListener() { public void processPacket(Packet packet) { StreamInitiation si = (StreamInitiation) packet; final FileTransferRequest request = new FileTransferRequest(FileTransferManager.this, si); for (final FileTransferListener listener : listeners) { - // Those listeners need to be called asynchronously, in - // order to not block further processing of incoming - // stanzas. They also may send further requests, whose - // responses are not processed if we do not call them - // asynchronously. - Async.go(new Runnable() { - public void run() { listener.fileTransferRequest(request); - } - }); } } }, new AndFilter(new PacketTypeFilter(StreamInitiation.class), IQTypeFilter.SET)); diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChatManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChatManager.java index c563374bf..ba0dba8b3 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChatManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChatManager.java @@ -43,7 +43,6 @@ import org.jivesoftware.smack.filter.NotFilter; import org.jivesoftware.smack.filter.PacketTypeFilter; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.Packet; -import org.jivesoftware.smack.util.Async; import org.jivesoftware.smackx.disco.AbstractNodeInformationProvider; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; import org.jivesoftware.smackx.disco.packet.DiscoverInfo; @@ -114,13 +113,11 @@ public class MultiUserChatManager extends Manager { */ private final Map> multiUserChats = new HashMap>(); - private final PacketListener invitationPacketListener; - private MultiUserChatManager(XMPPConnection connection) { super(connection); // Listens for all messages that include a MUCUser extension and fire the invitation // listeners if the message includes an invitation. - invitationPacketListener = new PacketListener() { + PacketListener invitationPacketListener = new PacketListener() { public void processPacket(Packet packet) { final Message message = (Message) packet; // Get the MUCUser extension @@ -130,18 +127,13 @@ public class MultiUserChatManager extends Manager { // Fire event for invitation listeners final MultiUserChat muc = getMultiUserChat(packet.getFrom()); for (final InvitationListener listener : invitationsListeners) { - Async.go(new Runnable() { - @Override - public void run() { - listener.invitationReceived(connection(), muc, mucUser.getInvite().getFrom(), - mucUser.getInvite().getReason(), mucUser.getPassword(), message); - } - }); + listener.invitationReceived(connection(), muc, mucUser.getInvite().getFrom(), + mucUser.getInvite().getReason(), mucUser.getPassword(), message); } } } }; - connection.addPacketListener(invitationPacketListener, INVITATION_FILTER); + connection.addAsyncPacketListener(invitationPacketListener, INVITATION_FILTER); } /**