Fix counting bug Stream Management implementation

Previously Smack would put messages in the unacknowledgedStanzas queue
after it received the 'enabled' element, when it should do so right
after sending the 'enable' stream element.

Imagine a session where '-->' denotes "received from server" and '<--'
"sent to server"

<-- enable
--> iq roster push set
--> presence some presence
<-- iq roster push result
--> enabled

then Smack would not add the iq roster push result stanza to the
unacknowledgedStanzas queue.

This fixes the issue by initializing the unacknowledgedStanzas queue
when the writer thread encounters a 'Enable' stream element.

The additional 'instanceof' invocation in the writer thread should not
be a big performance issue, since the existing "instanceof Stanza" check
should be the common case and the "instanceof Enable" is an exclusive
alternative to this case.
This commit is contained in:
Florian Schmaus 2015-03-02 15:12:19 +01:00
parent dde0cfd7f6
commit 96bb37f924
1 changed files with 13 additions and 2 deletions

View File

@ -397,10 +397,15 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// There was a previous connection with SM enabled but that was either not resumable or
// failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
// Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
// XMPP session (There maybe was an enabled in a previous XMPP session of this
// connection instance though). This is used in writePackets to decide if stanzas should
// be added to the unacknowledged stanzas queue, because they have to be added right
// after the 'enable' stream element has been sent.
unacknowledgedStanzas = null;
}
if (isSmAvailable() && useSm) {
// Remove what is maybe left from previously stream managed sessions
unacknowledgedStanzas = new ArrayBlockingQueue<Stanza>(QUEUE_SIZE);
serverHandledStanzasCount = 0;
// XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
// then this is a non recoverable error and we therefore throw an exception.
@ -1330,11 +1335,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
if (element instanceof Stanza) {
packet = (Stanza) element;
}
else if (element instanceof Enable) {
// The client needs to add messages to the unacknowledged stanzas queue
// right after it sent 'enabled'. Stanza will be added once
// unacknowledgedStanzas is not null.
unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
}
// Check if the stream element should be put to the unacknowledgedStanza
// queue. Note that we can not do the put() in sendPacketInternal() and the
// packet order is not stable at this point (sendPacketInternal() can be
// called concurrently).
if (isSmEnabled() && packet != null) {
if (unacknowledgedStanzas != null && packet != null) {
// If the unacknowledgedStanza queue is nearly full, request an new ack
// from the server in order to drain it
if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {