Add concurrent but in-order listeners to XMPPConnection

This commit is contained in:
Florian Schmaus 2019-02-04 21:58:35 +01:00
parent fee3ed81ca
commit 4c42d0cd32
2 changed files with 71 additions and 14 deletions

View File

@ -197,6 +197,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
*/ */
private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>(); private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>();
private final Map<StanzaListener, ListenerWrapper> recvListeners = new LinkedHashMap<>();
/** /**
* List of PacketListeners that will be notified synchronously when a new stanza was received. * List of PacketListeners that will be notified synchronously when a new stanza was received.
*/ */
@ -330,6 +332,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>(); protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>();
protected final AsyncButOrdered<StanzaListener> inOrderListeners = new AsyncButOrdered<>();
/** /**
* The used host to establish the connection to * The used host to establish the connection to
*/ */
@ -940,6 +944,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
collectors.remove(collector); collectors.remove(collector);
} }
@Override
public final void addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter) {
if (stanzaListener == null) {
throw new NullPointerException("Given stanza listener must not be null");
}
ListenerWrapper wrapper = new ListenerWrapper(stanzaListener, stanzaFilter);
synchronized (recvListeners) {
recvListeners.put(stanzaListener, wrapper);
}
}
@Override
public final boolean removeStanzaListener(StanzaListener stanzaListener) {
synchronized (recvListeners) {
return recvListeners.remove(stanzaListener) != null;
}
}
@Override @Override
public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
if (packetListener == null) { if (packetListener == null) {
@ -1323,14 +1345,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
// the only difference is that asyncRecvListeners is used here and that the packet listeners are started in // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in
// their own thread. // their own thread.
final Collection<StanzaListener> listenersToNotify = new LinkedList<>(); final Collection<StanzaListener> listenersToNotify = new LinkedList<>();
synchronized (asyncRecvListeners) { extractMatchingListeners(packet, asyncRecvListeners, listenersToNotify);
for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) {
if (listenerWrapper.filterMatches(packet)) {
listenersToNotify.add(listenerWrapper.getListener());
}
}
}
for (final StanzaListener listener : listenersToNotify) { for (final StanzaListener listener : listenersToNotify) {
asyncGo(new Runnable() { asyncGo(new Runnable() {
@Override @Override
@ -1349,16 +1364,25 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
collector.processStanza(packet); collector.processStanza(packet);
} }
// Notify the receive listeners interested in the packet
listenersToNotify.clear(); listenersToNotify.clear();
synchronized (syncRecvListeners) { extractMatchingListeners(packet, recvListeners, listenersToNotify);
for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) { for (StanzaListener stanzaListener : listenersToNotify) {
if (listenerWrapper.filterMatches(packet)) { inOrderListeners.performAsyncButOrdered(stanzaListener, () -> {
listenersToNotify.add(listenerWrapper.getListener()); try {
stanzaListener.processStanza(packet);
} }
} catch (NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
}
catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
});
} }
// Notify the receive listeners interested in the packet
listenersToNotify.clear();
extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);
// Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single
// threaded executor service and therefore keeps the order. // threaded executor service and therefore keeps the order.
ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() { ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
@ -1391,6 +1415,17 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
}); });
} }
private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners,
Collection<StanzaListener> listenersToNotify) {
synchronized (listeners) {
for (ListenerWrapper listenerWrapper : listeners.values()) {
if (listenerWrapper.filterMatches(stanza)) {
listenersToNotify.add(listenerWrapper.getListener());
}
}
}
}
/** /**
* Sets whether the connection has already logged in the server. This method assures that the * Sets whether the connection has already logged in the server. This method assures that the
* {@link #wasAuthenticated} flag is never reset once it has ever been set. * {@link #wasAuthenticated} flag is never reset once it has ever been set.

View File

@ -313,6 +313,28 @@ public interface XMPPConnection {
*/ */
void removeStanzaCollector(StanzaCollector collector); void removeStanzaCollector(StanzaCollector collector);
/**
* Registers a stanza listener with this connection. The listener will be invoked when a (matching) incoming stanza
* is received. The stanza filter determines which stanzas will be delivered to the listener. It is guaranteed that
* the same listener will not be invoked concurrently and the the order of invocation will reflect the order in
* which the stanzas have been received. If the same stanza listener is added again with a different filter, only
* the new filter will be used.
*
* @param stanzaListener the stanza listener to notify of new received stanzas.
* @param stanzaFilter the stanza filter to use.
* @since 4.4
*/
void addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter);
/**
* Removes a stanza listener for received stanzas from this connection.
*
* @param stanzaListener the stanza listener to remove.
* @return true if the stanza listener was removed.
* @since 4.4
*/
boolean removeStanzaListener(StanzaListener stanzaListener);
/** /**
* Registers a <b>synchronous</b> stanza listener with this connection. A stanza listener will be invoked only when * Registers a <b>synchronous</b> stanza listener with this connection. A stanza listener will be invoked only when
* an incoming stanza is received. A stanza filter determines which stanzas will be delivered to the listener. If * an incoming stanza is received. A stanza filter determines which stanzas will be delivered to the listener. If