1
0
Fork 0
mirror of https://codeberg.org/Mercury-IM/Smack synced 2024-11-25 15:52:06 +01:00

Asynchronous invoke Stanza Acknowledged Listeners

This commit is contained in:
Florian Schmaus 2014-10-29 10:15:57 +01:00
parent ac41fda1e5
commit 6b21455b32

View file

@ -72,6 +72,7 @@ import org.jivesoftware.smack.tcp.sm.packet.StreamManagement.StreamManagementFea
import org.jivesoftware.smack.tcp.sm.predicates.Predicate; import org.jivesoftware.smack.tcp.sm.predicates.Predicate;
import org.jivesoftware.smack.tcp.sm.provider.ParseStreamManagement; import org.jivesoftware.smack.tcp.sm.provider.ParseStreamManagement;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import org.jivesoftware.smack.util.Async;
import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.TLSUtils; import org.jivesoftware.smack.util.TLSUtils;
@ -1738,7 +1739,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private void processHandledCount(long handledCount) throws NotConnectedException { private void processHandledCount(long handledCount) throws NotConnectedException {
long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
List<Packet> ackedStanzas = new ArrayList<Packet>( final List<Packet> ackedStanzas = new ArrayList<Packet>(
handledCount <= Integer.MAX_VALUE ? (int) handledCount handledCount <= Integer.MAX_VALUE ? (int) handledCount
: Integer.MAX_VALUE); : Integer.MAX_VALUE);
for (long i = 0; i < ackedStanzasCount; i++) { for (long i = 0; i < ackedStanzasCount; i++) {
@ -1748,18 +1749,42 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
assert(ackedStanza != null); assert(ackedStanza != null);
ackedStanzas.add(ackedStanza); ackedStanzas.add(ackedStanza);
} }
boolean atLeastOneStanzaIdAcknowledgedListener = false;
for (Packet ackedStanza : ackedStanzas) { for (Packet ackedStanza : ackedStanzas) {
for (PacketListener listener : stanzaAcknowledgedListeners) {
listener.processPacket(ackedStanza);
}
String id = ackedStanza.getPacketID(); String id = ackedStanza.getPacketID();
if (id != null) { if (id != null && stanzaAcknowledgedListeners.contains(id)) {
PacketListener listener = stanzaIdAcknowledgedListeners.remove(id); atLeastOneStanzaIdAcknowledgedListener = true;
if (listener != null) { break;
listener.processPacket(ackedStanza);
}
} }
} }
// Only spawn a new thread if there is a chance that some listener is invoked
if (atLeastOneStanzaIdAcknowledgedListener || !stanzaAcknowledgedListeners.isEmpty()) {
Async.go(new Runnable() {
@Override
public void run() {
try {
for (Packet ackedStanza : ackedStanzas) {
for (PacketListener listener : stanzaAcknowledgedListeners) {
listener.processPacket(ackedStanza);
}
String id = ackedStanza.getPacketID();
if (id != null) {
PacketListener listener = stanzaIdAcknowledgedListeners.remove(id);
if (listener != null) {
listener.processPacket(ackedStanza);
}
}
}
}
catch (NotConnectedException e) {
LOGGER.log(Level.FINER, "Received not connected exception, aborting", e);
}
}
}, "Stanza Acknowledged Listener Executor Thread " + handledCount + " (" + getConnectionCounter() + ')');
}
serverHandledStanzasCount = handledCount; serverHandledStanzasCount = handledCount;
} }
} }