[core] Replace AbstractXMPPConnection.inOrderListeners

Using AsyncButOrdered for the receive listeners means that a listener
may not have been yet run once
invokeStanzaCollectorsAndNotifyRecvListeners() returnes. This can lead
to deadlocks as reported by Boris Grozev [1].

Fixes SMACK-927.

1: https://discourse.igniterealtime.org/t/thread-stuck-in-multiuserchat-enter-forever/92090
This commit is contained in:
Florian Schmaus 2022-09-28 13:52:58 +02:00
parent 9486fd6176
commit 92f253cc74
1 changed files with 6 additions and 3 deletions

View File

@ -33,6 +33,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -348,8 +349,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>(); protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>();
protected final AsyncButOrdered<StanzaListener> inOrderListeners = new AsyncButOrdered<>();
/** /**
* The used host to establish the connection to * The used host to establish the connection to
*/ */
@ -1613,8 +1612,9 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
listenersToNotify.clear(); listenersToNotify.clear();
extractMatchingListeners(packet, recvListeners, listenersToNotify); extractMatchingListeners(packet, recvListeners, listenersToNotify);
final Semaphore listenerSemaphore = new Semaphore(1 - listenersToNotify.size());
for (StanzaListener stanzaListener : listenersToNotify) { for (StanzaListener stanzaListener : listenersToNotify) {
inOrderListeners.performAsyncButOrdered(stanzaListener, () -> { asyncGoLimited(() -> {
try { try {
stanzaListener.processStanza(packet); stanzaListener.processStanza(packet);
} }
@ -1623,9 +1623,12 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
} }
catch (Exception e) { catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e); LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
} finally {
listenerSemaphore.release();
} }
}); });
} }
listenerSemaphore.acquireUninterruptibly();
// Notify the receive listeners interested in the packet // Notify the receive listeners interested in the packet
listenersToNotify.clear(); listenersToNotify.clear();