diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 24f9238dc..52caca13b 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -29,13 +29,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -86,7 +85,6 @@ import org.jivesoftware.smack.util.DNSUtil; import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.ParserUtils; -import org.jivesoftware.smack.util.SmackExecutorThreadFactory; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.dns.HostAddress; @@ -266,14 +264,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } }); - /** - * A executor service used to invoke the callbacks of synchronous stanza listeners. We use a executor service to - * decouple incoming stanza processing from callback invocation. It is important that order of callback invocation - * is the same as the order of the incoming stanzas. Therefore we use a single threaded executor service. - */ - private final ExecutorService singleThreadedExecutorService = new ThreadPoolExecutor(0, 1, 30L, - TimeUnit.SECONDS, new LinkedBlockingQueue(), - new SmackExecutorThreadFactory(this, "Single Threaded Executor")); + private static final AsyncButOrdered ASYNC_BUT_ORDERED = new AsyncButOrdered<>(); /** * The used host to establish the connection to @@ -1112,10 +1103,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e); } } else { - ExecutorService executorService = null; + Executor executorService = null; switch (iqRequestHandler.getMode()) { case sync: - executorService = singleThreadedExecutorService; + executorService = ASYNC_BUT_ORDERED.asExecutorFor(this); break; case async: executorService = CACHED_EXECUTOR_SERVICE; @@ -1192,7 +1183,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single // threaded executor service and therefore keeps the order. - singleThreadedExecutorService.execute(new Runnable() { + ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() { @Override public void run() { for (StanzaListener listener : listenersToNotify) { @@ -1207,7 +1198,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } } }); - } /** @@ -1350,24 +1340,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { return this.fromMode; } - @Override - protected void finalize() throws Throwable { - LOGGER.fine("finalizing " + this + ": Shutting down executor services"); - try { - // It's usually not a good idea to rely on finalize. But this is the easiest way to - // avoid the "Smack Listener Processor" leaking. The thread(s) of the executor have a - // reference to their ExecutorService which prevents the ExecutorService from being - // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the - // listenerExecutor ExecutorService call not be gc'ed until it got shut down. - singleThreadedExecutorService.shutdownNow(); - } catch (Throwable t) { - LOGGER.log(Level.WARNING, "finalize() threw throwable", t); - } - finally { - super.finalize(); - } - } - protected final void parseFeatures(XmlPullParser parser) throws Exception { streamFeatures.clear(); final int initialDepth = parser.getDepth(); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java b/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java new file mode 100644 index 000000000..00fb94549 --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java @@ -0,0 +1,142 @@ +/** + * + * Copyright 2018 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack; + +import java.util.Map; +import java.util.Queue; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; + +/** + * Helper class to perform an operation asynchronous but keeping the order in respect to a given key. + *

+ * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks + * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use + * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user + * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled, + * since synchronous connection listeners are invoked from the main event loop. + *

+ *

+ * It is common for those situations that the order of callbacks is not globally important, but only important in + * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which + * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is + * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed + * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which + * the listeners for those two contacts are invoked does not matter. + *

+ *

+ * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts + * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that + * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same + * key. + *

+ * + * @param the type of the key + * @since 4.3 + */ +public class AsyncButOrdered { + + private final Map> pendingRunnables = new WeakHashMap<>(); + + private final Map threadActiveMap = new WeakHashMap<>(); + + /** + * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key. + * + * @param key the key deriving the order + * @param runnable the {@link Runnable} to run + * @return true if a new thread was created + */ + public boolean performAsyncButOrdered(K key, Runnable runnable) { + Queue keyQueue; + synchronized (pendingRunnables) { + keyQueue = pendingRunnables.get(key); + if (keyQueue == null) { + keyQueue = new ConcurrentLinkedQueue<>(); + pendingRunnables.put(key, keyQueue); + } + } + + keyQueue.add(runnable); + + boolean newHandler; + synchronized (threadActiveMap) { + Boolean threadActive = threadActiveMap.get(key); + if (threadActive == null) { + threadActive = false; + threadActiveMap.put(key, threadActive); + } + + newHandler = !threadActive; + if (newHandler) { + Handler handler = new Handler(keyQueue, key); + threadActiveMap.put(key, true); + AbstractXMPPConnection.asyncGo(handler); + } + } + + return newHandler; + } + + public Executor asExecutorFor(final K key) { + return new Executor() { + @Override + public void execute(Runnable runnable) { + performAsyncButOrdered(key, runnable); + } + }; + } + + private class Handler implements Runnable { + private final Queue keyQueue; + private final K key; + + Handler(Queue keyQueue, K key) { + this.keyQueue = keyQueue; + this.key = key; + } + + @Override + public void run() { + mainloop: + while (true) { + Runnable runnable = null; + while ((runnable = keyQueue.poll()) != null) { + try { + runnable.run(); + } catch (Throwable t) { + // The run() method threw, this handler thread is going to terminate because of that. Ensure we note + // that in the map. + synchronized (threadActiveMap) { + threadActiveMap.put(key, false); + } + throw t; + } + } + + synchronized (threadActiveMap) { + // If the queue is empty, stop this handler, otherwise continue looping. + if (keyQueue.isEmpty()) { + threadActiveMap.put(key, false); + break mainloop; + } + } + } + } + } +} diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/SmackExecutorThreadFactory.java b/smack-core/src/main/java/org/jivesoftware/smack/util/SmackExecutorThreadFactory.java deleted file mode 100644 index 721b52a59..000000000 --- a/smack-core/src/main/java/org/jivesoftware/smack/util/SmackExecutorThreadFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * - * Copyright 2014-2015 Florian Schmaus - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jivesoftware.smack.util; - -import java.util.concurrent.ThreadFactory; - -import org.jivesoftware.smack.XMPPConnection; - -/** - * SmackExecutorThreadFactory creates daemon threads with a particular name. Note that you should - * not use anonymous inner classes for thread factories in order to prevent threads from leaking. - */ -public final class SmackExecutorThreadFactory implements ThreadFactory { - private final int connectionCounterValue; - private final String name; - private int count = 0; - - public SmackExecutorThreadFactory(XMPPConnection connection, String name) { - this.connectionCounterValue = connection.getConnectionCounter(); - this.name = name; - } - - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable); - thread.setName("Smack-" + name + ' ' + count++ + " (" + connectionCounterValue + ")"); - thread.setDaemon(true); - return thread; - } -} diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java index a0166c05f..ddbb72ef5 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2013-2014 Georg Lukas, 2017 Florian Schmaus + * Copyright 2013-2014 Georg Lukas, 2017-2018 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.WeakHashMap; import java.util.concurrent.CopyOnWriteArraySet; import org.jivesoftware.smack.AbstractConnectionListener; +import org.jivesoftware.smack.AsyncButOrdered; import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException; @@ -52,6 +53,7 @@ import org.jivesoftware.smackx.carbons.packet.CarbonExtension.Private; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; import org.jivesoftware.smackx.forward.packet.Forwarded; +import org.jxmpp.jid.BareJid; import org.jxmpp.jid.EntityFullJid; /** @@ -100,6 +102,8 @@ public final class CarbonManager extends Manager { private final StanzaListener carbonsListener; + private final AsyncButOrdered carbonsListenerAsyncButOrdered = new AsyncButOrdered<>(); + private CarbonManager(XMPPConnection connection) { super(connection); ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection); @@ -113,9 +117,16 @@ public final class CarbonManager extends Manager { final Direction direction = carbonExtension.getDirection(); final Forwarded forwarded = carbonExtension.getForwarded(); final Message carbonCopy = (Message) forwarded.getForwardedStanza(); - for (CarbonCopyReceivedListener listener : listeners) { - listener.onCarbonCopyReceived(direction, carbonCopy, wrappingMessage); - } + final BareJid from = carbonCopy.getFrom().asBareJid(); + + carbonsListenerAsyncButOrdered.performAsyncButOrdered(from, new Runnable() { + @Override + public void run() { + for (CarbonCopyReceivedListener listener : listeners) { + listener.onCarbonCopyReceived(direction, carbonCopy, wrappingMessage); + } + } + }); } }; diff --git a/smack-extensions/src/main/java/org/jivesoftware/smack/chat2/ChatManager.java b/smack-extensions/src/main/java/org/jivesoftware/smack/chat2/ChatManager.java index 61fc61384..7febfc016 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smack/chat2/ChatManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smack/chat2/ChatManager.java @@ -22,6 +22,7 @@ import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import org.jivesoftware.smack.AsyncButOrdered; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.StanzaListener; @@ -91,6 +92,8 @@ public final class ChatManager extends Manager { private final Set outgoingListeners = new CopyOnWriteArraySet<>(); + private final AsyncButOrdered asyncButOrdered = new AsyncButOrdered<>(); + private boolean xhtmlIm; private ChatManager(final XMPPConnection connection) { @@ -98,7 +101,7 @@ public final class ChatManager extends Manager { connection.addSyncStanzaListener(new StanzaListener() { @Override public void processStanza(Stanza stanza) { - Message message = (Message) stanza; + final Message message = (Message) stanza; if (!shouldAcceptMessage(message)) { return; } @@ -109,9 +112,15 @@ public final class ChatManager extends Manager { final Chat chat = chatWith(bareFrom); chat.lockedResource = fullFrom; - for (IncomingChatMessageListener listener : incomingListeners) { - listener.newIncomingMessage(bareFrom, message, chat); - } + asyncButOrdered.performAsyncButOrdered(chat, new Runnable() { + @Override + public void run() { + for (IncomingChatMessageListener listener : incomingListeners) { + listener.newIncomingMessage(bareFrom, message, chat); + } + } + }); + } }, INCOMING_MESSAGE_FILTER); diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/chatstates/ChatStateManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/chatstates/ChatStateManager.java index 2b92f0055..e341e6a37 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/chatstates/ChatStateManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/chatstates/ChatStateManager.java @@ -26,6 +26,7 @@ import java.util.WeakHashMap; import java.util.logging.Level; import java.util.logging.Logger; +import org.jivesoftware.smack.AsyncButOrdered; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.StanzaListener; @@ -85,6 +86,8 @@ public final class ChatStateManager extends Manager { */ private final Map chatStates = new WeakHashMap<>(); + private final AsyncButOrdered asyncButOrdered = new AsyncButOrdered<>(); + /** * Returns the ChatStateManager related to the XMPPConnection and it will create one if it does * not yet exist. @@ -129,15 +132,15 @@ public final class ChatStateManager extends Manager { } }); - connection.addAsyncStanzaListener(new StanzaListener() { + connection.addSyncStanzaListener(new StanzaListener() { @Override public void processStanza(Stanza stanza) { - Message message = (Message) stanza; + final Message message = (Message) stanza; EntityFullJid fullFrom = message.getFrom().asEntityFullJidIfPossible(); EntityBareJid bareFrom = fullFrom.asEntityBareJid(); - Chat chat = ChatManager.getInstanceFor(connection()).chatWith(bareFrom); + final Chat chat = ChatManager.getInstanceFor(connection()).chatWith(bareFrom); ExtensionElement extension = message.getExtension(NAMESPACE); String chatStateElementName = extension.getElementName(); @@ -149,6 +152,7 @@ public final class ChatStateManager extends Manager { LOGGER.log(Level.WARNING, "Invalid chat state element name: " + chatStateElementName, ex); return; } + final ChatState finalState = state; List listeners; synchronized (chatStateListeners) { @@ -156,9 +160,15 @@ public final class ChatStateManager extends Manager { listeners.addAll(chatStateListeners); } - for (ChatStateListener listener : listeners) { - listener.stateChanged(chat, state, message); - } + final List finalListeners = listeners; + asyncButOrdered.performAsyncButOrdered(chat, new Runnable() { + @Override + public void run() { + for (ChatStateListener listener : finalListeners) { + listener.stateChanged(chat, finalState, message); + } + } + }); } }, INCOMING_CHAT_STATE_FILTER); diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java index c51eab7b4..05cd2371b 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/muc/MultiUserChat.java @@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.logging.Level; import java.util.logging.Logger; +import org.jivesoftware.smack.AsyncButOrdered; import org.jivesoftware.smack.MessageListener; import org.jivesoftware.smack.PresenceListener; import org.jivesoftware.smack.SmackException; @@ -141,6 +142,8 @@ public class MultiUserChat { private final StanzaListener presenceListener; private final StanzaListener subjectListener; + private static final AsyncButOrdered asyncButOrdered = new AsyncButOrdered<>(); + private static final StanzaFilter DECLINE_FILTER = new AndFilter(MessageTypeFilter.NORMAL, new StanzaExtensionFilter(MUCUser.ELEMENT, MUCUser.NAMESPACE)); private final StanzaListener declinesListener; @@ -161,10 +164,16 @@ public class MultiUserChat { messageListener = new StanzaListener() { @Override public void processStanza(Stanza packet) throws NotConnectedException { - Message message = (Message) packet; - for (MessageListener listener : messageListeners) { - listener.processMessage(message); - } + final Message message = (Message) packet; + + asyncButOrdered.performAsyncButOrdered(MultiUserChat.this, new Runnable() { + @Override + public void run() { + for (MessageListener listener : messageListeners) { + listener.processMessage(message); + } + } + }); } }; @@ -172,84 +181,96 @@ public class MultiUserChat { subjectListener = new StanzaListener() { @Override public void processStanza(Stanza packet) { - Message msg = (Message) packet; - EntityFullJid from = msg.getFrom().asEntityFullJidIfPossible(); + final Message msg = (Message) packet; + final EntityFullJid from = msg.getFrom().asEntityFullJidIfPossible(); // Update the room subject subject = msg.getSubject(); - // Fire event for subject updated listeners - for (SubjectUpdatedListener listener : subjectUpdatedListeners) { - listener.subjectUpdated(subject, from); - } + + asyncButOrdered.performAsyncButOrdered(MultiUserChat.this, new Runnable() { + @Override + public void run() { + // Fire event for subject updated listeners + for (SubjectUpdatedListener listener : subjectUpdatedListeners) { + listener.subjectUpdated(msg.getSubject(), from); + } + } + }); } }; // Create a listener for all presence updates. presenceListener = new StanzaListener() { @Override - public void processStanza(Stanza packet) { - Presence presence = (Presence) packet; + public void processStanza(final Stanza packet) { + final Presence presence = (Presence) packet; final EntityFullJid from = presence.getFrom().asEntityFullJidIfPossible(); if (from == null) { LOGGER.warning("Presence not from a full JID: " + presence.getFrom()); return; } - String myRoomJID = MultiUserChat.this.room + "/" + nickname; - boolean isUserStatusModification = presence.getFrom().equals(myRoomJID); - switch (presence.getType()) { - case available: - Presence oldPresence = occupantsMap.put(from, presence); - if (oldPresence != null) { - // Get the previous occupant's affiliation & role - MUCUser mucExtension = MUCUser.from(oldPresence); - MUCAffiliation oldAffiliation = mucExtension.getItem().getAffiliation(); - MUCRole oldRole = mucExtension.getItem().getRole(); - // Get the new occupant's affiliation & role - mucExtension = MUCUser.from(packet); - MUCAffiliation newAffiliation = mucExtension.getItem().getAffiliation(); - MUCRole newRole = mucExtension.getItem().getRole(); - // Fire role modification events - checkRoleModifications(oldRole, newRole, isUserStatusModification, from); - // Fire affiliation modification events - checkAffiliationModifications( - oldAffiliation, - newAffiliation, - isUserStatusModification, - from); - } - else { - // A new occupant has joined the room - if (!isUserStatusModification) { - for (ParticipantStatusListener listener : participantStatusListeners) { - listener.joined(from); + final String myRoomJID = MultiUserChat.this.room + "/" + nickname; + final boolean isUserStatusModification = presence.getFrom().equals(myRoomJID); + + asyncButOrdered.performAsyncButOrdered(MultiUserChat.this, new Runnable() { + @Override + public void run() { + switch (presence.getType()) { + case available: + Presence oldPresence = occupantsMap.put(from, presence); + if (oldPresence != null) { + // Get the previous occupant's affiliation & role + MUCUser mucExtension = MUCUser.from(oldPresence); + MUCAffiliation oldAffiliation = mucExtension.getItem().getAffiliation(); + MUCRole oldRole = mucExtension.getItem().getRole(); + // Get the new occupant's affiliation & role + mucExtension = MUCUser.from(packet); + MUCAffiliation newAffiliation = mucExtension.getItem().getAffiliation(); + MUCRole newRole = mucExtension.getItem().getRole(); + // Fire role modification events + checkRoleModifications(oldRole, newRole, isUserStatusModification, from); + // Fire affiliation modification events + checkAffiliationModifications( + oldAffiliation, + newAffiliation, + isUserStatusModification, + from); } + else { + // A new occupant has joined the room + if (!isUserStatusModification) { + for (ParticipantStatusListener listener : participantStatusListeners) { + listener.joined(from); + } + } + } + break; + case unavailable: + occupantsMap.remove(from); + MUCUser mucUser = MUCUser.from(packet); + if (mucUser != null && mucUser.hasStatus()) { + // Fire events according to the received presence code + checkPresenceCode( + mucUser.getStatus(), + presence.getFrom().equals(myRoomJID), + mucUser, + from); + } else { + // An occupant has left the room + if (!isUserStatusModification) { + for (ParticipantStatusListener listener : participantStatusListeners) { + listener.left(from); + } + } + } + break; + default: + break; + } + for (PresenceListener listener : presenceListeners) { + listener.processPresence(presence); } } - break; - case unavailable: - occupantsMap.remove(from); - MUCUser mucUser = MUCUser.from(packet); - if (mucUser != null && mucUser.hasStatus()) { - // Fire events according to the received presence code - checkPresenceCode( - mucUser.getStatus(), - presence.getFrom().equals(myRoomJID), - mucUser, - from); - } else { - // An occupant has left the room - if (!isUserStatusModification) { - for (ParticipantStatusListener listener : participantStatusListeners) { - listener.left(from); - } - } - } - break; - default: - break; - } - for (PresenceListener listener : presenceListeners) { - listener.processPresence(presence); - } + }); } }; diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/pep/PEPManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/pep/PEPManager.java index 8462126b9..bcade8cc3 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/pep/PEPManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/pep/PEPManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2003-2007 Jive Software, 2015 Florian Schmaus + * Copyright 2003-2007 Jive Software, 2015-2018 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import org.jivesoftware.smack.AsyncButOrdered; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; @@ -86,6 +87,8 @@ public final class PEPManager extends Manager { private final Set pepListeners = new CopyOnWriteArraySet<>(); + private final AsyncButOrdered asyncButOrdered = new AsyncButOrdered<>(); + /** * Creates a new PEP exchange manager. * @@ -96,14 +99,19 @@ public final class PEPManager extends Manager { StanzaListener packetListener = new StanzaListener() { @Override public void processStanza(Stanza stanza) { - Message message = (Message) stanza; - EventElement event = EventElement.from(stanza); + final Message message = (Message) stanza; + final EventElement event = EventElement.from(stanza); assert (event != null); - EntityBareJid from = message.getFrom().asEntityBareJidIfPossible(); + final EntityBareJid from = message.getFrom().asEntityBareJidIfPossible(); assert (from != null); - for (PEPListener listener : pepListeners) { - listener.eventReceived(from, event, message); - } + asyncButOrdered.performAsyncButOrdered(from, new Runnable() { + @Override + public void run() { + for (PEPListener listener : pepListeners) { + listener.eventReceived(from, event, message); + } + } + }); } }; // TODO Add filter to check if from supports PubSub as per xep163 2 2.4 diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/pubsub/Node.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/pubsub/Node.java index 398a4b4e4..b230b89b1 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/pubsub/Node.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/pubsub/Node.java @@ -521,6 +521,7 @@ public abstract class Node { EventContentFilter deleteItem = new EventContentFilter(EventElementType.items.toString(), "retract"); EventContentFilter purge = new EventContentFilter(EventElementType.purge.toString()); + // TODO: Use AsyncButOrdered (with Node as Key?) pubSubManager.getConnection().addSyncStanzaListener(delListener, new OrFilter(deleteItem, purge)); } @@ -588,6 +589,7 @@ public abstract class Node { EventElement event = packet.getExtension("event", PubSubNamespace.EVENT.getXmlns()); ItemsExtension itemsElem = (ItemsExtension) event.getEvent(); ItemPublishEvent eventItems = new ItemPublishEvent(itemsElem.getNode(), itemsElem.getItems(), getSubscriptionIds(packet), DelayInformationManager.getDelayTimestamp(packet)); + // TODO: Use AsyncButOrdered (with Node as Key?) listener.handlePublishedItems(eventItems); } } @@ -650,6 +652,7 @@ public abstract class Node { EventElement event = packet.getExtension("event", PubSubNamespace.EVENT.getXmlns()); ConfigurationEvent config = (ConfigurationEvent) event.getEvent(); + // TODO: Use AsyncButOrdered (with Node as Key?) listener.handleNodeConfiguration(config); } } diff --git a/smack-im/src/main/java/org/jivesoftware/smack/chat/ChatManager.java b/smack-im/src/main/java/org/jivesoftware/smack/chat/ChatManager.java index d258c0f82..65373c284 100644 --- a/smack-im/src/main/java/org/jivesoftware/smack/chat/ChatManager.java +++ b/smack-im/src/main/java/org/jivesoftware/smack/chat/ChatManager.java @@ -169,6 +169,8 @@ public final class ChatManager extends Manager{ // The chat could not be created, abort here if (chat == null) return; + + // TODO: Use AsyncButOrdered (with Chat as Key?) deliverMessage(chat, message); } }, packetFilter); diff --git a/smack-im/src/main/java/org/jivesoftware/smack/roster/Roster.java b/smack-im/src/main/java/org/jivesoftware/smack/roster/Roster.java index 4693da83b..7163dcbaa 100644 --- a/smack-im/src/main/java/org/jivesoftware/smack/roster/Roster.java +++ b/smack-im/src/main/java/org/jivesoftware/smack/roster/Roster.java @@ -34,6 +34,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.jivesoftware.smack.AbstractConnectionListener; +import org.jivesoftware.smack.AsyncButOrdered; import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException; @@ -234,6 +235,8 @@ public final class Roster extends Manager { defaultSubscriptionMode = subscriptionMode; } + private final AsyncButOrdered asyncButOrdered = new AsyncButOrdered<>(); + /** * Creates a new roster. * @@ -1466,120 +1469,125 @@ public final class Roster extends Manager { if (!isLoaded() && rosterLoadedAtLogin) { LOGGER.warning("Roster not loaded while processing " + packet); } - Presence presence = (Presence) packet; - Jid from = presence.getFrom(); - Resourcepart fromResource = Resourcepart.EMPTY; - BareJid bareFrom = null; - FullJid fullFrom = null; - if (from != null) { - fromResource = from.getResourceOrNull(); - if (fromResource == null) { - fromResource = Resourcepart.EMPTY; - bareFrom = from.asBareJid(); - } - else { - fullFrom = from.asFullJidIfPossible(); - // We know that this must be a full JID in this case. - assert (fullFrom != null); - } - } + final Presence presence = (Presence) packet; + final Jid from = presence.getFrom(); - BareJid key = from != null ? from.asBareJid() : null; - Map userPresences; + final BareJid key = from != null ? from.asBareJid() : null; - // If an "available" presence, add it to the presence map. Each presence - // map will hold for a particular user a map with the presence - // packets saved for each resource. - switch (presence.getType()) { - case available: - // Get the user presence map - userPresences = getOrCreatePresencesInternal(key); - // See if an offline presence was being stored in the map. If so, remove - // it since we now have an online presence. - userPresences.remove(Resourcepart.EMPTY); - // Add the new presence, using the resources as a key. - userPresences.put(fromResource, presence); - // If the user is in the roster, fire an event. - if (contains(key)) { - fireRosterPresenceEvent(presence); - } - for (PresenceEventListener presenceEventListener : presenceEventListeners) { - presenceEventListener.presenceAvailable(fullFrom, presence); - } - break; - // If an "unavailable" packet. - case unavailable: - // If no resource, this is likely an offline presence as part of - // a roster presence flood. In that case, we store it. - userPresences = getOrCreatePresencesInternal(key); - if (from.hasNoResource()) { - // Get the user presence map - userPresences.put(Resourcepart.EMPTY, presence); - } - // Otherwise, this is a normal offline presence. - else { - // Store the offline presence, as it may include extra information - // such as the user being on vacation. - userPresences.put(fromResource, presence); - } - // If the user is in the roster, fire an event. - if (contains(key)) { - fireRosterPresenceEvent(presence); - } - - // Ensure that 'from' is a full JID before invoking the presence unavailable - // listeners. Usually unavailable presences always have a resourcepart, i.e. are - // full JIDs, but RFC 6121 § 4.5.4 has an implementation note that unavailable - // presences from a bare JID SHOULD be treated as applying to all resources. I don't - // think any client or server ever implemented that, I do think that this - // implementation note is a terrible idea since it adds another corner case in - // client code, instead of just having the invariant - // "unavailable presences are always from the full JID". - if (fullFrom != null) { - for (PresenceEventListener presenceEventListener : presenceEventListeners) { - presenceEventListener.presenceUnavailable(fullFrom, presence); + asyncButOrdered.performAsyncButOrdered(key, new Runnable() { + @Override + public void run() { + Resourcepart fromResource = Resourcepart.EMPTY; + BareJid bareFrom = null; + FullJid fullFrom = null; + if (from != null) { + fromResource = from.getResourceOrNull(); + if (fromResource == null) { + fromResource = Resourcepart.EMPTY; + bareFrom = from.asBareJid(); + } + else { + fullFrom = from.asFullJidIfPossible(); + // We know that this must be a full JID in this case. + assert (fullFrom != null); + } } - } else { - LOGGER.fine("Unavailable presence from bare JID: " + presence); - } + Map userPresences; + // If an "available" presence, add it to the presence map. Each presence + // map will hold for a particular user a map with the presence + // packets saved for each resource. + switch (presence.getType()) { + case available: + // Get the user presence map + userPresences = getOrCreatePresencesInternal(key); + // See if an offline presence was being stored in the map. If so, remove + // it since we now have an online presence. + userPresences.remove(Resourcepart.EMPTY); + // Add the new presence, using the resources as a key. + userPresences.put(fromResource, presence); + // If the user is in the roster, fire an event. + if (contains(key)) { + fireRosterPresenceEvent(presence); + } + for (PresenceEventListener presenceEventListener : presenceEventListeners) { + presenceEventListener.presenceAvailable(fullFrom, presence); + } + break; + // If an "unavailable" packet. + case unavailable: + // If no resource, this is likely an offline presence as part of + // a roster presence flood. In that case, we store it. + userPresences = getOrCreatePresencesInternal(key); + if (from.hasNoResource()) { + // Get the user presence map + userPresences.put(Resourcepart.EMPTY, presence); + } + // Otherwise, this is a normal offline presence. + else { + // Store the offline presence, as it may include extra information + // such as the user being on vacation. + userPresences.put(fromResource, presence); + } + // If the user is in the roster, fire an event. + if (contains(key)) { + fireRosterPresenceEvent(presence); + } - break; - // Error presence packets from a bare JID mean we invalidate all existing - // presence info for the user. - case error: - // No need to act on error presences send without from, i.e. - // directly send from the users XMPP service, or where the from - // address is not a bare JID - if (from == null || !from.isEntityBareJid()) { - break; - } - userPresences = getOrCreatePresencesInternal(key); - // Any other presence data is invalidated by the error packet. - userPresences.clear(); + // Ensure that 'from' is a full JID before invoking the presence unavailable + // listeners. Usually unavailable presences always have a resourcepart, i.e. are + // full JIDs, but RFC 6121 § 4.5.4 has an implementation note that unavailable + // presences from a bare JID SHOULD be treated as applying to all resources. I don't + // think any client or server ever implemented that, I do think that this + // implementation note is a terrible idea since it adds another corner case in + // client code, instead of just having the invariant + // "unavailable presences are always from the full JID". + if (fullFrom != null) { + for (PresenceEventListener presenceEventListener : presenceEventListeners) { + presenceEventListener.presenceUnavailable(fullFrom, presence); + } + } else { + LOGGER.fine("Unavailable presence from bare JID: " + presence); + } - // Set the new presence using the empty resource as a key. - userPresences.put(Resourcepart.EMPTY, presence); - // If the user is in the roster, fire an event. - if (contains(key)) { - fireRosterPresenceEvent(presence); + break; + // Error presence packets from a bare JID mean we invalidate all existing + // presence info for the user. + case error: + // No need to act on error presences send without from, i.e. + // directly send from the users XMPP service, or where the from + // address is not a bare JID + if (from == null || !from.isEntityBareJid()) { + break; + } + userPresences = getOrCreatePresencesInternal(key); + // Any other presence data is invalidated by the error packet. + userPresences.clear(); + + // Set the new presence using the empty resource as a key. + userPresences.put(Resourcepart.EMPTY, presence); + // If the user is in the roster, fire an event. + if (contains(key)) { + fireRosterPresenceEvent(presence); + } + for (PresenceEventListener presenceEventListener : presenceEventListeners) { + presenceEventListener.presenceError(from, presence); + } + break; + case subscribed: + for (PresenceEventListener presenceEventListener : presenceEventListeners) { + presenceEventListener.presenceSubscribed(bareFrom, presence); + } + break; + case unsubscribed: + for (PresenceEventListener presenceEventListener : presenceEventListeners) { + presenceEventListener.presenceUnsubscribed(bareFrom, presence); + } + break; + default: + break; + } } - for (PresenceEventListener presenceEventListener : presenceEventListeners) { - presenceEventListener.presenceError(from, presence); - } - break; - case subscribed: - for (PresenceEventListener presenceEventListener : presenceEventListeners) { - presenceEventListener.presenceSubscribed(bareFrom, presence); - } - break; - case unsubscribed: - for (PresenceEventListener presenceEventListener : presenceEventListeners) { - presenceEventListener.presenceUnsubscribed(bareFrom, presence); - } - break; - default: - break; - } + }); } }