Smack/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java

1907 lines
85 KiB
Java
Raw Normal View History

/**
*
* Copyright 2003-2007 Jive Software.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.tcp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.SocketFactory;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
import org.jivesoftware.smack.SmackException.ConnectionException;
import org.jivesoftware.smack.SmackException.EndpointConnectionException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.NotLoggedInException;
import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.StanzaListener;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
import org.jivesoftware.smack.XMPPException.StreamErrorException;
import org.jivesoftware.smack.compress.packet.Compress;
import org.jivesoftware.smack.compress.packet.Compressed;
import org.jivesoftware.smack.compression.XMPPInputOutputStream;
import org.jivesoftware.smack.datatypes.UInt16;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.internal.SmackTlsContext;
import org.jivesoftware.smack.packet.Element;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Nonza;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StartTls;
import org.jivesoftware.smack.packet.StreamError;
import org.jivesoftware.smack.packet.StreamOpen;
import org.jivesoftware.smack.proxy.ProxyInfo;
import org.jivesoftware.smack.sasl.packet.SaslNonza;
import org.jivesoftware.smack.sm.SMUtils;
import org.jivesoftware.smack.sm.StreamManagementException;
import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
import org.jivesoftware.smack.sm.packet.StreamManagement;
import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
import org.jivesoftware.smack.sm.predicates.Predicate;
import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import org.jivesoftware.smack.util.Async;
2018-08-15 17:25:22 +02:00
import org.jivesoftware.smack.util.CloseableUtil;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.TLSUtils;
import org.jivesoftware.smack.util.XmlStringBuilder;
import org.jivesoftware.smack.util.rce.RemoteConnectionException;
import org.jivesoftware.smack.xml.SmackXmlParser;
import org.jivesoftware.smack.xml.XmlPullParser;
import org.jivesoftware.smack.xml.XmlPullParserException;
import org.jxmpp.jid.impl.JidCreate;
import org.jxmpp.jid.parts.Resourcepart;
import org.jxmpp.stringprep.XmppStringprepException;
import org.minidns.dnsname.DnsName;
/**
 * Creates a socket connection to an XMPP server. This is the default connection
 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
 *
 * @see XMPPConnection
 * @author Matt Tucker
 */
public class XMPPTCPConnection extends AbstractXMPPConnection {
private static final int QUEUE_SIZE = 500;
private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
/**
* The socket which is used for this connection.
*/
private Socket socket;
/**
*
*/
private boolean disconnectedButResumeable = false;
private SSLSocket secureSocket;
/**
* Protected access level because of unit test purposes
*/
protected final PacketWriter packetWriter = new PacketWriter();
/**
* Protected access level because of unit test purposes
*/
protected final PacketReader packetReader = new PacketReader();
/**
*
*/
private boolean streamFeaturesAfterAuthenticationReceived;
/**
*
*/
private boolean compressSyncPoint;
2015-02-22 10:33:08 +01:00
/**
* The default bundle and defer callback, used for new connections.
* @see bundleAndDeferCallback
*/
private static BundleAndDeferCallback defaultBundleAndDeferCallback;
2015-02-22 10:33:08 +01:00
/**
* The used bundle and defer callback.
* <p>
* Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
* having a 'volatile' read within the writer threads loop.
* </p>
*/
private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
private static boolean useSmDefault = true;
private static boolean useSmResumptionDefault = true;
/**
* The stream ID of the stream that is currently resumable, ie. the stream we hold the state
* for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
* {@link #unacknowledgedStanzas}.
*/
private String smSessionId;
2020-06-12 21:16:05 +02:00
/**
* Represents the state of stream management resumption.
* <p>
* Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
* </p>
*/
private volatile SyncPointState smResumedSyncPoint;
private Failed smResumptionFailed;
2020-06-12 21:16:05 +02:00
/**
* Represents the state of stream magement.
* <p>
* This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
* </p>
*/
private volatile boolean smEnabledSyncPoint;
/**
* The client's preferred maximum resumption time in seconds.
*/
private int smClientMaxResumptionTime = -1;
/**
* The server's preferred maximum resumption time in seconds.
*/
2017-12-13 23:10:11 +01:00
private int smServerMaxResumptionTime = -1;
/**
* Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
*/
private boolean useSm = useSmDefault;
private boolean useSmResumption = useSmResumptionDefault;
/**
* The counter that the server sends the client about it's current height. For example, if the server sends
* {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
*/
private long serverHandledStanzasCount = 0;
/**
* The counter for stanzas handled ("received") by the client.
* <p>
* Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
* not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
* {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
* therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
* </p>
*/
private long clientHandledStanzasCount = 0;
private BlockingQueue<Stanza> unacknowledgedStanzas;
/**
* Set to true if Stream Management was at least once enabled for this connection.
*/
private boolean smWasEnabledAtLeastOnce = false;
/**
* This listeners are invoked for every stanza that got acknowledged.
* <p>
2018-03-09 14:15:46 +01:00
* We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
* themselves after they have been invoked.
* </p>
*/
2017-12-13 23:10:11 +01:00
private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();
/**
* These listeners are invoked for every stanza that got dropped.
* <p>
* We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
* themselves after they have been invoked.
* </p>
*/
private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();
/**
* This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will
* only be invoked once and automatically removed after that.
*/
2017-12-13 23:10:11 +01:00
private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();
/**
* Predicates that determine if an stream management ack should be requested from the server.
* <p>
* We use a linked hash set here, so that the order how the predicates are added matches the
* order in which they are invoked in order to determine if an ack request should be send or not.
* </p>
*/
2017-12-13 23:10:11 +01:00
private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();
2018-03-28 15:56:04 +02:00
@SuppressWarnings("HidingField")
private final XMPPTCPConnectionConfiguration config;
/**
 * Constructs an XMPP connection over TCP (optionally using proxies) from the given configuration.
 * <p>
 * The constructor does not establish a connection to the server, {@link #connect()} has to be
 * called for that.
 * </p>
 *
 * @param config the connection configuration.
 */
public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
    super(config);
    this.config = config;

    // Stream errors and stream management errors invalidate the stream management state.
    final ConnectionListener smStateDropper = new ConnectionListener() {
        @Override
        public void connectionClosedOnError(Exception e) {
            final boolean smStateInvalid = e instanceof XMPPException.StreamErrorException
                            || e instanceof StreamManagementException;
            if (smStateInvalid) {
                dropSmState();
            }
        }
    };
    addConnectionListener(smStateDropper);

    // A SASL <success/> initiates a new stream, hence the parser is reset whenever one is received.
    buildNonzaCallback()
                    .listenFor(SaslNonza.Success.class, success -> resetParser())
                    .install();
}
/**
 * Creates a new XMPP connection over TCP.
 * <p>
 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
 * constructor, to which this constructor delegates with a configuration built from the two arguments.
 * </p>
 *
 * @param jid the bare JID used by the client.
 * @param password the password or authentication token.
 * @throws XmppStringprepException if the provided string is invalid.
 */
public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
    this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
}
/**
 * Creates a new XMPP connection over TCP.
 * <p>
 * This is the simplest constructor for connecting to an XMPP server. Alternatively,
 * you can get fine-grained control over connection settings using the
 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
 * </p>
 * @param username the username, set on the configuration together with the password.
 * @param password the password, set on the configuration together with the username.
 * @param serviceName the name of the XMPP service; it is converted to a domain bare JID and used as XMPP domain.
 * @throws XmppStringprepException if the provided string is invalid.
 */
public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
    this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
                                    JidCreate.domainBareFrom(serviceName)).build());
}
/**
 * Throws if there is no packet writer, otherwise lets the packet writer decide whether the connection has to be
 * considered as not connected.
 *
 * @throws NotConnectedException if the connection is not connected.
 */
@Override
protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
    final PacketWriter currentWriter = packetWriter;
    if (currentWriter != null) {
        currentWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
        return;
    }
    throw new NotConnectedException();
}
/**
 * Throws if the connection is connected, unless it is in the disconnected-but-resumable state.
 *
 * @throws AlreadyConnectedException if the connection is connected and not merely waiting for a resumption.
 */
@Override
protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
    if (!isConnected() || disconnectedButResumeable) {
        // Either not connected at all, or only "connected" because the stream may still be resumed.
        return;
    }
    throw new AlreadyConnectedException();
}
/**
 * Throws if the connection is authenticated, unless it is in the disconnected-but-resumable state.
 *
 * @throws AlreadyLoggedInException if the connection is authenticated and not merely waiting for a resumption.
 */
@Override
protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
    if (!isAuthenticated() || disconnectedButResumeable) {
        // Either not logged in at all, or only "logged in" because the stream may still be resumed.
        return;
    }
    throw new AlreadyLoggedInException();
}
/**
 * Clears the disconnected-but-resumable flag and then delegates to the superclass implementation.
 *
 * @param resumed passed through unchanged to the superclass; loginInternal() passes {@code true} after a
 *        successful stream resumption and {@code false} otherwise.
 * @throws NotConnectedException declared by the superclass method.
 * @throws InterruptedException declared by the superclass method.
 */
@Override
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
    // Reset the flag in case it was set
    disconnectedButResumeable = false;
    super.afterSuccessfulLogin(resumed);
}
/**
 * Performs the login: authenticates via SASL, optionally enables stream compression, then either resumes a
 * previous stream management (SM) session or binds the resource and, if available and wanted, enables SM.
 * <p>
 * Stanzas which were left unacknowledged by a previous, non-resumed SM session are either handed to the stanza
 * dropped listeners or, if there are no such listeners, sent again.
 * </p>
 *
 * @param username the username to authenticate with.
 * @param password the password to authenticate with.
 * @param resource the resource to bind.
 * @throws XMPPException if an XMPP protocol error was received.
 * @throws SmackException if Smack detected an exceptional situation.
 * @throws IOException if an I/O error occurred.
 * @throws InterruptedException if the calling thread was interrupted.
 */
@Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                SmackException, IOException, InterruptedException {
    // Authenticate using SASL
    SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

    streamFeaturesAfterAuthenticationReceived = false;
    authenticate(username, password, config.getAuthzid(), sslSession);

    // Wait for stream features after the authentication.
    waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");

    // If compression is enabled then request the server to use stream compression. XEP-170
    // recommends to perform stream compression before resource binding.
    maybeEnableCompression();

    smResumedSyncPoint = SyncPointState.initial;
    smResumptionFailed = null;
    if (isSmResumptionPossible()) {
        smResumedSyncPoint = SyncPointState.request_sent;
        sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
        waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
        if (smResumedSyncPoint == SyncPointState.successful) {
            // We successfully resumed the stream, be done here
            afterSuccessfulLogin(true);
            return;
        }
        // SM resumption failed, what Smack does here is to report success of
        // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
        // normal resource binding can be tried.
        assert smResumptionFailed != null;
        LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
    }

    // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
    // mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
    // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
    smEnabledSyncPoint = false;

    List<Stanza> previouslyUnackedStanzas = new LinkedList<>();
    if (unacknowledgedStanzas != null) {
        // There was a previous connection with SM enabled but that was either not resumable or
        // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
        unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
        // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
        // XMPP session (There maybe was an enabled in a previous XMPP session of this
        // connection instance though). This is used in writePackets to decide if stanzas should
        // be added to the unacknowledged stanzas queue, because they have to be added right
        // after the 'enable' stream element has been sent.
        dropSmState();
    }

    // Now bind the resource. It is important to do this *after* we dropped an eventually
    // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
    // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
    bindResourceAndEstablishSession(resource);

    if (isSmAvailable() && useSm) {
        // Remove what is maybe left from previously stream managed sessions
        serverHandledStanzasCount = 0;
        sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
        // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
        // then this is a non recoverable error and we therefore throw an exception.
        waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
        synchronized (requestAckPredicates) {
            if (requestAckPredicates.isEmpty()) {
                // Assure that we have at least one predicate set up so that we request acks
                // from the server and eventually flush some stanzas from the unacknowledged
                // stanza queue
                requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
            }
        }
    }

    // Inform client about failed resumption if possible, resend stanzas otherwise
    // Process the stanzas synchronously so a client can re-queue them for transmission
    // before it is informed about connection success
    if (!stanzaDroppedListeners.isEmpty()) {
        for (Stanza stanza : previouslyUnackedStanzas) {
            for (StanzaListener listener : stanzaDroppedListeners) {
                try {
                    listener.processStanza(stanza);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status so that the interruption request is not lost; the
                    // remaining listeners are still notified.
                    Thread.currentThread().interrupt();
                    LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
                }
                catch (NotConnectedException | NotLoggedInException e) {
                    LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
                }
            }
        }
    } else {
        for (Stanza stanza : previouslyUnackedStanzas) {
            sendStanzaInternal(stanza);
        }
    }

    afterSuccessfulLogin(false);
}
/**
 * Reports whether a secure socket is set for this connection. The secure socket is assigned after TLS was
 * established and the hostname verification succeeded, and it is cleared again on shutdown.
 *
 * @return {@code true} if the secure socket is not {@code null}.
 */
@Override
public boolean isSecureConnection() {
    return secureSocket != null;
}
/**
* Shuts the current connection down. After this method returns, the connection must be ready
* for re-use by connect.
*/
@Override
protected void shutdown() {
2014-10-28 12:22:16 +01:00
if (isSmEnabled()) {
try {
// Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
// state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
sendSmAcknowledgementInternal();
} catch (InterruptedException | NotConnectedException e) {
2014-10-28 12:22:16 +01:00
LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
}
}
shutdown(false);
}
/**
 * Performs an instant shutdown, i.e. invokes the internal shutdown routine with {@code instant} set to
 * {@code true}. In that mode the routine does not wait for the closing stream tag from the server and keeps
 * the stream resumable if stream management resumption is possible.
 */
@Override
public synchronized void instantShutdown() {
    shutdown(true);
}
private void shutdown(boolean instant) {
// The writer thread may already been finished at this point, for example when the connection is in the
// disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
// case.
if (!packetWriter.done()) {
// First shutdown the writer, this will result in a closing stream element getting send to
// the server
LOGGER.finer(packetWriter.threadName + " shutdown()");
packetWriter.shutdown(instant);
LOGGER.finer(packetWriter.threadName + " shutdown() returned");
if (!instant) {
waitForClosingStreamTagFromServer();
}
}
LOGGER.finer(packetReader.threadName + " shutdown()");
packetReader.shutdown();
LOGGER.finer(packetReader.threadName + " shutdown() returned");
2018-08-15 17:25:22 +02:00
CloseableUtil.maybeClose(socket, LOGGER);
setWasAuthenticated();
try {
boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
if (!readerAndWriterThreadsTermianted) {
LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
+ packetWriter.running + ", Reader running: " + packetReader.running);
} else {
LOGGER.fine("Reader and writer threads terminated");
}
} catch (InterruptedException e) {
LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
}
if (disconnectedButResumeable) {
return;
}
// If we are able to resume the stream, then don't set
// connected/authenticated/usingTLS to false since we like to behave like we are still
// connected (e.g. sendStanza should not throw a NotConnectedException).
if (instant) {
disconnectedButResumeable = isSmResumptionPossible();
if (!disconnectedButResumeable) {
// Reset the stream management session id to null, since the stream is no longer resumable. Note that we
// keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
smSessionId = null;
}
} else {
disconnectedButResumeable = false;
// Drop the stream management state if this is not an instant shutdown. We send
// a </stream> close tag and now the stream management state is no longer valid.
// This also prevents that we will potentially (re-)send any unavailable presence we
// may have send, because it got put into the unacknowledged queue and was not acknowledged before the
// connection terminated.
dropSmState();
// Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
// information is available in the connectionClosedOnError() listeners.
}
authenticated = false;
connected = false;
secureSocket = null;
reader = null;
writer = null;
initState();
}
/**
 * Hands the given nonza over to the packet writer for sending.
 *
 * @param element the nonza to send.
 * @throws NotConnectedException declared by the packet writer's send method.
 * @throws InterruptedException declared by the packet writer's send method.
 */
@Override
public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
    packetWriter.sendStreamElement(element);
}
/**
 * Hands the given stanza over to the packet writer and, if stream management is enabled, requests an
 * acknowledgement from the server as soon as one of the request ack predicates accepts the stanza.
 *
 * @param packet the stanza to send.
 * @throws NotConnectedException if the connection is not connected.
 * @throws InterruptedException if the calling thread was interrupted.
 */
@Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
    packetWriter.sendStreamElement(packet);
    if (!isSmEnabled()) {
        return;
    }

    // The predicate set is a plain LinkedHashSet which is modified under its own monitor elsewhere, so it must
    // not be iterated without holding that monitor.
    boolean requestAck = false;
    synchronized (requestAckPredicates) {
        for (StanzaFilter requestAckPredicate : requestAckPredicates) {
            if (requestAckPredicate.accept(packet)) {
                requestAck = true;
                break;
            }
        }
    }

    // Send the ack request outside of the synchronized block, as sending may block.
    if (requestAck) {
        requestSmAcknowledgementInternal();
    }
}
/**
 * Looks up the remote connection endpoints for the configuration and tries them in order until a TCP
 * connection could be established, either directly (trying every inet address of an endpoint) or via the
 * configured proxy. On success the socket, host and port of this connection are set.
 *
 * @throws ConnectionException if no connection could be established to any endpoint; the lookup failures and
 *         the failed connection attempts are reported with the exception.
 * @throws IOException if an I/O error occurred.
 * @throws InterruptedException if the calling thread was interrupted.
 */
private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
    final RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> lookupResult =
                    RemoteXmppTcpConnectionEndpoints.lookup(config);
    final List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> failedAttempts = new ArrayList<>();

    SocketFactory socketFactory = config.getSocketFactory();
    final ProxyInfo proxyInfo = config.getProxyInfo();
    final int timeout = config.getConnectTimeout();
    if (socketFactory == null) {
        socketFactory = SocketFactory.getDefault();
    }

    for (Rfc6120TcpRemoteConnectionEndpoint endpoint : lookupResult.discoveredRemoteConnectionEndpoints) {
        final String endpointHost = endpoint.getHost().toString();
        final UInt16 endpointPort = endpoint.getPort();
        final int portNumber = endpointPort.intValue();

        if (proxyInfo != null) {
            // TODO: Treat this like the direct case below. There appears no reason why we should not try a
            // proxy connection to every inet address of each connection endpoint.
            socket = socketFactory.createSocket();
            StringUtils.requireNotNullNorEmpty(endpointHost, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
            final String hostAndPort = endpointHost + " at port " + portNumber;
            LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
            try {
                proxyInfo.getProxySocketConnection().connect(socket, endpointHost, portNumber, timeout);
            } catch (IOException e) {
                CloseableUtil.maybeClose(socket, LOGGER);
                RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> failure =
                                new RemoteConnectionException<>(endpoint, null, e);
                failedAttempts.add(failure);
                // Try the next endpoint.
                continue;
            }
            LOGGER.finer("Established TCP connection to " + hostAndPort);
            // We found a host to connect to, return here
            this.host = endpointHost;
            this.port = endpointPort;
            return;
        }

        // Direct connection: try the inet addresses of the endpoint one after another.
        final Iterator<? extends InetAddress> addressIterator = endpoint.getInetAddresses().iterator();
        assert addressIterator.hasNext();
        while (addressIterator.hasNext()) {
            // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
            // re-usable after a failed connection attempt. See also SMACK-724.
            SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);

            final InetAddress candidateAddress = addressIterator.next();
            final InetSocketAddress candidateSocketAddress = new InetSocketAddress(candidateAddress, portNumber);
            LOGGER.finer("Trying to establish TCP connection to " + candidateSocketAddress);
            socketFuture.connectAsync(candidateSocketAddress, timeout);

            try {
                socket = socketFuture.getOrThrow();
            } catch (IOException e) {
                RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> failure =
                                new RemoteConnectionException<>(endpoint, candidateAddress, e);
                failedAttempts.add(failure);
                // The loop condition decides whether there is another address left to try.
                continue;
            }
            LOGGER.finer("Established TCP connection to " + candidateSocketAddress);
            // We found a host to connect to, return here
            this.host = endpointHost;
            this.port = endpointPort;
            return;
        }
    }

    // There are no more host addresses to try
    // throw an exception and report all tried
    // HostAddresses in the exception
    throw EndpointConnectionException.from(lookupResult.lookupFailures, failedAttempts);
}
/**
 * Initializes the connection by creating a stanza reader and writer and opening a
 * XMPP stream to the server.
 *
 * @throws IOException if an I/O error occurred.
 * @throws InterruptedException if the calling thread was interrupted.
 */
private void initConnection() throws IOException, InterruptedException {
    // Clear the compression handler first, so that initReaderAndWriter() below does not wrap the socket streams
    compressionHandler = null;
    // Set the reader and writer instance variables
    initReaderAndWriter();
    // Start the writer thread. This will open an XMPP stream to the server
    packetWriter.init();
    // Start the reader thread.
    // NOTE(review): an earlier comment here referred to a startup() method which blocks until the opening
    // stream element of the server was received; whether init() blocks is not visible from here.
    packetReader.init();
}
/**
 * (Re-)creates the reader and writer of this connection on top of the streams of the current socket. If a
 * compression handler is set, the streams are wrapped by it first. Finally the debugger is initialized.
 *
 * @throws IOException if an I/O error occurred.
 */
private void initReaderAndWriter() throws IOException {
    InputStream socketIn = socket.getInputStream();
    OutputStream socketOut = socket.getOutputStream();

    final boolean wrapWithCompression = compressionHandler != null;
    if (wrapWithCompression) {
        socketIn = compressionHandler.getInputStream(socketIn);
        socketOut = compressionHandler.getOutputStream(socketOut);
    }

    final String charsetName = "UTF-8";
    // No BufferedWriter needed here, OutputStreamWriter already buffers.
    writer = new OutputStreamWriter(socketOut, charsetName);
    reader = new BufferedReader(new InputStreamReader(socketIn, charsetName));

    // If debugging is enabled, we open a window and write out all network traffic.
    initDebugger();
}
/**
* The server has indicated that TLS negotiation can start. We now need to secure the
* existing plain connection and perform a handshake. This method won't return until the
* connection has finished the handshake or an error occurred while securing the connection.
2019-10-30 12:02:36 +01:00
* @throws IOException if an I/O error occurred.
* @throws SecurityNotPossibleException if TLS is not possible.
* @throws CertificateException if there is an issue with the certificate.
*/
@SuppressWarnings("LiteralClassName")
private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
SmackTlsContext smackTlsContext = getSmackTlsContext();
Socket plain = socket;
int port = plain.getPort();
String xmppServiceDomainString = config.getXMPPServiceDomain().toString();
SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory();
// Secure the plain connection
socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true);
final SSLSocket sslSocket = (SSLSocket) socket;
// Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
// important (at least on certain platforms) and it seems to be a good idea anyways to
// prevent an accidental implicit handshake.
TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
// Initialize the reader and writer with the new secured version
initReaderAndWriter();
// Proceed to do the handshake
sslSocket.startHandshake();
if (smackTlsContext.daneVerifier != null) {
smackTlsContext.daneVerifier.finish(sslSocket.getSession());
}
2014-07-21 18:42:44 +02:00
final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
if (verifier == null) {
throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
}
final String verifierHostname;
{
DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
// Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
// Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
// See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
if (xmppServiceDomainDnsName != null) {
verifierHostname = xmppServiceDomainDnsName.ace;
}
else {
LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
+ "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
verifierHostname = getXMPPServiceDomain().toString();
}
}
final boolean verificationSuccessful;
// Verify the TLS session.
verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
if (!verificationSuccessful) {
throw new CertificateException(
"Hostname verification of certificate failed. Certificate does not authenticate "
+ getXMPPServiceDomain());
2014-07-21 18:42:44 +02:00
}
// Set that TLS was successful
secureSocket = sslSocket;
}
/**
* Returns the compression handler that can be used for one compression methods offered by the server.
*
* @return a instance of XMPPInputOutputStream or null if no suitable instance was found
*
*/
private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
2017-12-13 23:10:11 +01:00
for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
String method = handler.getCompressionMethod();
if (compression.getMethods().contains(method))
return handler;
}
return null;
}
/**
 * Reports whether stream compression is in use.
 *
 * @return {@code true} if a compression handler is set and the compression sync point flag is {@code true}.
 */
@Override
public boolean isUsingCompression() {
    return compressionHandler != null && compressSyncPoint;
}
/**
 * <p>
 * Requests stream compression from the server, if compression is enabled in the configuration, the server
 * offers the compression feature and a handler for one of the offered methods is available. Stream
 * compression reduces the network traffic (up to 90%), which is ideal for slow network connections, at the
 * cost of additional CPU time on the client and the server for un/compressing the data.
 * </p>
 * <p>
 * If a compression request was sent, this method waits until the compression sync point is reached. If no
 * matching handler is available, a warning is logged and nothing is requested.
 * </p>
 *
 * @throws SmackException if Smack detected an exceptional situation, e.g. if the XMPP connection is not
 *         connected.
 * @throws InterruptedException if the calling thread was interrupted.
 * @throws XMPPException if an XMPP protocol error was received.
 */
private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
    final boolean compressionWanted = config.isCompressionEnabled();
    if (!compressionWanted) {
        return;
    }

    final Compress.Feature offeredCompression = getFeature(Compress.Feature.class);
    if (offeredCompression == null) {
        // Server does not support compression
        return;
    }

    // Note that the field is assigned even if no handler matches, i.e. it may become null here.
    compressionHandler = maybeGetCompressionHandler(offeredCompression);
    if (compressionHandler == null) {
        LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
        return;
    }

    // Stream compression was offered by the server and we want to use it, so send the compression request
    // and wait for the outcome.
    compressSyncPoint = false;
    final String method = compressionHandler.getCompressionMethod();
    sendNonza(new Compress(method));
    waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
}
/**
* Establishes a connection to the XMPP server. It basically
* creates and maintains a socket connection to the server.
* <p>
* Listeners will be preserved from a previous connection if the reconnection
* occurs after an abrupt termination.
* </p>
*
* @throws XMPPException if an error occurs while trying to establish the connection.
* @throws SmackException if Smack detected an exceptional situation.
2019-10-30 12:02:36 +01:00
* @throws IOException if an I/O error occurred.
* @throws InterruptedException if the calling thread was interrupted.
*/
@Override
protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
// Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
// there is an error establishing the connection
connectUsingConfiguration();
Set 'connected' to 'true' as early as possible We previously only set 'connected' after connectInternal() returned. This could lead to notifyConnectionError() ignoring stream error exceptions, e.g. when establishing TLS which happens also in connectInternal(), because 'connected' was still 'false'. 2020-08-06 13:08:06.265 19830-20423/org.atalk.android D/SMACK: SENT (0): <stream:stream xmlns='jabber:client' to='atalk.sytes.net' xmlns:stream='http://etherx.jabber.org/streams' version='1.0' xml:lang='en'> 2020-08-06 13:08:06.333 19830-20424/org.atalk.android D/SMACK: RECV (0): ?xml version='1.0'?> <stream:stream id='16420577292739412012' version='1.0' xml:lang='en' xmlns:stream='http://etherx.jabber.org/streams' from='atalk.sytes.net' xmlns='jabber:client'> <stream:error> <policy-violation xmlns='urn:ietf:params:xml:ns:xmpp-streams'/> <text xml:lang='en' xmlns='urn:ietf:params:xml:ns:xmpp-streams'> Too many (20) failed authentications from this IP address (::ffff:42.60.7.13). The address will be unblocked at 05:15:34 06.08.2020 UTC </text> </stream:error> </stream:stream> 2020-08-06 13:08:06.346 19830-20424/org.atalk.android I/aTalk: [241896] org.jivesoftware.smack.AbstractXMPPConnection.notifyConnectionError() Connection was already disconnected when attempting to handle org.jivesoftware.smack.XMPPException$StreamErrorException: policy-violation You can read more about the meaning of this stream error at http://xmpp.org/rfcs/rfc6120.html#streams-error-conditions <stream:error><policy-violation xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xml:lang='en'>Too many (20) failed authentications from this IP address (::ffff:42.60.7.13). 
The address will be unblocked at 05:15:34 06.08.2020 UTC</text></stream:error> org.jivesoftware.smack.XMPPException$StreamErrorException: policy-violation You can read more about the meaning of this stream error at http://xmpp.org/rfcs/rfc6120.html#streams-error-conditions <stream:error><policy-violation xmlns='urn:ietf:params:xml:ns:xmpp-streams'/><text xml:lang='en'>Too many (20) failed authentications from this IP address (::ffff:42.60.7.13). The address will be unblocked at 05:15:34 06.08.2020 UTC</text></stream:error> at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:966) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$700(XMPPTCPConnection.java:898) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:921) at java.lang.Thread.run(Thread.java:919) Which eventually leads to a NoResponseException org.jivesoftware.smack.SmackException$NoResponseException: No response received within reply timeout. Timeout was 30000ms (~30s). While waiting for establishing TLS [XMPPTCPConnection[not-authenticated] (4)] We now set 'connected' to 'true' as soon as the transport (e.g. TCP, BOSH, …) is connected. While this is in other ways also sensible, it also allows notifyConnectionError() to handle exceptions in the early connection stage. Thanks to Eng Chong Meng for reporting this.
2020-08-06 10:28:07 +02:00
connected = true;
// We connected successfully to the servers TCP port
initConnection();
// TLS handled will be true either if TLS was established, or if it was not mandatory.
waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
// Wait with SASL auth until the SASL mechanisms have been received
waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
}
/**
* Replaces the writer of this connection with the given one. For unit testing purposes.
*
* @param writer the writer this connection should use from now on.
*/
protected void setWriter(Writer writer) {
this.writer = writer;
}
@Override
protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
    final StartTls offeredStartTls = getFeature(StartTls.class);

    // Becomes true only if we actually asked the server to start TLS.
    boolean startTlsRequested = false;
    if (offeredStartTls != null) {
        final boolean tlsRequiredByServer = offeredStartTls.required();
        final boolean securityDisabled = config.getSecurityMode() == SecurityMode.disabled;

        if (tlsRequiredByServer && securityDisabled) {
            // The server insists on TLS but the configuration forbids it: report the failure to the threads
            // waiting for the connection and abort.
            final SecurityRequiredByServerException securityRequired = new SecurityRequiredByServerException();
            currentSmackException = securityRequired;
            notifyWaitingThreads();
            throw securityRequired;
        }

        if (!securityDisabled) {
            sendNonza(new StartTls());
            startTlsRequested = true;
        }
    }

    if (!startTlsRequested) {
        // Either the server did not offer STARTTLS or security is disabled: there is nothing more to do for TLS.
        tlsHandled = true;
        notifyWaitingThreads();
    }

    if (isSaslAuthenticated()) {
        // Features received after SASL completed successfully: the server has then also *maybe* sent the
        // compression feature, which is optional.
        streamFeaturesAfterAuthenticationReceived = true;
        notifyWaitingThreads();
    }
}
/**
 * Replaces the parser of the packet reader with a new XML parser created for the current reader.
 *
 * @throws IOException if the new parser could not be created.
 */
private void resetParser() throws IOException {
    final XmlPullParser freshParser;
    try {
        freshParser = SmackXmlParser.newXmlParser(reader);
    } catch (XmlPullParserException parserCreationFailure) {
        // Callers only deal with IOException, so wrap the parser exception.
        throw new IOException(parserCreationFailure);
    }
    packetReader.parser = freshParser;
}
/**
* Sends a stream open to the server and afterwards replaces the parser of the packet reader with a new one.
*
* @throws IOException if the parser could not be reset.
* @throws NotConnectedException if the XMPP connection is not connected.
* @throws InterruptedException if the calling thread was interrupted.
*/
private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
// The order matters: first open the new stream, then start with a fresh parser.
sendStreamOpen();
resetParser();
}
protected class PacketReader {
private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';
XmlPullParser parser;
private volatile boolean done;
private boolean running;
/**
* Initializes the reader in order to be used. The reader is initialized during the
* first connection and when reconnecting due to an abruptly disconnection.
*/
void init() {
done = false;
running = true;
Async.go(new Runnable() {
@Override
public void run() {
LOGGER.finer(threadName + " start");
try {
parsePackets();
} finally {
LOGGER.finer(threadName + " exit");
running = false;
notifyWaitingThreads();
}
}
}, threadName);
}
/**
2018-03-31 14:17:30 +02:00
* Shuts the stanza reader down. This method simply sets the 'done' flag to true.
*/
void shutdown() {
done = true;
}
/**
* Parse top-level packets in order to process them further.
*/
private void parsePackets() {
try {
openStreamAndResetParser();
XmlPullParser.Event eventType = parser.getEventType();
while (!done) {
switch (eventType) {
case START_ELEMENT:
final String name = parser.getName();
final String namespace = parser.getNamespace();
switch (name) {
case Message.ELEMENT:
case IQ.IQ_ELEMENT:
case Presence.ELEMENT:
try {
parseAndProcessStanza(parser);
} finally {
clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
}
break;
case "stream":
if (StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(namespace)) {
onStreamOpen(parser);
}
break;
case "error":
StreamError streamError = PacketParserUtils.parseStreamError(parser);
// Stream errors are non recoverable, throw this exceptions. Also note that this will set
// this exception as current connection exceptions and notify any waiting threads.
throw new StreamErrorException(streamError);
case "features":
parseFeaturesAndNotify(parser);
break;
case "proceed":
// Secure the connection by negotiating TLS
proceedTLSReceived();
// Send a new opening stream to the server
openStreamAndResetParser();
break;
case "failure":
switch (namespace) {
case "urn:ietf:params:xml:ns:xmpp-tls":
// TLS negotiation has failed. The server will close the connection
// TODO Parse failure stanza
2019-02-10 21:39:48 +01:00
throw new SmackException.SmackMessageException("TLS negotiation has failed");
case "http://jabber.org/protocol/compress":
// Stream compression has been denied. This is a recoverable
// situation. It is still possible to authenticate and
// use the connection but using an uncompressed connection
// TODO Parse failure stanza
currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
notifyWaitingThreads();
break;
default:
parseAndProcessNonza(parser);
}
break;
case Compressed.ELEMENT:
// Server confirmed that it's possible to use stream compression. Start
// stream compression
// Initialize the reader and writer with the new compressed version
initReaderAndWriter();
// Send a new opening stream to the server
openStreamAndResetParser();
// Notify that compression is being used
compressSyncPoint = true;
notifyWaitingThreads();
break;
case Enabled.ELEMENT:
Enabled enabled = ParseStreamManagement.enabled(parser);
if (enabled.isResumeSet()) {
smSessionId = enabled.getId();
if (StringUtils.isNullOrEmpty(smSessionId)) {
2019-02-10 21:39:48 +01:00
SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
setCurrentConnectionExceptionAndNotify(xmppException);
throw xmppException;
}
2017-12-13 23:10:11 +01:00
smServerMaxResumptionTime = enabled.getMaxResumptionTime();
} else {
// Mark this a non-resumable stream by setting smSessionId to null
smSessionId = null;
}
clientHandledStanzasCount = 0;
smWasEnabledAtLeastOnce = true;
smEnabledSyncPoint = true;
notifyWaitingThreads();
break;
case Failed.ELEMENT:
Failed failed = ParseStreamManagement.failed(parser);
if (smResumedSyncPoint == SyncPointState.request_sent) {
// This is a <failed/> nonza in a response to resuming a previous stream, failure to do
// so is non-fatal as we can simply continue with resource binding in this case.
smResumptionFailed = failed;
notifyWaitingThreads();
} else {
FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
setCurrentConnectionExceptionAndNotify(xmppException);
}
break;
case Resumed.ELEMENT:
Resumed resumed = ParseStreamManagement.resumed(parser);
if (!smSessionId.equals(resumed.getPrevId())) {
throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
}
// Mark SM as enabled
smEnabledSyncPoint = true;
// First, drop the stanzas already handled by the server
processHandledCount(resumed.getHandledCount());
// Then re-send what is left in the unacknowledged queue
List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
unacknowledgedStanzas.drainTo(stanzasToResend);
for (Stanza stanza : stanzasToResend) {
sendStanzaInternal(stanza);
}
// If there where stanzas resent, then request a SM ack for them.
// Writer's sendStreamElement() won't do it automatically based on
// predicates.
if (!stanzasToResend.isEmpty()) {
requestSmAcknowledgementInternal();
}
// Mark SM resumption as successful
smResumedSyncPoint = SyncPointState.successful;
notifyWaitingThreads();
break;
case AckAnswer.ELEMENT:
AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
processHandledCount(ackAnswer.getHandledCount());
break;
case AckRequest.ELEMENT:
ParseStreamManagement.ackRequest(parser);
if (smEnabledSyncPoint) {
sendSmAcknowledgementInternal();
} else {
LOGGER.warning("SM Ack Request received while SM is not enabled");
}
break;
default:
parseAndProcessNonza(parser);
break;
}
break;
case END_ELEMENT:
final String endTagName = parser.getName();
if ("stream".equals(endTagName)) {
if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace());
break;
}
// Check if the queue was already shut down before reporting success on closing stream tag
// received. This avoids a race if there is a disconnect(), followed by a connect(), which
// did re-start the queue again, causing this writer to assume that the queue is not
// shutdown, which results in a call to disconnect().
final boolean queueWasShutdown = packetWriter.queue.isShutdown();
closingStreamReceived = true;
notifyWaitingThreads();
if (queueWasShutdown) {
// We received a closing stream element *after* we initiated the
// termination of the session by sending a closing stream element to
// the server first
return;
} else {
// We received a closing stream element from the server without us
// sending a closing stream element first. This means that the
// server wants to terminate the session, therefore disconnect
// the connection
LOGGER.info(XMPPTCPConnection.this
+ " received closing </stream> element."
+ " Server wants to terminate the connection, calling disconnect()");
ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
@Override
public void run() {
disconnect();
}});
}
}
break;
case END_DOCUMENT:
// END_DOCUMENT only happens in an error case, as otherwise we would see a
// closing stream element before.
2019-02-10 21:39:48 +01:00
throw new SmackException.SmackMessageException(
"Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
default:
// Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
break;
}
eventType = parser.next();
}
}
catch (Exception e) {
// Set running to false since this thread will exit here and notifyConnectionError() will wait until
// the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
// notifyConnetctionError() below, even though run() also sets it to false. Therefore, do not remove this.
running = false;
String ignoreReasonThread = null;
boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
if (writerThreadWasShutDown) {
ignoreReasonThread = "writer";
} else if (done) {
ignoreReasonThread = "reader";
}
if (ignoreReasonThread != null) {
LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
return;
}
// Close the connection and notify connection listeners of the error.
notifyConnectionError(e);
}
}
}
protected class PacketWriter {
public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';
2017-12-13 23:10:11 +01:00
private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
QUEUE_SIZE, true);
/**
2018-03-31 14:17:30 +02:00
* If set, the stanza writer is shut down
*/
protected volatile Long shutdownTimestamp = null;
private volatile boolean instantShutdown;
/**
* True if some preconditions are given to start the bundle and defer mechanism.
* <p>
* This will likely get set to true right after the start of the writer thread, because
* {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
* this field to true.
* </p>
*/
private boolean shouldBundleAndDefer;
private boolean running;
/**
 * Prepares the writer for (re-)use and starts the asynchronous write loop. It is called at the first
 * connection and also is invoked if the connection is disconnected by an error.
 */
void init() {
    shutdownTimestamp = null;

    // Stanzas may have been queued while we were disconnected but resumable. Move them to the
    // unacknowledged queue so that they get resent now.
    if (unacknowledgedStanzas != null) {
        drainWriterQueueToUnacknowledgedStanzas();
    }

    queue.start();
    running = true;

    final Runnable writeLoop = () -> {
        LOGGER.finer(threadName + " start");
        try {
            writePackets();
        } finally {
            LOGGER.finer(threadName + " exit");
            running = false;
            notifyWaitingThreads();
        }
    };
    Async.go(writeLoop, threadName);
}
/**
 * Checks if the writer was shut down, which is the case if a shutdown timestamp is set.
 *
 * @return true if the writer was shut down.
 */
private boolean done() {
    final Long timestampOfShutdown = shutdownTimestamp;
    return timestampOfShutdown != null;
}
protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
final boolean done = done();
if (done) {
2017-12-13 23:10:11 +01:00
final boolean smResumptionPossible = isSmResumptionPossible();
// Don't throw a NotConnectedException is there is an resumable stream available
2017-12-13 23:10:11 +01:00
if (!smResumptionPossible) {
throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
2017-12-13 23:10:11 +01:00
+ " smResumptionPossible=" + smResumptionPossible);
}
}
}
/**
 * Puts the given element into the writer queue so that it gets sent to the server.
 *
 * @param element the element to send.
 * @throws NotConnectedException if the writer was shut down and the stream can not be resumed.
 * @throws InterruptedException if the calling thread was interrupted.
 */
protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
    throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
    try {
        queue.put(element);
    } catch (InterruptedException interruption) {
        // An InterruptedException of put() has one of two causes: either the queue was shut down or the
        // sending thread was interrupted. In the first case the check below throws a NotConnectedException
        // (unless the stream is resumable).
        throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
        // The check did not throw, hence the sending thread was interrupted.
        throw interruption;
    }
}
/**
* Shuts down the stanza writer: remembers the kind of shutdown, shuts the queue down and sets the shutdown
* timestamp to the current time, after which done() returns true.
*
* @param instant if true, the write loop neither flushes the remaining queue content nor writes the closing
*        stream element when it terminates.
*/
void shutdown(boolean instant) {
// Set the flag before the queue is shut down, as the write loop reads it once it leaves its main loop.
instantShutdown = instant;
queue.shutdown();
shutdownTimestamp = System.currentTimeMillis();
}
/**
 * Takes the next element to write from the queue, blocking until one is available. <code>null</code> is
 * returned if the take was interrupted, which happens if the queue is shut down <b>or</b> if the thread was
 * interrupted. So it is important to check the 'done' condition in that case.
 *
 * @return the next element for writing or null.
 */
private Element nextStreamElement() {
    // The emptiness check has to happen before an element is removed from the queue
    if (queue.isEmpty()) {
        shouldBundleAndDefer = true;
    }

    try {
        return queue.take();
    } catch (InterruptedException interruption) {
        final boolean causedByQueueShutdown = queue.isShutdown();
        if (!causedByQueueShutdown) {
            // Users shouldn't try to interrupt the packet writer thread
            LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", interruption);
        }
        return null;
    }
}
private void writePackets() {
try {
// Write out packets from the queue.
while (!done()) {
Element element = nextStreamElement();
if (element == null) {
continue;
}
// Get a local version of the bundle and defer callback, in case it's unset
// between the null check and the method invocation
final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
// If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
// empty), then we could wait a bit for further stanzas attempting to decrease
// our energy consumption
if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
// Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
// queue is empty again.
shouldBundleAndDefer = false;
final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
bundlingAndDeferringStopped));
if (bundleAndDeferMillis > 0) {
long remainingWait = bundleAndDeferMillis;
final long waitStart = System.currentTimeMillis();
synchronized (bundlingAndDeferringStopped) {
while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
bundlingAndDeferringStopped.wait(remainingWait);
remainingWait = bundleAndDeferMillis
- (System.currentTimeMillis() - waitStart);
}
}
}
}
Stanza packet = null;
if (element instanceof Stanza) {
packet = (Stanza) element;
}
else if (element instanceof Enable) {
// The client needs to add messages to the unacknowledged stanzas queue
// right after it sent 'enabled'. Stanza will be added once
// unacknowledgedStanzas is not null.
unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
}
Also add stanzas to unacknowledgedStanzas while shutting down This caused the WaitForClosingStreamElementTest integration test to fail, because the last presences stanzas, which are send after done() in the writer thread would return true, are not added to the unacknowledgedStanzas queue. The result was: SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84) Caused by: java.lang.AssertionError: Sync poing yielded failure exception at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46) ... 8 more Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. 
The stanza id of the last acked outstanding stanza is FqL1X-144 at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847) at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996) at java.lang.Thread.run(Thread.java:745)
2017-03-18 17:41:30 +01:00
maybeAddToUnacknowledgedStanzas(packet);
CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
if (elementXml instanceof XmlStringBuilder) {
try {
((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
} catch (NullPointerException npe) {
LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
throw npe;
}
}
else {
writer.write(elementXml.toString());
}
if (queue.isEmpty()) {
writer.flush();
}
if (packet != null) {
firePacketSendingListeners(packet);
}
}
if (!instantShutdown) {
// Flush out the rest of the queue.
try {
while (!queue.isEmpty()) {
Element packet = queue.remove();
Also add stanzas to unacknowledgedStanzas while shutting down This caused the WaitForClosingStreamElementTest integration test to fail, because the last presences stanzas, which are send after done() in the writer thread would return true, are not added to the unacknowledgedStanzas queue. The result was: SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84) Caused by: java.lang.AssertionError: Sync poing yielded failure exception at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46) ... 8 more Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. 
The stanza id of the last acked outstanding stanza is FqL1X-144 at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847) at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996) at java.lang.Thread.run(Thread.java:745)
2017-03-18 17:41:30 +01:00
if (packet instanceof Stanza) {
Stanza stanza = (Stanza) packet;
maybeAddToUnacknowledgedStanzas(stanza);
}
2019-02-04 13:27:41 +01:00
writer.write(packet.toXML().toString());
}
}
catch (Exception e) {
LOGGER.log(Level.WARNING,
"Exception flushing queue during shutdown, ignore and continue",
e);
}
// Close the stream.
try {
writer.write("</stream:stream>");
writer.flush();
}
catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
}
// Delete the queue contents (hopefully nothing is left).
queue.clear();
} else if (instantShutdown && isSmEnabled()) {
// This was an instantShutdown and SM is enabled, drain all remaining stanzas
// into the unacknowledgedStanzas queue
drainWriterQueueToUnacknowledgedStanzas();
}
// Do *not* close the writer here, as it will cause the socket
// to get closed. But we may want to receive further stanzas
// until the closing stream tag is received. The socket will be
// closed in shutdown().
}
catch (Exception e) {
// The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed
if (!(done() || queue.isShutdown())) {
// Set running to false since this thread will exit here and notifyConnectionError() will wait until
// the reader and writer thread's 'running' value is false.
running = false;
notifyConnectionError(e);
} else {
LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
}
}
}
private void drainWriterQueueToUnacknowledgedStanzas() {
2017-12-13 23:10:11 +01:00
List<Element> elements = new ArrayList<>(queue.size());
queue.drainTo(elements);
for (int i = 0; i < elements.size(); i++) {
Element element = elements.get(i);
// If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
if (unacknowledgedStanzas.remainingCapacity() == 0) {
StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
.newWith(i, elements, unacknowledgedStanzas);
LOGGER.log(Level.WARNING,
"Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
return;
}
if (element instanceof Stanza) {
unacknowledgedStanzas.add((Stanza) element);
}
}
}
Also add stanzas to unacknowledgedStanzas while shutting down This caused the WaitForClosingStreamElementTest integration test to fail, because the last presences stanzas, which are send after done() in the writer thread would return true, are not added to the unacknowledgedStanzas queue. The result was: SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84) Caused by: java.lang.AssertionError: Sync poing yielded failure exception at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46) ... 8 more Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. 
The stanza id of the last acked outstanding stanza is FqL1X-144 at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847) at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996) at java.lang.Thread.run(Thread.java:745)
2017-03-18 17:41:30 +01:00
private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
// Check if the stream element should be put to the unacknowledgedStanza
// queue. Note that we can not do the put() in sendStanzaInternal() and the
// packet order is not stable at this point (sendStanzaInternal() can be
// called concurrently).
if (unacknowledgedStanzas != null && stanza != null) {
// If the unacknowledgedStanza queue reaching its high water mark, request an new ack
Also add stanzas to unacknowledgedStanzas while shutting down This caused the WaitForClosingStreamElementTest integration test to fail, because the last presences stanzas, which are send after done() in the writer thread would return true, are not added to the unacknowledgedStanzas queue. The result was: SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84) Caused by: java.lang.AssertionError: Sync poing yielded failure exception at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46) ... 8 more Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. 
The stanza id of the last acked outstanding stanza is FqL1X-144 at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847) at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996) at java.lang.Thread.run(Thread.java:745)
2017-03-18 17:41:30 +01:00
// from the server in order to drain it
if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
2019-02-04 13:27:41 +01:00
writer.write(AckRequest.INSTANCE.toXML().toString());
Also add stanzas to unacknowledgedStanzas while shutting down This caused the WaitForClosingStreamElementTest integration test to fail, because the last presences stanzas, which are send after done() in the writer thread would return true, are not added to the unacknowledgedStanzas queue. The result was: SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84) Caused by: java.lang.AssertionError: Sync poing yielded failure exception at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46) ... 8 more Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. 
The stanza id of the last acked outstanding stanza is FqL1X-144 at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847) at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996) at java.lang.Thread.run(Thread.java:745)
2017-03-18 17:41:30 +01:00
}
Also add stanzas to unacknowledgedStanzas while shutting down This caused the WaitForClosingStreamElementTest integration test to fail, because the last presences stanzas, which are send after done() in the writer thread would return true, are not added to the unacknowledgedStanzas queue. The result was: SEVERE: WaitForClosingStreamElementTest.waitForClosingStreamElementTest (LowLevel): Failed java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.invokeLowLevel(SmackIntegrationTestFramework.java:466) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.runTests(SmackIntegrationTestFramework.java:375) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.run(SmackIntegrationTestFramework.java:165) at org.igniterealtime.smack.inttest.SmackIntegrationTestFramework.main(SmackIntegrationTestFramework.java:84) Caused by: java.lang.AssertionError: Sync poing yielded failure exception at org.jivesoftware.smack.WaitForClosingStreamElementTest.waitForClosingStreamElementTest(WaitForClosingStreamElementTest.java:46) ... 8 more Caused by: org.jivesoftware.smack.sm.StreamManagementException$StreamManagementCounterError: There was an error regarding the Stream Mangement counters. Server reported 3 handled stanzas, which means that the 3 recently send stanzas by client are now acked by the server. But Smack had only 1 to acknowledge. 
The stanza id of the last acked outstanding stanza is FqL1X-144 at org.jivesoftware.smack.tcp.XMPPTCPConnection.processHandledCount(XMPPTCPConnection.java:1847) at org.jivesoftware.smack.tcp.XMPPTCPConnection.access$2600(XMPPTCPConnection.java:149) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:1176) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$300(XMPPTCPConnection.java:980) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:996) at java.lang.Thread.run(Thread.java:745)
2017-03-18 17:41:30 +01:00
try {
// It is important the we put the stanza in the unacknowledged stanza
// queue before we put it on the wire
unacknowledgedStanzas.put(stanza);
}
catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
}
/**
 * Configure whether new connections use Stream Management by default.
 *
 * @param useSmDefault <code>true</code> if new connections should use Stream Management.
 */
public static void setUseStreamManagementDefault(boolean useSmDefault) {
    XMPPTCPConnection.useSmDefault = useSmDefault;
}
/**
 * Configure whether new connections use Stream Management resumption by default.
 * <p>
 * This method only exists because of its misspelled name and simply delegates to
 * {@link #setUseStreamManagementResumptionDefault(boolean)}.
 * </p>
 *
 * @param useSmResumptionDefault <code>true</code> if new connections should use Stream Management resumption.
 * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
 */
@Deprecated
public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
    setUseStreamManagementResumptionDefault(useSmResumptionDefault);
}
/**
 * Configure whether new connections use Stream Management resumption by default.
 * <p>
 * Enabling resumption also enables Stream Management itself by default. Disabling resumption leaves the
 * Stream Management default untouched.
 * </p>
 *
 * @param useSmResumptionDefault <code>true</code> if new connections should use Stream Management resumption.
 */
public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
    // Resumption requires Stream Management, hence switch it on together with resumption.
    if (useSmResumptionDefault) {
        setUseStreamManagementDefault(true);
    }

    XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
}
/**
 * Configure whether this connection uses Stream Management, provided the server supports it.
 *
 * @param useSm <code>true</code> if Stream Management should be used.
 */
public void setUseStreamManagement(boolean useSm) {
    this.useSm = useSm;
}
/**
 * Configure whether this connection uses Stream Management resumption, provided the server supports it.
 * <p>
 * Enabling resumption also enables Stream Management itself for this connection. Disabling resumption leaves
 * the Stream Management setting untouched.
 * </p>
 *
 * @param useSmResumption <code>true</code> if Stream Management resumption should be used.
 */
public void setUseStreamManagementResumption(boolean useSmResumption) {
    // Resumption requires Stream Management, hence switch it on together with resumption.
    if (useSmResumption) {
        setUseStreamManagement(true);
    }

    this.useSmResumption = useSmResumption;
}
/**
 * Set the resumption time, in seconds, which this client prefers.
 * <p>
 * The value is stored as is. {@link #getMaxSmResumptionTime()} only takes it into account if it is greater
 * than zero.
 * </p>
 *
 * @param resumptionTime the preferred resumption time in seconds.
 */
public void setPreferredResumptionTime(int resumptionTime) {
    smClientMaxResumptionTime = resumptionTime;
}
/**
 * Register a predicate for Stream Management acknowledgment requests.
 * <p>
 * The registered predicates determine when a Stream Management acknowledgement request is sent to the server.
 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
 * </p>
 * <p>
 * If no predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
 * </p>
 *
 * @param predicate the predicate to add.
 * @return if the predicate was not already active.
 */
public boolean addRequestAckPredicate(StanzaFilter predicate) {
    final boolean wasAdded;
    synchronized (requestAckPredicates) {
        wasAdded = requestAckPredicates.add(predicate);
    }
    return wasAdded;
}
/**
 * Unregister the given predicate for Stream Management acknowledgment requests.
 *
 * @param predicate the predicate to remove.
 * @return <code>true</code> if the predicate was removed.
 */
public boolean removeRequestAckPredicate(StanzaFilter predicate) {
    final boolean wasRemoved;
    synchronized (requestAckPredicates) {
        wasRemoved = requestAckPredicates.remove(predicate);
    }
    return wasRemoved;
}
/**
 * Unregister every predicate for Stream Management acknowledgment requests.
 * <p>
 * The predicates are cleared while holding the monitor of the predicate collection, like the add and remove
 * operations do.
 * </p>
 */
public void removeAllRequestAckPredicates() {
    synchronized (requestAckPredicates) {
        requestAckPredicates.clear();
    }
}
/**
 * Unconditionally request a Stream Management acknowledgement from the server.
 *
 * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
 * @throws NotConnectedException if the connection is not connected.
 * @throws InterruptedException if the calling thread was interrupted.
 */
public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
    if (isSmEnabled()) {
        requestSmAcknowledgementInternal();
        return;
    }

    // Without Stream Management there is nothing the server could acknowledge.
    throw new StreamManagementException.StreamManagementNotEnabledException();
}
/**
 * Hand an acknowledgement request over to the packet writer. Unlike {@link #requestSmAcknowledgement()}, this
 * method does not check if Stream Management is enabled.
 *
 * @throws NotConnectedException if the connection is not connected.
 * @throws InterruptedException if the calling thread was interrupted.
 */
private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
    packetWriter.sendStreamElement(AckRequest.INSTANCE);
}
/**
 * Unconditionally send a Stream Management acknowledgment to the server.
 * <p>
 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management § 4. Acks</a>:
 * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
 * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
 * </p>
 *
 * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
 * @throws NotConnectedException if the connection is not connected.
 * @throws InterruptedException if the calling thread was interrupted.
 */
public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
    if (isSmEnabled()) {
        sendSmAcknowledgementInternal();
        return;
    }

    // Without Stream Management there is no handled count which could be reported.
    throw new StreamManagementException.StreamManagementNotEnabledException();
}
private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
[tcp] Do not send SM ack after we send a </stream:stream> Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here, then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the stream open. See the following trace of the MUC bookmarks integration test. The fact that it is a MUC test does not matter, but this test does disconnect the connection and reconnect it. Not how the server, ejabberd in this case, requests an SM ack by sending an <r/> even though we already send the </stream:stream>: 22:22:05 SENT (4): <iq id='MD4UC-61' type='set'> <query xmlns='jabber:iq:private'> <storage xmlns='storage:bookmarks'> <conference name='Smack Inttest: 7in7j' autojoin='true' jid='y9jcn5@conference.salem.geekplace.eu'> <nick> Nick-P2VXD7 </nick> </conference> </storage> </query> </iq> 22:22:05 RECV (4): <r xmlns='urn:xmpp:sm:3'/> 22:22:05 SENT (4): <a xmlns='urn:xmpp:sm:3' h='29'/> 22:22:05 RECV (4): <message to='sinttest-7in7j-4@salem.geekplace.eu' from='sinttest-7in7j-4@salem.geekplace.eu' type='headline'> <event xmlns='http://jabber.org/protocol/pubsub#event'> <items node='storage:bookmarks'> <item id='current'> <storage xmlns='storage:bookmarks'> <conference name='Smack Inttest: 7in7j' autojoin='true' jid='y9jcn5@conference.salem.geekplace.eu'> <nick> Nick-P2VXD7 </nick> </conference> </storage> </item> </items> </event> <addresses xmlns='http://jabber.org/protocol/address'> <address jid='sinttest-7in7j-4@salem.geekplace.eu/1415073683806426185213090' type='replyto'/> </addresses> </message> 22:22:05 RECV (4): <iq xml:lang='en-US' to='sinttest-7in7j-4@salem.geekplace.eu/1415073683806426185213090' from='sinttest-7in7j-4@salem.geekplace.eu' type='result' id='MD4UC-61'/> 22:22:05 SENT (4): <presence id='6MS6J-20' type='unavailable'/> <a xmlns='urn:xmpp:sm:3' h='31'/> <!-- We 
have closed the stream --> </stream:stream> <!-- But the server still requests an SM ack --> 22:22:05 RECV (4): <r xmlns='urn:xmpp:sm:3'/> 22:22:05 RECV (4): </stream:stream> 22:22:05 XMPPConnection closed (XMPPTCPConnection[sinttest-7in7j-4@salem.geekplace.eu/1415073683806426185213090] (4)) 22:22:05 SENT (4): <a xmlns='urn:xmpp:sm:3' h='31'/> 22:22:05 SENT (4): <stream:stream xmlns='jabber:client' to='salem.geekplace.eu' xmlns:stream='http://etherx.jabber.org/streams' version='1.0' from='sinttest-7in7j-4@salem.geekplace.eu' xml:lang='en-US'> 22:22:05 RECV (4): ?xml version='1.0'?> <stream:stream id='3379123514446782311' ver 22:22:05 RECV (4): sion='1.0' xml:lang='en' xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'> <stream:error> <invalid-xml xmlns='urn:ietf:params:xml:ns:xmpp-streams'/> </stream:error> </stream:stream> 22:22:05 XMPPConnection closed due to an exception (XMPPTCPConnection[sinttest-7in7j-4@salem.geekplace.eu/1415073683806426185213090] (4)) org.jivesoftware.smack.XMPPException$StreamErrorException: invalid-xml You can read more about the meaning of this stream error at http://xmpp.org/rfcs/rfc6120.html#streams-error-conditions <stream:error><invalid-xml xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error> at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.parsePackets(XMPPTCPConnection.java:981) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader.access$700(XMPPTCPConnection.java:913) at org.jivesoftware.smack.tcp.XMPPTCPConnection$PacketReader$1.run(XMPPTCPConnection.java:936) at java.base/java.lang.Thread.run(Thread.java:834)
2020-09-17 23:04:55 +02:00
AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
// Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request
// an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here,
// then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the
// stream open.
packetWriter.queue.putIfNotShutdown(ackAnswer);
}
/**
 * Register a Stanza acknowledged listener.
 * <p>
 * Such listeners are invoked every time a Stanza has been acknowledged by the server. They are not removed
 * automatically. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
 * possible.
 * </p>
 *
 * @param listener the listener to add.
 */
public void addStanzaAcknowledgedListener(StanzaListener listener) {
    stanzaAcknowledgedListeners.add(listener);
}
/**
 * Unregister the given Stanza acknowledged listener.
 *
 * @param listener the listener to remove.
 * @return <code>true</code> if the listener was removed.
 */
public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
    final boolean wasRemoved = stanzaAcknowledgedListeners.remove(listener);
    return wasRemoved;
}
/**
 * Unregister every Stanza acknowledged listener. Stanza ID acknowledged listeners are not touched by this
 * method.
 */
public void removeAllStanzaAcknowledgedListeners() {
    stanzaAcknowledgedListeners.clear();
}
/**
 * Register a Stanza dropped listener.
 * <p>
 * Such listeners are invoked every time a Stanza has been dropped due to a failed SM resume. They are not
 * removed automatically. If at least one StanzaDroppedListener is configured, no attempt will be made to
 * retransmit the Stanzas.
 * </p>
 *
 * @param listener the listener to add.
 * @since 4.3.3
 */
public void addStanzaDroppedListener(StanzaListener listener) {
    stanzaDroppedListeners.add(listener);
}
/**
 * Unregister the given Stanza dropped listener.
 *
 * @param listener the listener to remove.
 * @return <code>true</code> if the listener was removed.
 * @since 4.3.3
 */
public boolean removeStanzaDroppedListener(StanzaListener listener) {
    final boolean wasRemoved = stanzaDroppedListeners.remove(listener);
    return wasRemoved;
}
/**
 * Register a Stanza ID acknowledged listener for the given ID.
 * <p>
 * The listener is invoked once the stanza with the given ID was acknowledged by the server and is removed
 * automatically after it was run. In addition, a task is scheduled which removes the listener registered for
 * the ID after the maximum Stream Management resumption time, but after three hours at the latest.
 * </p>
 *
 * @param id the stanza ID.
 * @param listener the listener to invoke.
 * @return the previous listener for this stanza ID or null.
 * @throws StreamManagementNotEnabledException if Stream Management was never enabled on this connection.
 */
@SuppressWarnings("FutureReturnValueIgnored")
public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
    // Reject listeners which could never be invoked and hence would never get removed.
    if (!smWasEnabledAtLeastOnce) {
        throw new StreamManagementException.StreamManagementNotEnabledException();
    }

    // Expire the listener after the maximum resumption time, capped at three hours.
    final int threeHoursInSeconds = 3 * 60 * 60;
    final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), threeHoursInSeconds);
    final Runnable expireListener = new Runnable() {
        @Override
        public void run() {
            stanzaIdAcknowledgedListeners.remove(id);
        }
    };
    schedule(expireListener, removeAfterSeconds, TimeUnit.SECONDS);

    return stanzaIdAcknowledgedListeners.put(id, listener);
}
/**
 * Remove the Stanza ID acknowledged listener for the given ID.
 *
 * @param id the stanza ID.
 * @return the listener which was registered for the given ID and got removed, or <code>null</code> if there
 *         was none.
 */
public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
    return stanzaIdAcknowledgedListeners.remove(id);
}
/**
 * Unregister every Stanza ID acknowledged listener, regardless of the stanza ID it was registered for.
 */
public void removeAllStanzaIdAcknowledgedListeners() {
    stanzaIdAcknowledgedListeners.clear();
}
/**
 * Check if the server supports Stream Management, i.e. if this connection has the Stream Management feature.
 *
 * @return <code>true</code> if Stream Management is supported by the server.
 */
public boolean isSmAvailable() {
    final boolean smFeaturePresent = hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
    return smFeaturePresent;
}
/**
 * Check if Stream Management was successfully negotiated with the server.
 *
 * @return <code>true</code> if Stream Management was negotiated.
 */
public boolean isSmEnabled() {
    return smEnabledSyncPoint;
}
/**
 * Check if the stream was successfully resumed with the help of Stream Management.
 *
 * @return <code>true</code> if the stream was resumed.
 */
public boolean streamWasResumed() {
    return SyncPointState.successful == smResumedSyncPoint;
}
/**
 * Check if the connection is disconnected, but a stream resumption via Stream Management is possible.
 *
 * @return <code>true</code> if disconnected but resumption possible.
 */
public boolean isDisconnectedButSmResumptionPossible() {
    if (!disconnectedButResumeable) {
        return false;
    }
    return isSmResumptionPossible();
}
/**
 * Check if the stream is resumable.
 * <p>
 * The stream is not resumable if there is no Stream Management session ID. If the packet writer has no shutdown
 * timestamp, the stream is reported as resumable. Otherwise the stream is resumable as long as the current
 * time is not past the shutdown timestamp plus the maximum resumption time.
 * </p>
 *
 * @return <code>true</code> if the stream is resumable.
 */
public boolean isSmResumptionPossible() {
    // Without a session ID there is no resumable stream available.
    if (smSessionId == null) {
        return false;
    }

    final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
    if (shutdownTimestamp == null) {
        // Seems like we are already reconnected, report true.
        return true;
    }

    // Resumption is possible up to, and including, the greatest timestamp where resumption is allowed.
    final long now = System.currentTimeMillis();
    final long maxResumptionMillis = ((long) getMaxSmResumptionTime()) * 1000;
    final long resumptionDeadline = shutdownTimestamp + maxResumptionMillis;
    return now <= resumptionDeadline;
}
/**
 * Discard the Stream Management state by setting {@link #unacknowledgedStanzas} and {@link #smSessionId} to
 * <code>null</code>.
 */
private void dropSmState() {
    // The handled counters are deliberately left alone: clientHandledCount and serverHandledCount will be
    // reset on <enable/> and <enabled/> respectively.
    smSessionId = null;
    unacknowledgedStanzas = null;
}
/**
* Get the maximum resumption time in seconds after which a managed stream can be resumed.
* <p>
* This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
* resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
* without checking for overflows before.
* </p>
*
* @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
*/
public int getMaxSmResumptionTime() {
int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
2017-12-13 23:10:11 +01:00
int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
return Math.min(clientResumptionTime, serverResumptionTime);
}
private void processHandledCount(long handledCount) throws StreamManagementCounterError {
long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
2017-12-13 23:10:11 +01:00
final List<Stanza> ackedStanzas = new ArrayList<>(
ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
: Integer.MAX_VALUE);
for (long i = 0; i < ackedStanzasCount; i++) {
Stanza ackedStanza = unacknowledgedStanzas.poll();
// If the server ack'ed a stanza, then it must be in the
// unacknowledged stanza queue. There can be no exception.
if (ackedStanza == null) {
throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
ackedStanzasCount, ackedStanzas);
}
ackedStanzas.add(ackedStanza);
}
boolean atLeastOneStanzaAcknowledgedListener = false;
if (!stanzaAcknowledgedListeners.isEmpty()) {
// If stanzaAcknowledgedListeners is not empty, the we have at least one
atLeastOneStanzaAcknowledgedListener = true;
}
else {
// Otherwise we look for a matching id in the stanza *id* acknowledged listeners
for (Stanza ackedStanza : ackedStanzas) {
String id = ackedStanza.getStanzaId();
if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
atLeastOneStanzaAcknowledgedListener = true;
break;
}
}
}
// Only spawn a new thread if there is a chance that some listener is invoked
if (atLeastOneStanzaAcknowledgedListener) {
asyncGo(new Runnable() {
@Override
public void run() {
for (Stanza ackedStanza : ackedStanzas) {
for (StanzaListener listener : stanzaAcknowledgedListeners) {
try {
listener.processStanza(ackedStanza);
}
catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
LOGGER.log(Level.FINER, "Received exception", e);
}
}
String id = ackedStanza.getStanzaId();
if (StringUtils.isNullOrEmpty(id)) {
continue;
}
StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
if (listener != null) {
try {
listener.processStanza(ackedStanza);
}
catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
LOGGER.log(Level.FINER, "Received exception", e);
}
}
}
}
});
}
serverHandledStanzasCount = handledCount;
}
/**
 * Configure the bundle and defer callback which is used by default for new connections.
 *
 * @param defaultBundleAndDeferCallback the callback to use as default for new connections.
 * @see BundleAndDeferCallback
 * @since 4.1
 */
public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
    XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
}
/**
 * Configure the bundle and defer callback of this connection.
 * <p>
 * Passing <code>null</code> resets the callback. Outgoing stanzas will then no longer get deferred.
 * </p>
 *
 * @param bundleAndDeferCallback the callback or <code>null</code>.
 * @see BundleAndDeferCallback
 * @since 4.1
 */
public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
    this.bundleAndDeferCallback = bundleAndDeferCallback;
}
}