Inroduce AsyncButOrdered

This commit is contained in:
Florian Schmaus 2018-04-08 21:21:50 +02:00
parent 1acfd872a7
commit 476fdf99a1
11 changed files with 414 additions and 272 deletions

View File

@ -29,13 +29,12 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -86,7 +85,6 @@ import org.jivesoftware.smack.util.DNSUtil;
import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smack.util.Objects;
import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.ParserUtils; import org.jivesoftware.smack.util.ParserUtils;
import org.jivesoftware.smack.util.SmackExecutorThreadFactory;
import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.dns.HostAddress; import org.jivesoftware.smack.util.dns.HostAddress;
@ -266,14 +264,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
} }
}); });
/** private static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>();
* A executor service used to invoke the callbacks of synchronous stanza listeners. We use a executor service to
* decouple incoming stanza processing from callback invocation. It is important that order of callback invocation
* is the same as the order of the incoming stanzas. Therefore we use a <i>single</i> threaded executor service.
*/
private final ExecutorService singleThreadedExecutorService = new ThreadPoolExecutor(0, 1, 30L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new SmackExecutorThreadFactory(this, "Single Threaded Executor"));
/** /**
* The used host to establish the connection to * The used host to establish the connection to
@ -1112,10 +1103,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e); LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e);
} }
} else { } else {
ExecutorService executorService = null; Executor executorService = null;
switch (iqRequestHandler.getMode()) { switch (iqRequestHandler.getMode()) {
case sync: case sync:
executorService = singleThreadedExecutorService; executorService = ASYNC_BUT_ORDERED.asExecutorFor(this);
break; break;
case async: case async:
executorService = CACHED_EXECUTOR_SERVICE; executorService = CACHED_EXECUTOR_SERVICE;
@ -1192,7 +1183,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
// Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single
// threaded executor service and therefore keeps the order. // threaded executor service and therefore keeps the order.
singleThreadedExecutorService.execute(new Runnable() { ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
@Override @Override
public void run() { public void run() {
for (StanzaListener listener : listenersToNotify) { for (StanzaListener listener : listenersToNotify) {
@ -1207,7 +1198,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
} }
} }
}); });
} }
/** /**
@ -1350,24 +1340,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
return this.fromMode; return this.fromMode;
} }
@Override
protected void finalize() throws Throwable {
LOGGER.fine("finalizing " + this + ": Shutting down executor services");
try {
// It's usually not a good idea to rely on finalize. But this is the easiest way to
// avoid the "Smack Listener Processor" leaking. The thread(s) of the executor have a
// reference to their ExecutorService which prevents the ExecutorService from being
// gc'ed. It is possible that the XMPPConnection instance is gc'ed while the
// listenerExecutor ExecutorService call not be gc'ed until it got shut down.
singleThreadedExecutorService.shutdownNow();
} catch (Throwable t) {
LOGGER.log(Level.WARNING, "finalize() threw throwable", t);
}
finally {
super.finalize();
}
}
protected final void parseFeatures(XmlPullParser parser) throws Exception { protected final void parseFeatures(XmlPullParser parser) throws Exception {
streamFeatures.clear(); streamFeatures.clear();
final int initialDepth = parser.getDepth(); final int initialDepth = parser.getDepth();

View File

@ -0,0 +1,142 @@
/**
*
* Copyright 2018 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
import java.util.Map;
import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
/**
* Helper class to perform an operation asynchronous but keeping the order in respect to a given key.
* <p>
* A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks
* matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use
* synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user
* provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled,
* since synchronous connection listeners are invoked from the main event loop.
* </p>
* <p>
* It is common for those situations that the order of callbacks is not globally important, but only important in
* respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which
* send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is
* called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed
* by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which
* the listeners for those two contacts are invoked does not matter.
* </p>
* <p>
* Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts
* JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that
* runnables of subsequent invocations are always executed after the runnables of previous invocations using the same
* key.
* </p>
*
* @param <K> the type of the key
* @since 4.3
*/
public class AsyncButOrdered<K> {
private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();
private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>();
/**
* Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
*
* @param key the key deriving the order
* @param runnable the {@link Runnable} to run
* @return true if a new thread was created
*/
public boolean performAsyncButOrdered(K key, Runnable runnable) {
Queue<Runnable> keyQueue;
synchronized (pendingRunnables) {
keyQueue = pendingRunnables.get(key);
if (keyQueue == null) {
keyQueue = new ConcurrentLinkedQueue<>();
pendingRunnables.put(key, keyQueue);
}
}
keyQueue.add(runnable);
boolean newHandler;
synchronized (threadActiveMap) {
Boolean threadActive = threadActiveMap.get(key);
if (threadActive == null) {
threadActive = false;
threadActiveMap.put(key, threadActive);
}
newHandler = !threadActive;
if (newHandler) {
Handler handler = new Handler(keyQueue, key);
threadActiveMap.put(key, true);
AbstractXMPPConnection.asyncGo(handler);
}
}
return newHandler;
}
public Executor asExecutorFor(final K key) {
return new Executor() {
@Override
public void execute(Runnable runnable) {
performAsyncButOrdered(key, runnable);
}
};
}
private class Handler implements Runnable {
private final Queue<Runnable> keyQueue;
private final K key;
Handler(Queue<Runnable> keyQueue, K key) {
this.keyQueue = keyQueue;
this.key = key;
}
@Override
public void run() {
mainloop:
while (true) {
Runnable runnable = null;
while ((runnable = keyQueue.poll()) != null) {
try {
runnable.run();
} catch (Throwable t) {
// The run() method threw, this handler thread is going to terminate because of that. Ensure we note
// that in the map.
synchronized (threadActiveMap) {
threadActiveMap.put(key, false);
}
throw t;
}
}
synchronized (threadActiveMap) {
// If the queue is empty, stop this handler, otherwise continue looping.
if (keyQueue.isEmpty()) {
threadActiveMap.put(key, false);
break mainloop;
}
}
}
}
}
}

View File

@ -1,44 +0,0 @@
/**
*
* Copyright 2014-2015 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.util;
import java.util.concurrent.ThreadFactory;
import org.jivesoftware.smack.XMPPConnection;
/**
* SmackExecutorThreadFactory creates daemon threads with a particular name. Note that you should
* not use anonymous inner classes for thread factories in order to prevent threads from leaking.
*/
public final class SmackExecutorThreadFactory implements ThreadFactory {
private final int connectionCounterValue;
private final String name;
private int count = 0;
public SmackExecutorThreadFactory(XMPPConnection connection, String name) {
this.connectionCounterValue = connection.getConnectionCounter();
this.name = name;
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("Smack-" + name + ' ' + count++ + " (" + connectionCounterValue + ")");
thread.setDaemon(true);
return thread;
}
}

View File

@ -1,6 +1,6 @@
/** /**
* *
* Copyright 2013-2014 Georg Lukas, 2017 Florian Schmaus * Copyright 2013-2014 Georg Lukas, 2017-2018 Florian Schmaus
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import org.jivesoftware.smack.AbstractConnectionListener; import org.jivesoftware.smack.AbstractConnectionListener;
import org.jivesoftware.smack.AsyncButOrdered;
import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.Manager;
import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException;
@ -52,6 +53,7 @@ import org.jivesoftware.smackx.carbons.packet.CarbonExtension.Private;
import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager;
import org.jivesoftware.smackx.forward.packet.Forwarded; import org.jivesoftware.smackx.forward.packet.Forwarded;
import org.jxmpp.jid.BareJid;
import org.jxmpp.jid.EntityFullJid; import org.jxmpp.jid.EntityFullJid;
/** /**
@ -100,6 +102,8 @@ public final class CarbonManager extends Manager {
private final StanzaListener carbonsListener; private final StanzaListener carbonsListener;
private final AsyncButOrdered<BareJid> carbonsListenerAsyncButOrdered = new AsyncButOrdered<>();
private CarbonManager(XMPPConnection connection) { private CarbonManager(XMPPConnection connection) {
super(connection); super(connection);
ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection); ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);
@ -113,9 +117,16 @@ public final class CarbonManager extends Manager {
final Direction direction = carbonExtension.getDirection(); final Direction direction = carbonExtension.getDirection();
final Forwarded forwarded = carbonExtension.getForwarded(); final Forwarded forwarded = carbonExtension.getForwarded();
final Message carbonCopy = (Message) forwarded.getForwardedStanza(); final Message carbonCopy = (Message) forwarded.getForwardedStanza();
for (CarbonCopyReceivedListener listener : listeners) { final BareJid from = carbonCopy.getFrom().asBareJid();
listener.onCarbonCopyReceived(direction, carbonCopy, wrappingMessage);
} carbonsListenerAsyncButOrdered.performAsyncButOrdered(from, new Runnable() {
@Override
public void run() {
for (CarbonCopyReceivedListener listener : listeners) {
listener.onCarbonCopyReceived(direction, carbonCopy, wrappingMessage);
}
}
});
} }
}; };

View File

@ -22,6 +22,7 @@ import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import org.jivesoftware.smack.AsyncButOrdered;
import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.Manager;
import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.StanzaListener;
@ -91,6 +92,8 @@ public final class ChatManager extends Manager {
private final Set<OutgoingChatMessageListener> outgoingListeners = new CopyOnWriteArraySet<>(); private final Set<OutgoingChatMessageListener> outgoingListeners = new CopyOnWriteArraySet<>();
private final AsyncButOrdered<Chat> asyncButOrdered = new AsyncButOrdered<>();
private boolean xhtmlIm; private boolean xhtmlIm;
private ChatManager(final XMPPConnection connection) { private ChatManager(final XMPPConnection connection) {
@ -98,7 +101,7 @@ public final class ChatManager extends Manager {
connection.addSyncStanzaListener(new StanzaListener() { connection.addSyncStanzaListener(new StanzaListener() {
@Override @Override
public void processStanza(Stanza stanza) { public void processStanza(Stanza stanza) {
Message message = (Message) stanza; final Message message = (Message) stanza;
if (!shouldAcceptMessage(message)) { if (!shouldAcceptMessage(message)) {
return; return;
} }
@ -109,9 +112,15 @@ public final class ChatManager extends Manager {
final Chat chat = chatWith(bareFrom); final Chat chat = chatWith(bareFrom);
chat.lockedResource = fullFrom; chat.lockedResource = fullFrom;
for (IncomingChatMessageListener listener : incomingListeners) { asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
listener.newIncomingMessage(bareFrom, message, chat); @Override
} public void run() {
for (IncomingChatMessageListener listener : incomingListeners) {
listener.newIncomingMessage(bareFrom, message, chat);
}
}
});
} }
}, INCOMING_MESSAGE_FILTER); }, INCOMING_MESSAGE_FILTER);

View File

@ -26,6 +26,7 @@ import java.util.WeakHashMap;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.jivesoftware.smack.AsyncButOrdered;
import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.Manager;
import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.StanzaListener;
@ -85,6 +86,8 @@ public final class ChatStateManager extends Manager {
*/ */
private final Map<Chat, ChatState> chatStates = new WeakHashMap<>(); private final Map<Chat, ChatState> chatStates = new WeakHashMap<>();
private final AsyncButOrdered<Chat> asyncButOrdered = new AsyncButOrdered<>();
/** /**
* Returns the ChatStateManager related to the XMPPConnection and it will create one if it does * Returns the ChatStateManager related to the XMPPConnection and it will create one if it does
* not yet exist. * not yet exist.
@ -129,15 +132,15 @@ public final class ChatStateManager extends Manager {
} }
}); });
connection.addAsyncStanzaListener(new StanzaListener() { connection.addSyncStanzaListener(new StanzaListener() {
@Override @Override
public void processStanza(Stanza stanza) { public void processStanza(Stanza stanza) {
Message message = (Message) stanza; final Message message = (Message) stanza;
EntityFullJid fullFrom = message.getFrom().asEntityFullJidIfPossible(); EntityFullJid fullFrom = message.getFrom().asEntityFullJidIfPossible();
EntityBareJid bareFrom = fullFrom.asEntityBareJid(); EntityBareJid bareFrom = fullFrom.asEntityBareJid();
Chat chat = ChatManager.getInstanceFor(connection()).chatWith(bareFrom); final Chat chat = ChatManager.getInstanceFor(connection()).chatWith(bareFrom);
ExtensionElement extension = message.getExtension(NAMESPACE); ExtensionElement extension = message.getExtension(NAMESPACE);
String chatStateElementName = extension.getElementName(); String chatStateElementName = extension.getElementName();
@ -149,6 +152,7 @@ public final class ChatStateManager extends Manager {
LOGGER.log(Level.WARNING, "Invalid chat state element name: " + chatStateElementName, ex); LOGGER.log(Level.WARNING, "Invalid chat state element name: " + chatStateElementName, ex);
return; return;
} }
final ChatState finalState = state;
List<ChatStateListener> listeners; List<ChatStateListener> listeners;
synchronized (chatStateListeners) { synchronized (chatStateListeners) {
@ -156,9 +160,15 @@ public final class ChatStateManager extends Manager {
listeners.addAll(chatStateListeners); listeners.addAll(chatStateListeners);
} }
for (ChatStateListener listener : listeners) { final List<ChatStateListener> finalListeners = listeners;
listener.stateChanged(chat, state, message); asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
} @Override
public void run() {
for (ChatStateListener listener : finalListeners) {
listener.stateChanged(chat, finalState, message);
}
}
});
} }
}, INCOMING_CHAT_STATE_FILTER); }, INCOMING_CHAT_STATE_FILTER);

View File

@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.jivesoftware.smack.AsyncButOrdered;
import org.jivesoftware.smack.MessageListener; import org.jivesoftware.smack.MessageListener;
import org.jivesoftware.smack.PresenceListener; import org.jivesoftware.smack.PresenceListener;
import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException;
@ -141,6 +142,8 @@ public class MultiUserChat {
private final StanzaListener presenceListener; private final StanzaListener presenceListener;
private final StanzaListener subjectListener; private final StanzaListener subjectListener;
private static final AsyncButOrdered<MultiUserChat> asyncButOrdered = new AsyncButOrdered<>();
private static final StanzaFilter DECLINE_FILTER = new AndFilter(MessageTypeFilter.NORMAL, private static final StanzaFilter DECLINE_FILTER = new AndFilter(MessageTypeFilter.NORMAL,
new StanzaExtensionFilter(MUCUser.ELEMENT, MUCUser.NAMESPACE)); new StanzaExtensionFilter(MUCUser.ELEMENT, MUCUser.NAMESPACE));
private final StanzaListener declinesListener; private final StanzaListener declinesListener;
@ -161,10 +164,16 @@ public class MultiUserChat {
messageListener = new StanzaListener() { messageListener = new StanzaListener() {
@Override @Override
public void processStanza(Stanza packet) throws NotConnectedException { public void processStanza(Stanza packet) throws NotConnectedException {
Message message = (Message) packet; final Message message = (Message) packet;
for (MessageListener listener : messageListeners) {
listener.processMessage(message); asyncButOrdered.performAsyncButOrdered(MultiUserChat.this, new Runnable() {
} @Override
public void run() {
for (MessageListener listener : messageListeners) {
listener.processMessage(message);
}
}
});
} }
}; };
@ -172,84 +181,96 @@ public class MultiUserChat {
subjectListener = new StanzaListener() { subjectListener = new StanzaListener() {
@Override @Override
public void processStanza(Stanza packet) { public void processStanza(Stanza packet) {
Message msg = (Message) packet; final Message msg = (Message) packet;
EntityFullJid from = msg.getFrom().asEntityFullJidIfPossible(); final EntityFullJid from = msg.getFrom().asEntityFullJidIfPossible();
// Update the room subject // Update the room subject
subject = msg.getSubject(); subject = msg.getSubject();
// Fire event for subject updated listeners
for (SubjectUpdatedListener listener : subjectUpdatedListeners) { asyncButOrdered.performAsyncButOrdered(MultiUserChat.this, new Runnable() {
listener.subjectUpdated(subject, from); @Override
} public void run() {
// Fire event for subject updated listeners
for (SubjectUpdatedListener listener : subjectUpdatedListeners) {
listener.subjectUpdated(msg.getSubject(), from);
}
}
});
} }
}; };
// Create a listener for all presence updates. // Create a listener for all presence updates.
presenceListener = new StanzaListener() { presenceListener = new StanzaListener() {
@Override @Override
public void processStanza(Stanza packet) { public void processStanza(final Stanza packet) {
Presence presence = (Presence) packet; final Presence presence = (Presence) packet;
final EntityFullJid from = presence.getFrom().asEntityFullJidIfPossible(); final EntityFullJid from = presence.getFrom().asEntityFullJidIfPossible();
if (from == null) { if (from == null) {
LOGGER.warning("Presence not from a full JID: " + presence.getFrom()); LOGGER.warning("Presence not from a full JID: " + presence.getFrom());
return; return;
} }
String myRoomJID = MultiUserChat.this.room + "/" + nickname; final String myRoomJID = MultiUserChat.this.room + "/" + nickname;
boolean isUserStatusModification = presence.getFrom().equals(myRoomJID); final boolean isUserStatusModification = presence.getFrom().equals(myRoomJID);
switch (presence.getType()) {
case available: asyncButOrdered.performAsyncButOrdered(MultiUserChat.this, new Runnable() {
Presence oldPresence = occupantsMap.put(from, presence); @Override
if (oldPresence != null) { public void run() {
// Get the previous occupant's affiliation & role switch (presence.getType()) {
MUCUser mucExtension = MUCUser.from(oldPresence); case available:
MUCAffiliation oldAffiliation = mucExtension.getItem().getAffiliation(); Presence oldPresence = occupantsMap.put(from, presence);
MUCRole oldRole = mucExtension.getItem().getRole(); if (oldPresence != null) {
// Get the new occupant's affiliation & role // Get the previous occupant's affiliation & role
mucExtension = MUCUser.from(packet); MUCUser mucExtension = MUCUser.from(oldPresence);
MUCAffiliation newAffiliation = mucExtension.getItem().getAffiliation(); MUCAffiliation oldAffiliation = mucExtension.getItem().getAffiliation();
MUCRole newRole = mucExtension.getItem().getRole(); MUCRole oldRole = mucExtension.getItem().getRole();
// Fire role modification events // Get the new occupant's affiliation & role
checkRoleModifications(oldRole, newRole, isUserStatusModification, from); mucExtension = MUCUser.from(packet);
// Fire affiliation modification events MUCAffiliation newAffiliation = mucExtension.getItem().getAffiliation();
checkAffiliationModifications( MUCRole newRole = mucExtension.getItem().getRole();
oldAffiliation, // Fire role modification events
newAffiliation, checkRoleModifications(oldRole, newRole, isUserStatusModification, from);
isUserStatusModification, // Fire affiliation modification events
from); checkAffiliationModifications(
} oldAffiliation,
else { newAffiliation,
// A new occupant has joined the room isUserStatusModification,
if (!isUserStatusModification) { from);
for (ParticipantStatusListener listener : participantStatusListeners) {
listener.joined(from);
} }
else {
// A new occupant has joined the room
if (!isUserStatusModification) {
for (ParticipantStatusListener listener : participantStatusListeners) {
listener.joined(from);
}
}
}
break;
case unavailable:
occupantsMap.remove(from);
MUCUser mucUser = MUCUser.from(packet);
if (mucUser != null && mucUser.hasStatus()) {
// Fire events according to the received presence code
checkPresenceCode(
mucUser.getStatus(),
presence.getFrom().equals(myRoomJID),
mucUser,
from);
} else {
// An occupant has left the room
if (!isUserStatusModification) {
for (ParticipantStatusListener listener : participantStatusListeners) {
listener.left(from);
}
}
}
break;
default:
break;
}
for (PresenceListener listener : presenceListeners) {
listener.processPresence(presence);
} }
} }
break; });
case unavailable:
occupantsMap.remove(from);
MUCUser mucUser = MUCUser.from(packet);
if (mucUser != null && mucUser.hasStatus()) {
// Fire events according to the received presence code
checkPresenceCode(
mucUser.getStatus(),
presence.getFrom().equals(myRoomJID),
mucUser,
from);
} else {
// An occupant has left the room
if (!isUserStatusModification) {
for (ParticipantStatusListener listener : participantStatusListeners) {
listener.left(from);
}
}
}
break;
default:
break;
}
for (PresenceListener listener : presenceListeners) {
listener.processPresence(presence);
}
} }
}; };

View File

@ -1,6 +1,6 @@
/** /**
* *
* Copyright 2003-2007 Jive Software, 2015 Florian Schmaus * Copyright 2003-2007 Jive Software, 2015-2018 Florian Schmaus
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import java.util.Set;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import org.jivesoftware.smack.AsyncButOrdered;
import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.Manager;
import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotConnectedException;
@ -86,6 +87,8 @@ public final class PEPManager extends Manager {
private final Set<PEPListener> pepListeners = new CopyOnWriteArraySet<>(); private final Set<PEPListener> pepListeners = new CopyOnWriteArraySet<>();
private final AsyncButOrdered<EntityBareJid> asyncButOrdered = new AsyncButOrdered<>();
/** /**
* Creates a new PEP exchange manager. * Creates a new PEP exchange manager.
* *
@ -96,14 +99,19 @@ public final class PEPManager extends Manager {
StanzaListener packetListener = new StanzaListener() { StanzaListener packetListener = new StanzaListener() {
@Override @Override
public void processStanza(Stanza stanza) { public void processStanza(Stanza stanza) {
Message message = (Message) stanza; final Message message = (Message) stanza;
EventElement event = EventElement.from(stanza); final EventElement event = EventElement.from(stanza);
assert (event != null); assert (event != null);
EntityBareJid from = message.getFrom().asEntityBareJidIfPossible(); final EntityBareJid from = message.getFrom().asEntityBareJidIfPossible();
assert (from != null); assert (from != null);
for (PEPListener listener : pepListeners) { asyncButOrdered.performAsyncButOrdered(from, new Runnable() {
listener.eventReceived(from, event, message); @Override
} public void run() {
for (PEPListener listener : pepListeners) {
listener.eventReceived(from, event, message);
}
}
});
} }
}; };
// TODO Add filter to check if from supports PubSub as per xep163 2 2.4 // TODO Add filter to check if from supports PubSub as per xep163 2 2.4

View File

@ -521,6 +521,7 @@ public abstract class Node {
EventContentFilter deleteItem = new EventContentFilter(EventElementType.items.toString(), "retract"); EventContentFilter deleteItem = new EventContentFilter(EventElementType.items.toString(), "retract");
EventContentFilter purge = new EventContentFilter(EventElementType.purge.toString()); EventContentFilter purge = new EventContentFilter(EventElementType.purge.toString());
// TODO: Use AsyncButOrdered (with Node as Key?)
pubSubManager.getConnection().addSyncStanzaListener(delListener, new OrFilter(deleteItem, purge)); pubSubManager.getConnection().addSyncStanzaListener(delListener, new OrFilter(deleteItem, purge));
} }
@ -588,6 +589,7 @@ public abstract class Node {
EventElement event = packet.getExtension("event", PubSubNamespace.EVENT.getXmlns()); EventElement event = packet.getExtension("event", PubSubNamespace.EVENT.getXmlns());
ItemsExtension itemsElem = (ItemsExtension) event.getEvent(); ItemsExtension itemsElem = (ItemsExtension) event.getEvent();
ItemPublishEvent eventItems = new ItemPublishEvent(itemsElem.getNode(), itemsElem.getItems(), getSubscriptionIds(packet), DelayInformationManager.getDelayTimestamp(packet)); ItemPublishEvent eventItems = new ItemPublishEvent(itemsElem.getNode(), itemsElem.getItems(), getSubscriptionIds(packet), DelayInformationManager.getDelayTimestamp(packet));
// TODO: Use AsyncButOrdered (with Node as Key?)
listener.handlePublishedItems(eventItems); listener.handlePublishedItems(eventItems);
} }
} }
@ -650,6 +652,7 @@ public abstract class Node {
EventElement event = packet.getExtension("event", PubSubNamespace.EVENT.getXmlns()); EventElement event = packet.getExtension("event", PubSubNamespace.EVENT.getXmlns());
ConfigurationEvent config = (ConfigurationEvent) event.getEvent(); ConfigurationEvent config = (ConfigurationEvent) event.getEvent();
// TODO: Use AsyncButOrdered (with Node as Key?)
listener.handleNodeConfiguration(config); listener.handleNodeConfiguration(config);
} }
} }

View File

@ -169,6 +169,8 @@ public final class ChatManager extends Manager{
// The chat could not be created, abort here // The chat could not be created, abort here
if (chat == null) if (chat == null)
return; return;
// TODO: Use AsyncButOrdered (with Chat as Key?)
deliverMessage(chat, message); deliverMessage(chat, message);
} }
}, packetFilter); }, packetFilter);

View File

@ -34,6 +34,7 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.jivesoftware.smack.AbstractConnectionListener; import org.jivesoftware.smack.AbstractConnectionListener;
import org.jivesoftware.smack.AsyncButOrdered;
import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.Manager;
import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException;
@ -234,6 +235,8 @@ public final class Roster extends Manager {
defaultSubscriptionMode = subscriptionMode; defaultSubscriptionMode = subscriptionMode;
} }
private final AsyncButOrdered<BareJid> asyncButOrdered = new AsyncButOrdered<>();
/** /**
* Creates a new roster. * Creates a new roster.
* *
@ -1466,120 +1469,125 @@ public final class Roster extends Manager {
if (!isLoaded() && rosterLoadedAtLogin) { if (!isLoaded() && rosterLoadedAtLogin) {
LOGGER.warning("Roster not loaded while processing " + packet); LOGGER.warning("Roster not loaded while processing " + packet);
} }
Presence presence = (Presence) packet; final Presence presence = (Presence) packet;
Jid from = presence.getFrom(); final Jid from = presence.getFrom();
Resourcepart fromResource = Resourcepart.EMPTY;
BareJid bareFrom = null;
FullJid fullFrom = null;
if (from != null) {
fromResource = from.getResourceOrNull();
if (fromResource == null) {
fromResource = Resourcepart.EMPTY;
bareFrom = from.asBareJid();
}
else {
fullFrom = from.asFullJidIfPossible();
// We know that this must be a full JID in this case.
assert (fullFrom != null);
}
}
BareJid key = from != null ? from.asBareJid() : null; final BareJid key = from != null ? from.asBareJid() : null;
Map<Resourcepart, Presence> userPresences;
// If an "available" presence, add it to the presence map. Each presence asyncButOrdered.performAsyncButOrdered(key, new Runnable() {
// map will hold for a particular user a map with the presence @Override
// packets saved for each resource. public void run() {
switch (presence.getType()) { Resourcepart fromResource = Resourcepart.EMPTY;
case available: BareJid bareFrom = null;
// Get the user presence map FullJid fullFrom = null;
userPresences = getOrCreatePresencesInternal(key); if (from != null) {
// See if an offline presence was being stored in the map. If so, remove fromResource = from.getResourceOrNull();
// it since we now have an online presence. if (fromResource == null) {
userPresences.remove(Resourcepart.EMPTY); fromResource = Resourcepart.EMPTY;
// Add the new presence, using the resources as a key. bareFrom = from.asBareJid();
userPresences.put(fromResource, presence); }
// If the user is in the roster, fire an event. else {
if (contains(key)) { fullFrom = from.asFullJidIfPossible();
fireRosterPresenceEvent(presence); // We know that this must be a full JID in this case.
} assert (fullFrom != null);
for (PresenceEventListener presenceEventListener : presenceEventListeners) { }
presenceEventListener.presenceAvailable(fullFrom, presence);
}
break;
// If an "unavailable" packet.
case unavailable:
// If no resource, this is likely an offline presence as part of
// a roster presence flood. In that case, we store it.
userPresences = getOrCreatePresencesInternal(key);
if (from.hasNoResource()) {
// Get the user presence map
userPresences.put(Resourcepart.EMPTY, presence);
}
// Otherwise, this is a normal offline presence.
else {
// Store the offline presence, as it may include extra information
// such as the user being on vacation.
userPresences.put(fromResource, presence);
}
// If the user is in the roster, fire an event.
if (contains(key)) {
fireRosterPresenceEvent(presence);
}
// Ensure that 'from' is a full JID before invoking the presence unavailable
// listeners. Usually unavailable presences always have a resourcepart, i.e. are
// full JIDs, but RFC 6121 § 4.5.4 has an implementation note that unavailable
// presences from a bare JID SHOULD be treated as applying to all resources. I don't
// think any client or server ever implemented that, I do think that this
// implementation note is a terrible idea since it adds another corner case in
// client code, instead of just having the invariant
// "unavailable presences are always from the full JID".
if (fullFrom != null) {
for (PresenceEventListener presenceEventListener : presenceEventListeners) {
presenceEventListener.presenceUnavailable(fullFrom, presence);
} }
} else { Map<Resourcepart, Presence> userPresences;
LOGGER.fine("Unavailable presence from bare JID: " + presence); // If an "available" presence, add it to the presence map. Each presence
} // map will hold for a particular user a map with the presence
// packets saved for each resource.
switch (presence.getType()) {
case available:
// Get the user presence map
userPresences = getOrCreatePresencesInternal(key);
// See if an offline presence was being stored in the map. If so, remove
// it since we now have an online presence.
userPresences.remove(Resourcepart.EMPTY);
// Add the new presence, using the resources as a key.
userPresences.put(fromResource, presence);
// If the user is in the roster, fire an event.
if (contains(key)) {
fireRosterPresenceEvent(presence);
}
for (PresenceEventListener presenceEventListener : presenceEventListeners) {
presenceEventListener.presenceAvailable(fullFrom, presence);
}
break;
// If an "unavailable" packet.
case unavailable:
// If no resource, this is likely an offline presence as part of
// a roster presence flood. In that case, we store it.
userPresences = getOrCreatePresencesInternal(key);
if (from.hasNoResource()) {
// Get the user presence map
userPresences.put(Resourcepart.EMPTY, presence);
}
// Otherwise, this is a normal offline presence.
else {
// Store the offline presence, as it may include extra information
// such as the user being on vacation.
userPresences.put(fromResource, presence);
}
// If the user is in the roster, fire an event.
if (contains(key)) {
fireRosterPresenceEvent(presence);
}
break; // Ensure that 'from' is a full JID before invoking the presence unavailable
// Error presence packets from a bare JID mean we invalidate all existing // listeners. Usually unavailable presences always have a resourcepart, i.e. are
// presence info for the user. // full JIDs, but RFC 6121 § 4.5.4 has an implementation note that unavailable
case error: // presences from a bare JID SHOULD be treated as applying to all resources. I don't
// No need to act on error presences send without from, i.e. // think any client or server ever implemented that, I do think that this
// directly send from the users XMPP service, or where the from // implementation note is a terrible idea since it adds another corner case in
// address is not a bare JID // client code, instead of just having the invariant
if (from == null || !from.isEntityBareJid()) { // "unavailable presences are always from the full JID".
break; if (fullFrom != null) {
} for (PresenceEventListener presenceEventListener : presenceEventListeners) {
userPresences = getOrCreatePresencesInternal(key); presenceEventListener.presenceUnavailable(fullFrom, presence);
// Any other presence data is invalidated by the error packet. }
userPresences.clear(); } else {
LOGGER.fine("Unavailable presence from bare JID: " + presence);
}
// Set the new presence using the empty resource as a key. break;
userPresences.put(Resourcepart.EMPTY, presence); // Error presence packets from a bare JID mean we invalidate all existing
// If the user is in the roster, fire an event. // presence info for the user.
if (contains(key)) { case error:
fireRosterPresenceEvent(presence); // No need to act on error presences send without from, i.e.
// directly send from the users XMPP service, or where the from
// address is not a bare JID
if (from == null || !from.isEntityBareJid()) {
break;
}
userPresences = getOrCreatePresencesInternal(key);
// Any other presence data is invalidated by the error packet.
userPresences.clear();
// Set the new presence using the empty resource as a key.
userPresences.put(Resourcepart.EMPTY, presence);
// If the user is in the roster, fire an event.
if (contains(key)) {
fireRosterPresenceEvent(presence);
}
for (PresenceEventListener presenceEventListener : presenceEventListeners) {
presenceEventListener.presenceError(from, presence);
}
break;
case subscribed:
for (PresenceEventListener presenceEventListener : presenceEventListeners) {
presenceEventListener.presenceSubscribed(bareFrom, presence);
}
break;
case unsubscribed:
for (PresenceEventListener presenceEventListener : presenceEventListeners) {
presenceEventListener.presenceUnsubscribed(bareFrom, presence);
}
break;
default:
break;
}
} }
for (PresenceEventListener presenceEventListener : presenceEventListeners) { });
presenceEventListener.presenceError(from, presence);
}
break;
case subscribed:
for (PresenceEventListener presenceEventListener : presenceEventListeners) {
presenceEventListener.presenceSubscribed(bareFrom, presence);
}
break;
case unsubscribed:
for (PresenceEventListener presenceEventListener : presenceEventListeners) {
presenceEventListener.presenceUnsubscribed(bareFrom, presence);
}
break;
default:
break;
}
} }
} }