From 3ce597391518e4587bb10efd09f890abec917562 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sat, 18 Mar 2017 17:41:30 +0100 Subject: [PATCH] Also add stanzas to unacknowledgedStanzas while shutting down This caused the WaitForClosingStreamElementTest integration test to fail, because the last presences stanzas, which are send after done() in the writer thread would return true, are not added to the unacknowledgedStanzas queue. The result was: SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84) Caused by: java.lang.AssertionError: Sync poing yielded failure exception at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46) ... 8 more Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. The stanza id of the last acked outstanding stanza is FqL1X-144 at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847) at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996) at java.lang.Thread.run(Thread.java:745) --- .../smack/tcp/XMPPTCPConnection.java | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 2b3665886..192c4813e 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -1423,26 +1423,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // unacknowledgedStanzas is not null. unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE); } - // Check if the stream element should be put to the unacknowledgedStanza - // queue. Note that we can not do the put() in sendStanzaInternal() and the - // packet order is not stable at this point (sendStanzaInternal() can be - // called concurrently). - if (unacknowledgedStanzas != null && packet != null) { - // If the unacknowledgedStanza queue is nearly full, request an new ack - // from the server in order to drain it - if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { - writer.write(AckRequest.INSTANCE.toXML().toString()); - writer.flush(); - } - try { - // It is important the we put the stanza in the unacknowledged stanza - // queue before we put it on the wire - unacknowledgedStanzas.put(packet); - } - catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } + maybeAddToUnacknowledgedStanzas(packet); CharSequence elementXml = element.toXML(); if (elementXml instanceof XmlStringBuilder) { @@ -1464,6 +1445,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { try { while (!queue.isEmpty()) { Element packet = queue.remove(); + if (packet instanceof Stanza) { + Stanza stanza = (Stanza) packet; + maybeAddToUnacknowledgedStanzas(stanza); + } writer.write(packet.toXML().toString()); } writer.flush(); @@ -1522,6 +1507,29 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } } + + private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException { + // Check if the stream element should be put to the unacknowledgedStanza + // queue. Note that we can not do the put() in sendStanzaInternal() and the + // packet order is not stable at this point (sendStanzaInternal() can be + // called concurrently). + if (unacknowledgedStanzas != null && stanza != null) { + // If the unacknowledgedStanza queue is nearly full, request an new ack + // from the server in order to drain it + if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { + writer.write(AckRequest.INSTANCE.toXML().toString()); + writer.flush(); + } + try { + // It is important the we put the stanza in the unacknowledged stanza + // queue before we put it on the wire + unacknowledgedStanzas.put(stanza); + } + catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + } } /**