Also add stanzas to unacknowledgedStanzas while shutting down

This caused the WaitForClosingStreamElementTest integration test to
fail, because the last presences stanzas, which are send after done()
in the writer thread would return true, are not added to the
unacknowledgedStanzas queue.

The result was:

SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466)
        at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375)
        at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165)
        at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84)
Caused by: java.lang.AssertionError: Sync poing yielded failure exception
        at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46)
        ... 8 more
Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. The stanza id of the last acked outstanding stanza is FqL1X-144
        at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847)
        at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149)
        at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176)
        at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980)
        at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996)
        at java.lang.Thread.run(Thread.java:745)
This commit is contained in:
Florian Schmaus 2017-03-18 17:41:30 +01:00
parent 3d1cf61caf
commit 3ce5973915
1 changed files with 28 additions and 20 deletions

View File

@ -1423,26 +1423,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// unacknowledgedStanzas is not null.
unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
}
// Check if the stream element should be put to the unacknowledgedStanza
// queue. Note that we can not do the put() in sendStanzaInternal() and the
// packet order is not stable at this point (sendStanzaInternal() can be
// called concurrently).
if (unacknowledgedStanzas != null && packet != null) {
// If the unacknowledgedStanza queue is nearly full, request an new ack
// from the server in order to drain it
if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
writer.write(AckRequest.INSTANCE.toXML().toString());
writer.flush();
}
try {
// It is important the we put the stanza in the unacknowledged stanza
// queue before we put it on the wire
unacknowledgedStanzas.put(packet);
}
catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
maybeAddToUnacknowledgedStanzas(packet);
CharSequence elementXml = element.toXML();
if (elementXml instanceof XmlStringBuilder) {
@ -1464,6 +1445,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
try {
while (!queue.isEmpty()) {
Element packet = queue.remove();
if (packet instanceof Stanza) {
Stanza stanza = (Stanza) packet;
maybeAddToUnacknowledgedStanzas(stanza);
}
writer.write(packet.toXML().toString());
}
writer.flush();
@ -1522,6 +1507,29 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
}
}
}
private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
// Check if the stream element should be put to the unacknowledgedStanza
// queue. Note that we can not do the put() in sendStanzaInternal() and the
// packet order is not stable at this point (sendStanzaInternal() can be
// called concurrently).
if (unacknowledgedStanzas != null && stanza != null) {
// If the unacknowledgedStanza queue is nearly full, request an new ack
// from the server in order to drain it
if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
writer.write(AckRequest.INSTANCE.toXML().toString());
writer.flush();
}
try {
// It is important the we put the stanza in the unacknowledged stanza
// queue before we put it on the wire
unacknowledgedStanzas.put(stanza);
}
catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
}
/**