diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/sm/StreamManagementException.java b/smack-tcp/src/main/java/org/jivesoftware/smack/sm/StreamManagementException.java index f5e30784d..553d9d702 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/sm/StreamManagementException.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/sm/StreamManagementException.java @@ -16,10 +16,13 @@ */ package org.jivesoftware.smack.sm; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingQueue; import org.jivesoftware.smack.SmackException; +import org.jivesoftware.smack.packet.Element; import org.jivesoftware.smack.packet.Stanza; public abstract class StreamManagementException extends SmackException { @@ -110,5 +113,56 @@ public abstract class StreamManagementException extends SmackException { return ackedStanzas; } } + + public static final class UnacknowledgedQueueFullException extends StreamManagementException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private final int overflowElementNum; + private final int droppedElements; + private final List elements; + private final List unacknowledgesStanzas; + + private UnacknowledgedQueueFullException(String message, int overflowElementNum, int droppedElements, List elements, + List unacknowledgesStanzas) { + super(message); + this.overflowElementNum = overflowElementNum; + this.droppedElements = droppedElements; + this.elements = elements; + this.unacknowledgesStanzas = unacknowledgesStanzas; + } + + public int getOverflowElementNum() { + return overflowElementNum; + } + + public int getDroppedElements() { + return droppedElements; + } + + public List getElements() { + return elements; + } + + public List getUnacknowledgesStanzas() { + return unacknowledgesStanzas; + } + + public static UnacknowledgedQueueFullException newWith(int overflowElementNum, List elements, + BlockingQueue unacknowledgedStanzas) { + final int unacknowledgesStanzasQueueSize = unacknowledgedStanzas.size(); + List localUnacknowledgesStanzas = new ArrayList<>(unacknowledgesStanzasQueueSize); + localUnacknowledgesStanzas.addAll(unacknowledgedStanzas); + int droppedElements = elements.size() - overflowElementNum - 1; + + String message = "The queue size " + unacknowledgesStanzasQueueSize + " is not able to fit another " + + droppedElements + " potential stanzas type top-level stream-elements."; + return new UnacknowledgedQueueFullException(message, overflowElementNum, droppedElements, elements, + localUnacknowledgesStanzas); + } + } } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index b48e3cf4b..6ba5e64fd 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -1514,7 +1514,16 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private void drainWriterQueueToUnacknowledgedStanzas() { List elements = new ArrayList<>(queue.size()); queue.drainTo(elements); - for (Element element : elements) { + for (int i = 0; i < elements.size(); i++) { + Element element = elements.get(i); + // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844. + if (unacknowledgedStanzas.remainingCapacity() == 0) { + StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException + .newWith(i, elements, unacknowledgedStanzas); + LOGGER.log(Level.WARNING, + "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception); + return; + } if (element instanceof Stanza) { unacknowledgedStanzas.add((Stanza) element); }