Gracefully disconnect XMPPTCPConnection

That is, wait with a timeout for a closing stream element if we have
sent a closing stream element to the server. See RFC 6120 § 4.4.

Fixes SMACK-633.
This commit is contained in:
Florian Schmaus 2015-05-27 17:36:04 +02:00
parent 01f08e87c2
commit 8840236b72
4 changed files with 127 additions and 16 deletions

View File

@ -65,8 +65,9 @@ public class SynchronizationPoint<E extends Exception> {
* @param request the plain stream element to send. * @param request the plain stream element to send.
* @throws NoResponseException if no response was received. * @throws NoResponseException if no response was received.
* @throws NotConnectedException if the connection is not connected. * @throws NotConnectedException if the connection is not connected.
* @return <code>true</code> if synchronization point was successful, <code>false</code> on failure.
*/ */
public void sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, public boolean sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException,
NotConnectedException, InterruptedException { NotConnectedException, InterruptedException {
assert (state == State.Initial); assert (state == State.Initial);
connectionLock.lock(); connectionLock.lock();
@ -87,7 +88,7 @@ public class SynchronizationPoint<E extends Exception> {
finally { finally {
connectionLock.unlock(); connectionLock.unlock();
} }
checkForResponse(); return checkForResponse();
} }
/** /**
@ -128,20 +129,27 @@ public class SynchronizationPoint<E extends Exception> {
/** /**
* Check if this synchronization point is successful or wait the connections reply timeout. * Check if this synchronization point is successful or wait the connections reply timeout.
* @throws NoResponseException if there was no response marking the synchronization point as success or failed. * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
* @throws InterruptedException * @throws InterruptedException
* @return <code>true</code> if synchronization point was successful, <code>false</code> on failure.
*/ */
public void checkIfSuccessOrWait() throws NoResponseException, InterruptedException { public boolean checkIfSuccessOrWait() throws NoResponseException, InterruptedException {
connectionLock.lock(); connectionLock.lock();
try { try {
if (state == State.Success) { switch (state) {
// Return immediately // Return immediately on success or failure
return; case Success:
return true;
case Failure:
return false;
default:
// Do nothing
break;
} }
waitForConditionOrTimeout(); waitForConditionOrTimeout();
} finally { } finally {
connectionLock.unlock(); connectionLock.unlock();
} }
checkForResponse(); return checkForResponse();
} }
/** /**
@ -240,15 +248,18 @@ public class SynchronizationPoint<E extends Exception> {
* </p> * </p>
* @throws NoResponseException * @throws NoResponseException
*/ */
private void checkForResponse() throws NoResponseException { private boolean checkForResponse() throws NoResponseException {
switch (state) { switch (state) {
case Initial: case Initial:
case NoResponse: case NoResponse:
case RequestSent: case RequestSent:
throw NoResponseException.newWith(connection); throw NoResponseException.newWith(connection);
case Success:
return true;
case Failure:
return false;
default: default:
// Do nothing throw new AssertionError("Unknown state " + state);
break;
} }
} }

View File

@ -75,6 +75,17 @@ public class IntTestUtil {
public static void disconnectAndMaybeDelete(XMPPTCPConnection connection, boolean delete) public static void disconnectAndMaybeDelete(XMPPTCPConnection connection, boolean delete)
throws InterruptedException { throws InterruptedException {
// If the connection is disconnected, then re-reconnect and login. This could happen when
// (low-level) integration tests disconnect the connection, e.g. to test disconnection
// mechanisms
if (!connection.isConnected()) {
try {
connection.connect().login();
}
catch (XMPPException | SmackException | IOException e) {
LOGGER.log(Level.WARNING, "Exception reconnection account for deletion", e);
}
}
try { try {
if (delete) { if (delete) {
final int maxAttempts = 3; final int maxAttempts = 3;

View File

@ -0,0 +1,45 @@
/**
*
* Copyright 2015 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Field;
import org.igniterealtime.smack.inttest.AbstractSmackLowLevelIntegrationTest;
import org.igniterealtime.smack.inttest.Configuration;
import org.igniterealtime.smack.inttest.SmackIntegrationTest;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
public class WaitForClosingStreamElementTest extends AbstractSmackLowLevelIntegrationTest {
public WaitForClosingStreamElementTest(Configuration configuration, String testRunId) {
super(configuration, testRunId);
}
@SmackIntegrationTest
public void waitForClosingStreamElementTest(XMPPTCPConnection connection)
throws NoSuchFieldException, SecurityException, IllegalArgumentException,
IllegalAccessException {
connection.disconnect();
Field closingStreamReceivedField = connection.getClass().getDeclaredField("closingStreamReceived");
closingStreamReceivedField.setAccessible(true);
SynchronizationPoint<?> closingStreamReceived = (SynchronizationPoint<?>) closingStreamReceivedField.get(connection);
assertTrue(closingStreamReceived.wasSuccessful());
}
}

View File

@ -188,6 +188,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>( private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>(
this); this);
/**
* A synchronization point which is successful if this connection has received the closing
* stream element from the remote end-point, i.e. the server.
*/
private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<Exception>(this);
/** /**
* The default bundle and defer callback, used for new connections. * The default bundle and defer callback, used for new connections.
* @see bundleAndDeferCallback * @see bundleAndDeferCallback
@ -458,16 +464,28 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
shutdown(true); shutdown(true);
} }
/**
* <code>true</code> if this connection is currently disconnecting the session by performing a
* shut down.
*/
private volatile boolean shutdownInProgress;
private void shutdown(boolean instant) { private void shutdown(boolean instant) {
if (disconnectedButResumeable) { if (disconnectedButResumeable) {
return; return;
} }
shutdownInProgress = true;
// First shutdown the writer, this will result in a closing stream element getting send to
// the server
if (packetWriter != null) {
packetWriter.shutdown(instant);
}
if (packetReader != null) { if (packetReader != null) {
packetReader.shutdown(); packetReader.shutdown();
} }
if (packetWriter != null) {
packetWriter.shutdown(instant);
}
// Set socketClosed to true. This will cause the PacketReader // Set socketClosed to true. This will cause the PacketReader
// and PacketWriter to ignore any Exceptions that are thrown // and PacketWriter to ignore any Exceptions that are thrown
@ -492,6 +510,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// stream tag, there is no longer a stream to resume. // stream tag, there is no longer a stream to resume.
smSessionId = null; smSessionId = null;
} }
shutdownInProgress = false;
authenticated = false; authenticated = false;
connected = false; connected = false;
usingTLS = false; usingTLS = false;
@ -789,6 +808,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*/ */
@Override @Override
protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
closingStreamReceived.init();
// Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
// there is an error establishing the connection // there is an error establishing the connection
connectUsingConfiguration(); connectUsingConfiguration();
@ -1102,8 +1122,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
break; break;
case XmlPullParser.END_TAG: case XmlPullParser.END_TAG:
if (parser.getName().equals("stream")) { if (parser.getName().equals("stream")) {
// Disconnect the connection closingStreamReceived.reportSuccess();
disconnect(); if (shutdownInProgress) {
// We received a closing stream element *after* we initiated the
// termination of the session by sending a closing stream element to
// the server first
return;
} else {
// We received a closing stream element from the server without us
// sending a closing stream element first. This means that the
// server wants to terminate the session, therefore disconnect
// the connection
disconnect();
}
} }
break; break;
case XmlPullParser.END_DOCUMENT: case XmlPullParser.END_DOCUMENT:
@ -1116,6 +1147,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
} }
catch (Exception e) { catch (Exception e) {
closingStreamReceived.reportFailure(e);
// The exception can be ignored if the the connection is 'done' // The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed // or if the it was caused because the socket got closed
if (!(done || isSocketClosed())) { if (!(done || isSocketClosed())) {
@ -1362,6 +1394,18 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
catch (Exception e) { catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
} }
try {
// After we send the closing stream element, check if there was already a
// closing stream element sent by the server or wait with a timeout for a
// closing stream element to be received from the server.
closingStreamReceived.checkIfSuccessOrWait();
} catch (NoResponseException e) {
LOGGER.log(Level.INFO, "No response while waiting for closing stream element from the server (" + getConnectionCounter() + ")", e);
} catch (InterruptedException e) {
LOGGER.log(Level.INFO, "Waiting for closing stream element from the server was interrupted (" + getConnectionCounter() + ")", e);
}
// Delete the queue contents (hopefully nothing is left). // Delete the queue contents (hopefully nothing is left).
queue.clear(); queue.clear();
} else if (instantShutdown && isSmEnabled()) { } else if (instantShutdown && isSmEnabled()) {