From 4c42d0cd32cc4e8459b4be773437eec54674e254 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Mon, 4 Feb 2019 21:58:35 +0100 Subject: [PATCH] Add concurrent but in-order listeners to XMPPConnection --- .../smack/AbstractXMPPConnection.java | 63 ++++++++++++++----- .../jivesoftware/smack/XMPPConnection.java | 22 +++++++ 2 files changed, 71 insertions(+), 14 deletions(-) diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 1bdbf9fd9..79274bfe5 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -197,6 +197,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { */ private final Collection collectors = new ConcurrentLinkedQueue<>(); + private final Map recvListeners = new LinkedHashMap<>(); + /** * List of PacketListeners that will be notified synchronously when a new stanza was received. */ @@ -330,6 +332,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { protected static final AsyncButOrdered ASYNC_BUT_ORDERED = new AsyncButOrdered<>(); + protected final AsyncButOrdered inOrderListeners = new AsyncButOrdered<>(); + /** * The used host to establish the connection to */ @@ -940,6 +944,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { collectors.remove(collector); } + @Override + public final void addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter) { + if (stanzaListener == null) { + throw new NullPointerException("Given stanza listener must not be null"); + } + ListenerWrapper wrapper = new ListenerWrapper(stanzaListener, stanzaFilter); + synchronized (recvListeners) { + recvListeners.put(stanzaListener, wrapper); + } + } + + @Override + public final boolean removeStanzaListener(StanzaListener stanzaListener) { + synchronized (recvListeners) { + return recvListeners.remove(stanzaListener) != null; + } + } + @Override public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { if (packetListener == null) { @@ -1323,14 +1345,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in // their own thread. final Collection listenersToNotify = new LinkedList<>(); - synchronized (asyncRecvListeners) { - for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) { - if (listenerWrapper.filterMatches(packet)) { - listenersToNotify.add(listenerWrapper.getListener()); - } - } - } - + extractMatchingListeners(packet, asyncRecvListeners, listenersToNotify); for (final StanzaListener listener : listenersToNotify) { asyncGo(new Runnable() { @Override @@ -1349,16 +1364,25 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { collector.processStanza(packet); } - // Notify the receive listeners interested in the packet listenersToNotify.clear(); - synchronized (syncRecvListeners) { - for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) { - if (listenerWrapper.filterMatches(packet)) { - listenersToNotify.add(listenerWrapper.getListener()); + extractMatchingListeners(packet, recvListeners, listenersToNotify); + for (StanzaListener stanzaListener : listenersToNotify) { + inOrderListeners.performAsyncButOrdered(stanzaListener, () -> { + try { + stanzaListener.processStanza(packet); } - } + catch (NotConnectedException e) { + LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); + } + catch (Exception e) { + LOGGER.log(Level.SEVERE, "Exception in packet listener", e); + } + }); } + // Notify the receive listeners interested in the packet + listenersToNotify.clear(); + extractMatchingListeners(packet, syncRecvListeners, listenersToNotify); // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single // threaded executor service and therefore keeps the order. ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() { @@ -1391,6 +1415,17 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { }); } + private static void extractMatchingListeners(Stanza stanza, Map listeners, + Collection listenersToNotify) { + synchronized (listeners) { + for (ListenerWrapper listenerWrapper : listeners.values()) { + if (listenerWrapper.filterMatches(stanza)) { + listenersToNotify.add(listenerWrapper.getListener()); + } + } + } + } + /** * Sets whether the connection has already logged in the server. This method assures that the * {@link #wasAuthenticated} flag is never reset once it has ever been set. diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java index a871ed343..8c7354a2b 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java @@ -313,6 +313,28 @@ public interface XMPPConnection { */ void removeStanzaCollector(StanzaCollector collector); + /** + * Registers a stanza listener with this connection. The listener will be invoked when a (matching) incoming stanza + * is received. The stanza filter determines which stanzas will be delivered to the listener. It is guaranteed that + * the same listener will not be invoked concurrently and the the order of invocation will reflect the order in + * which the stanzas have been received. If the same stanza listener is added again with a different filter, only + * the new filter will be used. + * + * @param stanzaListener the stanza listener to notify of new received stanzas. + * @param stanzaFilter the stanza filter to use. + * @since 4.4 + */ + void addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter); + + /** + * Removes a stanza listener for received stanzas from this connection. + * + * @param stanzaListener the stanza listener to remove. + * @return true if the stanza listener was removed. + * @since 4.4 + */ + boolean removeStanzaListener(StanzaListener stanzaListener); + /** * Registers a synchronous stanza listener with this connection. A stanza listener will be invoked only when * an incoming stanza is received. A stanza filter determines which stanzas will be delivered to the listener. If