From 8840236b72681f2b18998a26dc3c27c4bf3673ce Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 27 May 2015 17:36:04 +0200 Subject: [PATCH] Gracefully disconnect XMPPTCPConnection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit That is, wait with a timeout for a closing stream element if we have sent a closing stream element to the server. See RFC 6120 ยง 4.4. Fixes SMACK-633. --- .../smack/SynchronizationPoint.java | 33 ++++++++---- .../smack/inttest/IntTestUtil.java | 11 ++++ .../WaitForClosingStreamElementTest.java | 45 ++++++++++++++++ .../smack/tcp/XMPPTCPConnection.java | 54 +++++++++++++++++-- 4 files changed, 127 insertions(+), 16 deletions(-) create mode 100644 smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java index 853438778..e7deef172 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SynchronizationPoint.java @@ -65,8 +65,9 @@ public class SynchronizationPoint { * @param request the plain stream element to send. * @throws NoResponseException if no response was received. * @throws NotConnectedException if the connection is not connected. + * @return true if synchronization point was successful, false on failure. */ - public void sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, + public boolean sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, NotConnectedException, InterruptedException { assert (state == State.Initial); connectionLock.lock(); @@ -87,7 +88,7 @@ public class SynchronizationPoint { finally { connectionLock.unlock(); } - checkForResponse(); + return checkForResponse(); } /** @@ -128,20 +129,27 @@ public class SynchronizationPoint { /** * Check if this synchronization point is successful or wait the connections reply timeout. * @throws NoResponseException if there was no response marking the synchronization point as success or failed. - * @throws InterruptedException + * @throws InterruptedException + * @return true if synchronization point was successful, false on failure. */ - public void checkIfSuccessOrWait() throws NoResponseException, InterruptedException { + public boolean checkIfSuccessOrWait() throws NoResponseException, InterruptedException { connectionLock.lock(); try { - if (state == State.Success) { - // Return immediately - return; + switch (state) { + // Return immediately on success or failure + case Success: + return true; + case Failure: + return false; + default: + // Do nothing + break; } waitForConditionOrTimeout(); } finally { connectionLock.unlock(); } - checkForResponse(); + return checkForResponse(); } /** @@ -240,15 +248,18 @@ public class SynchronizationPoint { *

* @throws NoResponseException */ - private void checkForResponse() throws NoResponseException { + private boolean checkForResponse() throws NoResponseException { switch (state) { case Initial: case NoResponse: case RequestSent: throw NoResponseException.newWith(connection); + case Success: + return true; + case Failure: + return false; default: - // Do nothing - break; + throw new AssertionError("Unknown state " + state); } } diff --git a/smack-integration-test/src/main/java/org/igniterealtime/smack/inttest/IntTestUtil.java b/smack-integration-test/src/main/java/org/igniterealtime/smack/inttest/IntTestUtil.java index 185b763e9..fe31ef3a6 100644 --- a/smack-integration-test/src/main/java/org/igniterealtime/smack/inttest/IntTestUtil.java +++ b/smack-integration-test/src/main/java/org/igniterealtime/smack/inttest/IntTestUtil.java @@ -75,6 +75,17 @@ public class IntTestUtil { public static void disconnectAndMaybeDelete(XMPPTCPConnection connection, boolean delete) throws InterruptedException { + // If the connection is disconnected, then re-reconnect and login. This could happen when + // (low-level) integration tests disconnect the connection, e.g. to test disconnection + // mechanisms + if (!connection.isConnected()) { + try { + connection.connect().login(); + } + catch (XMPPException | SmackException | IOException e) { + LOGGER.log(Level.WARNING, "Exception reconnection account for deletion", e); + } + } try { if (delete) { final int maxAttempts = 3; diff --git a/smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java b/smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java new file mode 100644 index 000000000..1e2b9584f --- /dev/null +++ b/smack-integration-test/src/main/java/org/jivesoftware/smack/WaitForClosingStreamElementTest.java @@ -0,0 +1,45 @@ +/** + * + * Copyright 2015 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack; + +import static org.junit.Assert.assertTrue; + +import java.lang.reflect.Field; + +import org.igniterealtime.smack.inttest.AbstractSmackLowLevelIntegrationTest; +import org.igniterealtime.smack.inttest.Configuration; +import org.igniterealtime.smack.inttest.SmackIntegrationTest; +import org.jivesoftware.smack.tcp.XMPPTCPConnection; + +public class WaitForClosingStreamElementTest extends AbstractSmackLowLevelIntegrationTest { + + public WaitForClosingStreamElementTest(Configuration configuration, String testRunId) { + super(configuration, testRunId); + } + + @SmackIntegrationTest + public void waitForClosingStreamElementTest(XMPPTCPConnection connection) + throws NoSuchFieldException, SecurityException, IllegalArgumentException, + IllegalAccessException { + connection.disconnect(); + + Field closingStreamReceivedField = connection.getClass().getDeclaredField("closingStreamReceived"); + closingStreamReceivedField.setAccessible(true); + SynchronizationPoint closingStreamReceived = (SynchronizationPoint) closingStreamReceivedField.get(connection); + assertTrue(closingStreamReceived.wasSuccessful()); + } +} diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index a8d24c176..393be87dd 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -188,6 +188,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private final SynchronizationPoint compressSyncPoint = new SynchronizationPoint( this); + /** + * A synchronization point which is successful if this connection has received the closing + * stream element from the remote end-point, i.e. the server. + */ + private final SynchronizationPoint closingStreamReceived = new SynchronizationPoint(this); + /** * The default bundle and defer callback, used for new connections. * @see bundleAndDeferCallback @@ -458,16 +464,28 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { shutdown(true); } + /** + * true if this connection is currently disconnecting the session by performing a + * shut down. + */ + private volatile boolean shutdownInProgress; + private void shutdown(boolean instant) { if (disconnectedButResumeable) { return; } + + shutdownInProgress = true; + + // First shutdown the writer, this will result in a closing stream element getting send to + // the server + if (packetWriter != null) { + packetWriter.shutdown(instant); + } + if (packetReader != null) { packetReader.shutdown(); } - if (packetWriter != null) { - packetWriter.shutdown(instant); - } // Set socketClosed to true. This will cause the PacketReader // and PacketWriter to ignore any Exceptions that are thrown @@ -492,6 +510,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // stream tag, there is no longer a stream to resume. smSessionId = null; } + shutdownInProgress = false; authenticated = false; connected = false; usingTLS = false; @@ -789,6 +808,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { */ @Override protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { + closingStreamReceived.init(); // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if // there is an error establishing the connection connectUsingConfiguration(); @@ -1102,8 +1122,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { break; case XmlPullParser.END_TAG: if (parser.getName().equals("stream")) { - // Disconnect the connection - disconnect(); + closingStreamReceived.reportSuccess(); + if (shutdownInProgress) { + // We received a closing stream element *after* we initiated the + // termination of the session by sending a closing stream element to + // the server first + return; + } else { + // We received a closing stream element from the server without us + // sending a closing stream element first. This means that the + // server wants to terminate the session, therefore disconnect + // the connection + disconnect(); + } } break; case XmlPullParser.END_DOCUMENT: @@ -1116,6 +1147,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } catch (Exception e) { + closingStreamReceived.reportFailure(e); // The exception can be ignored if the the connection is 'done' // or if the it was caused because the socket got closed if (!(done || isSocketClosed())) { @@ -1362,6 +1394,18 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { catch (Exception e) { LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); } + + try { + // After we send the closing stream element, check if there was already a + // closing stream element sent by the server or wait with a timeout for a + // closing stream element to be received from the server. + closingStreamReceived.checkIfSuccessOrWait(); + } catch (NoResponseException e) { + LOGGER.log(Level.INFO, "No response while waiting for closing stream element from the server (" + getConnectionCounter() + ")", e); + } catch (InterruptedException e) { + LOGGER.log(Level.INFO, "Waiting for closing stream element from the server was interrupted (" + getConnectionCounter() + ")", e); + } + // Delete the queue contents (hopefully nothing is left). queue.clear(); } else if (instantShutdown && isSmEnabled()) {