2003-01-13 17:58:47 +01:00
/ * *
*
2007-02-12 01:59:05 +01:00
* Copyright 2003 - 2007 Jive Software .
2003-01-13 17:58:47 +01:00
*
2014-02-17 18:57:38 +01:00
* Licensed under the Apache License , Version 2 . 0 ( the " License " ) ;
2004-11-03 00:53:30 +01:00
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2003-01-13 17:58:47 +01:00
*
2004-11-03 00:53:30 +01:00
* http : //www.apache.org/licenses/LICENSE-2.0
2003-01-13 17:58:47 +01:00
*
2004-11-03 00:53:30 +01:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
2003-01-13 17:58:47 +01:00
* /
2014-05-15 15:04:46 +02:00
package org.jivesoftware.smack.tcp ;
2017-06-14 17:12:43 +02:00
import java.io.BufferedReader ;
import java.io.IOException ;
import java.io.InputStream ;
import java.io.InputStreamReader ;
import java.io.OutputStream ;
import java.io.OutputStreamWriter ;
import java.io.Writer ;
import java.net.InetAddress ;
import java.net.InetSocketAddress ;
import java.net.Socket ;
import java.security.cert.CertificateException ;
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.Iterator ;
import java.util.LinkedHashSet ;
import java.util.LinkedList ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.concurrent.ArrayBlockingQueue ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentLinkedQueue ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.logging.Level ;
import java.util.logging.Logger ;
import javax.net.SocketFactory ;
import javax.net.ssl.HostnameVerifier ;
import javax.net.ssl.SSLSession ;
import javax.net.ssl.SSLSocket ;
2014-05-25 12:28:08 +02:00
import org.jivesoftware.smack.AbstractXMPPConnection ;
2014-05-15 15:04:46 +02:00
import org.jivesoftware.smack.ConnectionConfiguration ;
2014-09-11 09:49:16 +02:00
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode ;
2020-05-13 22:01:48 +02:00
import org.jivesoftware.smack.ConnectionListener ;
2014-05-15 15:04:46 +02:00
import org.jivesoftware.smack.SmackConfiguration ;
import org.jivesoftware.smack.SmackException ;
2014-09-11 09:49:16 +02:00
import org.jivesoftware.smack.SmackException.AlreadyConnectedException ;
2014-03-12 11:50:05 +01:00
import org.jivesoftware.smack.SmackException.AlreadyLoggedInException ;
2017-06-14 17:12:43 +02:00
import org.jivesoftware.smack.SmackException.ConnectionException ;
2020-04-04 13:03:31 +02:00
import org.jivesoftware.smack.SmackException.EndpointConnectionException ;
2014-03-12 11:50:05 +01:00
import org.jivesoftware.smack.SmackException.NotConnectedException ;
2017-11-06 22:38:13 +01:00
import org.jivesoftware.smack.SmackException.NotLoggedInException ;
2020-05-26 21:39:08 +02:00
import org.jivesoftware.smack.SmackException.SecurityNotPossibleException ;
2014-10-05 17:00:13 +02:00
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException ;
2018-12-17 21:16:03 +01:00
import org.jivesoftware.smack.SmackFuture ;
2017-06-14 17:12:43 +02:00
import org.jivesoftware.smack.StanzaListener ;
2014-05-15 15:04:46 +02:00
import org.jivesoftware.smack.XMPPConnection ;
import org.jivesoftware.smack.XMPPException ;
2017-06-14 17:12:43 +02:00
import org.jivesoftware.smack.XMPPException.FailedNonzaException ;
import org.jivesoftware.smack.XMPPException.StreamErrorException ;
import org.jivesoftware.smack.compress.packet.Compress ;
2014-09-11 09:49:16 +02:00
import org.jivesoftware.smack.compress.packet.Compressed ;
2013-02-26 10:26:41 +01:00
import org.jivesoftware.smack.compression.XMPPInputOutputStream ;
2020-04-04 13:03:31 +02:00
import org.jivesoftware.smack.datatypes.UInt16 ;
2015-02-26 18:41:17 +01:00
import org.jivesoftware.smack.filter.StanzaFilter ;
2020-05-25 14:47:36 +02:00
import org.jivesoftware.smack.internal.SmackTlsContext ;
2014-09-11 09:49:16 +02:00
import org.jivesoftware.smack.packet.Element ;
import org.jivesoftware.smack.packet.IQ ;
import org.jivesoftware.smack.packet.Message ;
2017-06-14 17:12:43 +02:00
import org.jivesoftware.smack.packet.Nonza ;
2005-09-06 00:06:40 +02:00
import org.jivesoftware.smack.packet.Presence ;
2017-06-14 17:12:43 +02:00
import org.jivesoftware.smack.packet.Stanza ;
2014-09-11 09:49:16 +02:00
import org.jivesoftware.smack.packet.StartTls ;
2017-01-06 15:03:28 +01:00
import org.jivesoftware.smack.packet.StreamError ;
2017-06-14 17:12:43 +02:00
import org.jivesoftware.smack.proxy.ProxyInfo ;
2019-09-24 23:32:08 +02:00
import org.jivesoftware.smack.sasl.packet.SaslNonza ;
2015-01-18 11:03:03 +01:00
import org.jivesoftware.smack.sm.SMUtils ;
import org.jivesoftware.smack.sm.StreamManagementException ;
import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException ;
2015-02-18 12:01:55 +01:00
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError ;
2015-01-18 11:03:03 +01:00
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException ;
import org.jivesoftware.smack.sm.packet.StreamManagement ;
import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer ;
import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest ;
import org.jivesoftware.smack.sm.packet.StreamManagement.Enable ;
import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled ;
import org.jivesoftware.smack.sm.packet.StreamManagement.Failed ;
import org.jivesoftware.smack.sm.packet.StreamManagement.Resume ;
import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed ;
import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature ;
import org.jivesoftware.smack.sm.predicates.Predicate ;
import org.jivesoftware.smack.sm.provider.ParseStreamManagement ;
2020-04-04 13:03:31 +02:00
import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints ;
import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint ;
2014-05-26 22:02:04 +02:00
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown ;
2015-01-15 16:12:41 +01:00
import org.jivesoftware.smack.util.Async ;
2018-08-15 17:25:22 +02:00
import org.jivesoftware.smack.util.CloseableUtil ;
2014-05-26 22:02:04 +02:00
import org.jivesoftware.smack.util.PacketParserUtils ;
2014-09-11 09:49:16 +02:00
import org.jivesoftware.smack.util.StringUtils ;
2014-05-29 09:21:04 +02:00
import org.jivesoftware.smack.util.TLSUtils ;
2015-09-25 18:00:32 +02:00
import org.jivesoftware.smack.util.XmlStringBuilder ;
2020-04-04 13:03:31 +02:00
import org.jivesoftware.smack.util.rce.RemoteConnectionException ;
2019-05-06 22:06:13 +02:00
import org.jivesoftware.smack.xml.SmackXmlParser ;
import org.jivesoftware.smack.xml.XmlPullParser ;
import org.jivesoftware.smack.xml.XmlPullParserException ;
2017-06-14 17:12:43 +02:00
2015-09-25 18:00:32 +02:00
import org.jxmpp.jid.impl.JidCreate ;
import org.jxmpp.jid.parts.Resourcepart ;
import org.jxmpp.stringprep.XmppStringprepException ;
2019-05-16 14:09:05 +02:00
import org.minidns.dnsname.DnsName ;
2003-01-13 17:58:47 +01:00
/ * *
2015-02-12 12:13:19 +01:00
* Creates a socket connection to an XMPP server . This is the default connection
* to an XMPP server and is specified in the XMPP Core ( RFC 6120 ) .
2018-05-09 23:06:12 +02:00
*
2014-03-10 18:31:45 +01:00
* @see XMPPConnection
2003-01-13 17:58:47 +01:00
* @author Matt Tucker
* /
2014-05-25 12:28:08 +02:00
public class XMPPTCPConnection extends AbstractXMPPConnection {
2003-01-13 17:58:47 +01:00
2014-09-11 09:49:16 +02:00
private static final int QUEUE_SIZE = 500 ;
2014-04-23 09:37:16 +02:00
private static final Logger LOGGER = Logger . getLogger ( XMPPTCPConnection . class . getName ( ) ) ;
2005-08-27 04:33:08 +02:00
/ * *
2010-02-09 12:55:56 +01:00
* The socket which is used for this connection .
2005-08-27 04:33:08 +02:00
* /
2014-05-26 22:02:04 +02:00
private Socket socket ;
2005-09-05 22:00:45 +02:00
2014-09-11 09:49:16 +02:00
/ * *
2018-05-09 23:06:12 +02:00
*
2014-09-11 09:49:16 +02:00
* /
private boolean disconnectedButResumeable = false ;
2016-11-20 19:32:26 +01:00
private SSLSocket secureSocket ;
2003-01-13 17:58:47 +01:00
2014-09-11 09:49:16 +02:00
/ * *
* Protected access level because of unit test purposes
* /
2019-02-09 13:59:06 +01:00
protected final PacketWriter packetWriter = new PacketWriter ( ) ;
2003-01-13 17:58:47 +01:00
2006-01-16 18:34:56 +01:00
/ * *
2014-09-11 09:49:16 +02:00
* Protected access level because of unit test purposes
2006-01-16 18:34:56 +01:00
* /
2019-02-09 13:59:06 +01:00
protected final PacketReader packetReader = new PacketReader ( ) ;
2014-09-11 09:49:16 +02:00
2006-01-16 18:34:56 +01:00
/ * *
2018-05-09 23:06:12 +02:00
*
2006-01-16 18:34:56 +01:00
* /
2020-05-26 21:39:08 +02:00
private boolean streamFeaturesAfterAuthenticationReceived ;
2006-09-16 00:42:06 +02:00
2014-04-27 12:27:12 +02:00
/ * *
2018-05-09 23:06:12 +02:00
*
2014-04-27 12:27:12 +02:00
* /
2020-05-26 21:39:08 +02:00
private boolean compressSyncPoint ;
2015-09-25 18:00:32 +02:00
2015-02-22 10:33:08 +01:00
/ * *
* The default bundle and defer callback , used for new connections .
* @see bundleAndDeferCallback
* /
2015-02-18 14:38:56 +01:00
private static BundleAndDeferCallback defaultBundleAndDeferCallback ;
2015-02-22 10:33:08 +01:00
/ * *
* The used bundle and defer callback .
* < p >
* Although this field may be set concurrently , the ' volatile ' keyword was deliberately not added , in order to avoid
* having a ' volatile ' read within the writer threads loop .
* < / p >
* /
2015-02-18 14:38:56 +01:00
private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback ;
2016-01-10 18:57:19 +01:00
private static boolean useSmDefault = true ;
2014-09-11 09:49:16 +02:00
private static boolean useSmResumptionDefault = true ;
/ * *
* The stream ID of the stream that is currently resumable , ie . the stream we hold the state
* for in { @link # clientHandledStanzasCount } , { @link # serverHandledStanzasCount } and
2020-05-26 21:39:08 +02:00
* { @link # unFailedNonzaExceptionacknowledgedStanzas } .
2014-09-11 09:49:16 +02:00
* /
private String smSessionId ;
2020-06-12 21:16:05 +02:00
/ * *
* Represents the state of stream management resumption .
* < p >
* Unlike other sync points , this sync point is marked volatile because it is also read by the reader thread .
* < / p >
* /
private volatile SyncPointState smResumedSyncPoint ;
2020-05-26 21:39:08 +02:00
private Failed smResumptionFailed ;
2014-09-11 09:49:16 +02:00
2020-06-12 21:16:05 +02:00
/ * *
* Represents the state of stream magement .
* < p >
* This boolean is marked volatile as it is read by various threads , including the reader thread via { @link # isSmEnabled ( ) } .
* < / p >
* /
private volatile boolean smEnabledSyncPoint ;
2014-09-11 09:49:16 +02:00
/ * *
* The client ' s preferred maximum resumption time in seconds .
* /
private int smClientMaxResumptionTime = - 1 ;
/ * *
* The server ' s preferred maximum resumption time in seconds .
* /
2017-12-13 23:10:11 +01:00
private int smServerMaxResumptionTime = - 1 ;
2014-09-11 09:49:16 +02:00
/ * *
* Indicates whether Stream Management ( XEP - 198 ) should be used if it ' s supported by the server .
* /
private boolean useSm = useSmDefault ;
private boolean useSmResumption = useSmResumptionDefault ;
2015-01-15 16:11:49 +01:00
/ * *
* The counter that the server sends the client about it ' s current height . For example , if the server sends
* { @code < a h = ' 42 ' / > } , then this will be set to 42 ( while also handling the { @link # unacknowledgedStanzas } queue ) .
* /
2014-09-11 09:49:16 +02:00
private long serverHandledStanzasCount = 0 ;
2014-10-29 10:06:33 +01:00
/ * *
* The counter for stanzas handled ( " received " ) by the client .
* < p >
* Note that we don ' t need to synchronize this counter . Although JLS 17 . 7 states that reads and writes to longs are
* not atomic , it guarantees that there are at most 2 separate writes , one to each 32 - bit half . And since
* { @link SMUtils # incrementHeight ( long ) } masks the lower 32 bit , we only operate on one half of the long and
* therefore have no concurrency problem because the read / write operations on one half are guaranteed to be atomic .
* < / p >
* /
2014-09-11 09:49:16 +02:00
private long clientHandledStanzasCount = 0 ;
2015-01-15 16:11:49 +01:00
2015-02-05 11:17:27 +01:00
private BlockingQueue < Stanza > unacknowledgedStanzas ;
2014-09-11 09:49:16 +02:00
2014-10-29 10:06:33 +01:00
/ * *
* Set to true if Stream Management was at least once enabled for this connection .
* /
private boolean smWasEnabledAtLeastOnce = false ;
2014-09-11 09:49:16 +02:00
/ * *
* This listeners are invoked for every stanza that got acknowledged .
* < p >
2018-03-09 14:15:46 +01:00
* We use a { @link ConcurrentLinkedQueue } here in order to allow the listeners to remove
2014-09-11 09:49:16 +02:00
* themselves after they have been invoked .
* < / p >
* /
2017-12-13 23:10:11 +01:00
private final Collection < StanzaListener > stanzaAcknowledgedListeners = new ConcurrentLinkedQueue < > ( ) ;
2014-09-11 09:49:16 +02:00
2018-11-28 17:55:46 +01:00
/ * *
* These listeners are invoked for every stanza that got dropped .
* < p >
* We use a { @link ConcurrentLinkedQueue } here in order to allow the listeners to remove
* themselves after they have been invoked .
* < / p >
* /
private final Collection < StanzaListener > stanzaDroppedListeners = new ConcurrentLinkedQueue < > ( ) ;
2014-09-11 09:49:16 +02:00
/ * *
* This listeners are invoked for a acknowledged stanza that has the given stanza ID . They will
* only be invoked once and automatically removed after that .
* /
2017-12-13 23:10:11 +01:00
private final Map < String , StanzaListener > stanzaIdAcknowledgedListeners = new ConcurrentHashMap < > ( ) ;
2014-09-11 09:49:16 +02:00
/ * *
* Predicates that determine if an stream management ack should be requested from the server .
* < p >
* We use a linked hash set here , so that the order how the predicates are added matches the
* order in which they are invoked in order to determine if an ack request should be send or not .
* < / p >
* /
2017-12-13 23:10:11 +01:00
private final Set < StanzaFilter > requestAckPredicates = new LinkedHashSet < > ( ) ;
2007-11-30 20:40:31 +01:00
2018-03-28 15:56:04 +02:00
@SuppressWarnings ( " HidingField " )
2014-11-09 18:30:16 +01:00
private final XMPPTCPConnectionConfiguration config ;
2007-11-30 20:40:31 +01:00
/ * *
2014-11-09 18:30:16 +01:00
* Creates a new XMPP connection over TCP ( optionally using proxies ) .
* < p >
* Note that XMPPTCPConnection constructors do not establish a connection to the server
* and you must call { @link # connect ( ) } .
* < / p >
2007-11-30 20:40:31 +01:00
*
* @param config the connection configuration .
* /
2014-11-09 18:30:16 +01:00
public XMPPTCPConnection ( XMPPTCPConnectionConfiguration config ) {
2010-02-09 12:55:56 +01:00
super ( config ) ;
2014-11-09 18:30:16 +01:00
this . config = config ;
2020-05-13 22:01:48 +02:00
addConnectionListener ( new ConnectionListener ( ) {
2015-08-26 18:47:05 +02:00
@Override
public void connectionClosedOnError ( Exception e ) {
2017-11-16 14:18:15 +01:00
if ( e instanceof XMPPException . StreamErrorException | | e instanceof StreamManagementException ) {
2015-08-26 18:47:05 +02:00
dropSmState ( ) ;
}
}
} ) ;
2019-09-24 23:32:08 +02:00
// Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
// is initiated.
buildNonzaCallback ( ) . listenFor ( SaslNonza . Success . class , s - > resetParser ( ) ) . install ( ) ;
2003-01-13 17:58:47 +01:00
}
2015-02-09 07:32:04 +01:00
/ * *
* Creates a new XMPP connection over TCP .
* < p >
* Note that { @code jid } must be the bare JID , e . g . " user@example.org " . More fine - grained control over the
* connection settings is available using the { @link # XMPPTCPConnection ( XMPPTCPConnectionConfiguration ) }
* constructor .
* < / p >
2018-05-09 23:06:12 +02:00
*
2015-02-09 07:32:04 +01:00
* @param jid the bare JID used by the client .
* @param password the password or authentication token .
2019-08-30 12:08:30 +02:00
* @throws XmppStringprepException if the provided string is invalid .
2015-02-09 07:32:04 +01:00
* /
2015-09-25 18:00:32 +02:00
public XMPPTCPConnection ( CharSequence jid , String password ) throws XmppStringprepException {
2019-04-07 16:44:04 +02:00
this ( XMPPTCPConnectionConfiguration . builder ( ) . setXmppAddressAndPassword ( jid , password ) . build ( ) ) ;
2015-02-09 07:32:04 +01:00
}
2004-07-06 00:22:02 +02:00
/ * *
2014-11-09 18:30:16 +01:00
* Creates a new XMPP connection over TCP .
* < p >
* This is the simplest constructor for connecting to an XMPP server . Alternatively ,
* you can get fine - grained control over connection settings using the
* { @link # XMPPTCPConnection ( XMPPTCPConnectionConfiguration ) } constructor .
* < / p >
2019-08-30 12:08:30 +02:00
* @param username TODO javadoc me please
* @param password TODO javadoc me please
* @param serviceName TODO javadoc me please
* @throws XmppStringprepException if the provided string is invalid .
2004-07-06 00:22:02 +02:00
* /
2015-09-25 18:00:32 +02:00
public XMPPTCPConnection ( CharSequence username , String password , String serviceName ) throws XmppStringprepException {
this ( XMPPTCPConnectionConfiguration . builder ( ) . setUsernameAndPassword ( username , password ) . setXmppDomain (
JidCreate . domainBareFrom ( serviceName ) ) . build ( ) ) ;
2005-07-18 23:28:25 +02:00
}
2011-03-28 15:13:41 +02:00
@Override
2014-11-09 18:30:16 +01:00
protected void throwNotConnectedExceptionIfAppropriate ( ) throws NotConnectedException {
2015-01-17 17:53:36 +01:00
if ( packetWriter = = null ) {
throw new NotConnectedException ( ) ;
}
2014-11-09 18:30:16 +01:00
packetWriter . throwNotConnectedExceptionIfDoneAndResumptionNotPossible ( ) ;
}
@Override
protected void throwAlreadyConnectedExceptionIfAppropriate ( ) throws AlreadyConnectedException {
if ( isConnected ( ) & & ! disconnectedButResumeable ) {
throw new AlreadyConnectedException ( ) ;
2003-01-13 17:58:47 +01:00
}
2014-11-09 18:30:16 +01:00
}
@Override
protected void throwAlreadyLoggedInExceptionIfAppropriate ( ) throws AlreadyLoggedInException {
if ( isAuthenticated ( ) & & ! disconnectedButResumeable ) {
2014-03-12 11:50:05 +01:00
throw new AlreadyLoggedInException ( ) ;
2003-03-09 00:09:48 +01:00
}
2014-11-09 18:30:16 +01:00
}
2014-09-11 09:49:16 +02:00
2014-11-09 18:30:16 +01:00
@Override
2015-09-25 18:00:32 +02:00
protected void afterSuccessfulLogin ( final boolean resumed ) throws NotConnectedException , InterruptedException {
2014-11-09 18:30:16 +01:00
// Reset the flag in case it was set
disconnectedButResumeable = false ;
super . afterSuccessfulLogin ( resumed ) ;
}
2003-01-16 02:01:27 +01:00
2014-11-09 18:30:16 +01:00
@Override
2015-09-25 18:00:32 +02:00
protected synchronized void loginInternal ( String username , String password , Resourcepart resource ) throws XMPPException ,
SmackException , IOException , InterruptedException {
// Authenticate using SASL
2016-11-20 19:32:26 +01:00
SSLSession sslSession = secureSocket ! = null ? secureSocket . getSession ( ) : null ;
2020-06-12 18:15:24 +02:00
streamFeaturesAfterAuthenticationReceived = false ;
2019-09-24 23:32:08 +02:00
authenticate ( username , password , config . getAuthzid ( ) , sslSession ) ;
2003-01-15 15:54:11 +01:00
2018-12-20 16:52:44 +01:00
// Wait for stream features after the authentication.
// TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
// renamed to "streamFeaturesAfterAuthenticationReceived".
2020-05-26 21:39:08 +02:00
waitForConditionOrThrowConnectionException ( ( ) - > streamFeaturesAfterAuthenticationReceived , " compress features from server " ) ;
2018-12-20 16:52:44 +01:00
2014-04-27 12:27:12 +02:00
// If compression is enabled then request the server to use stream compression. XEP-170
// recommends to perform stream compression before resource binding.
2015-09-25 18:00:32 +02:00
maybeEnableCompression ( ) ;
2014-04-27 12:27:12 +02:00
2020-06-12 18:15:24 +02:00
smResumedSyncPoint = SyncPointState . initial ;
smResumptionFailed = null ;
2014-09-11 09:49:16 +02:00
if ( isSmResumptionPossible ( ) ) {
2020-05-26 21:39:08 +02:00
smResumedSyncPoint = SyncPointState . request_sent ;
sendNonza ( new Resume ( clientHandledStanzasCount , smSessionId ) ) ;
2020-06-17 20:27:02 +02:00
waitForConditionOrConnectionException ( ( ) - > smResumedSyncPoint = = SyncPointState . successful | | smResumptionFailed ! = null , " resume previous stream " ) ;
2020-05-26 21:39:08 +02:00
if ( smResumedSyncPoint = = SyncPointState . successful ) {
2014-09-11 09:49:16 +02:00
// We successfully resumed the stream, be done here
2014-11-09 18:30:16 +01:00
afterSuccessfulLogin ( true ) ;
2014-09-11 09:49:16 +02:00
return ;
2003-08-12 21:30:51 +02:00
}
2014-09-11 09:49:16 +02:00
// SM resumption failed, what Smack does here is to report success of
// lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
// normal resource binding can be tried.
2020-05-26 21:39:08 +02:00
assert smResumptionFailed ! = null ;
LOGGER . fine ( " Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed ) ;
2003-08-12 21:30:51 +02:00
}
2003-02-10 06:01:01 +01:00
2020-07-20 14:23:09 +02:00
// We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
// mark SM as not enabled. Most importantly, we do this prior calling bindResourceAndEstablishSession(), as the
// bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
smEnabledSyncPoint = false ;
2015-02-05 11:17:27 +01:00
List < Stanza > previouslyUnackedStanzas = new LinkedList < Stanza > ( ) ;
2014-09-11 09:49:16 +02:00
if ( unacknowledgedStanzas ! = null ) {
// There was a previous connection with SM enabled but that was either not resumable or
// failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
unacknowledgedStanzas . drainTo ( previouslyUnackedStanzas ) ;
2015-03-02 15:12:19 +01:00
// Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
// XMPP session (There maybe was an enabled in a previous XMPP session of this
// connection instance though). This is used in writePackets to decide if stanzas should
// be added to the unacknowledged stanzas queue, because they have to be added right
// after the 'enable' stream element has been sent.
2015-08-26 18:47:05 +02:00
dropSmState ( ) ;
2014-09-11 09:49:16 +02:00
}
2015-12-03 13:30:44 +01:00
// Now bind the resource. It is important to do this *after* we dropped an eventually
// existing Stream Management state. As otherwise <bind/> and <session/> may end up in
// unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
bindResourceAndEstablishSession ( resource ) ;
2014-09-11 09:49:16 +02:00
if ( isSmAvailable ( ) & & useSm ) {
// Remove what is maybe left from previously stream managed sessions
serverHandledStanzasCount = 0 ;
2020-05-26 21:39:08 +02:00
sendNonza ( new Enable ( useSmResumption , smClientMaxResumptionTime ) ) ;
2014-09-11 09:49:16 +02:00
// XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
// then this is a non recoverable error and we therefore throw an exception.
2020-05-26 21:39:08 +02:00
waitForConditionOrThrowConnectionException ( ( ) - > smEnabledSyncPoint , " enabling stream mangement " ) ;
2014-09-11 09:49:16 +02:00
synchronized ( requestAckPredicates ) {
if ( requestAckPredicates . isEmpty ( ) ) {
// Assure that we have at lest one predicate set up that so that we request acks
// for the server and eventually flush some stanzas from the unacknowledged
// stanza queue
requestAckPredicates . add ( Predicate . forMessagesOrAfter5Stanzas ( ) ) ;
}
}
2007-11-14 17:27:47 +01:00
}
2018-11-28 17:55:46 +01:00
// Inform client about failed resumption if possible, resend stanzas otherwise
// Process the stanzas synchronously so a client can re-queue them for transmission
// before it is informed about connection success
if ( ! stanzaDroppedListeners . isEmpty ( ) ) {
for ( Stanza stanza : previouslyUnackedStanzas ) {
for ( StanzaListener listener : stanzaDroppedListeners ) {
try {
listener . processStanza ( stanza ) ;
}
catch ( InterruptedException | NotConnectedException | NotLoggedInException e ) {
LOGGER . log ( Level . FINER , " StanzaDroppedListener received exception " , e ) ;
}
}
}
} else {
for ( Stanza stanza : previouslyUnackedStanzas ) {
sendStanzaInternal ( stanza ) ;
}
2014-08-18 18:48:57 +02:00
}
2014-09-11 09:49:16 +02:00
2014-11-09 18:30:16 +01:00
afterSuccessfulLogin ( false ) ;
2007-11-14 17:27:47 +01:00
}
2003-02-10 06:01:01 +01:00
2014-05-26 22:02:04 +02:00
@Override
2003-09-18 22:39:14 +02:00
public boolean isSecureConnection ( ) {
2016-11-20 19:32:26 +01:00
return secureSocket ! = null ;
2003-09-18 22:39:14 +02:00
}
2003-01-13 17:58:47 +01:00
/ * *
2014-05-04 20:31:25 +02:00
* Shuts the current connection down . After this method returns , the connection must be ready
* for re - use by connect .
2003-01-13 17:58:47 +01:00
* /
2014-05-04 20:31:25 +02:00
@Override
protected void shutdown ( ) {
2014-10-28 12:22:16 +01:00
if ( isSmEnabled ( ) ) {
try {
// Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
// state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
sendSmAcknowledgementInternal ( ) ;
2015-09-25 18:00:32 +02:00
} catch ( InterruptedException | NotConnectedException e ) {
2014-10-28 12:22:16 +01:00
LOGGER . log ( Level . FINE , " Can not send final SM ack as connection is not connected " , e ) ;
}
}
2014-09-11 09:49:16 +02:00
shutdown ( false ) ;
}
2019-02-04 08:59:39 +01:00
@Override
2014-11-09 18:30:16 +01:00
public synchronized void instantShutdown ( ) {
2014-09-11 09:49:16 +02:00
shutdown ( true ) ;
}
private void shutdown ( boolean instant ) {
2020-05-31 14:15:13 +02:00
// The writer thread may already been finished at this point, for example when the connection is in the
// disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
// case.
if ( ! packetWriter . done ( ) ) {
// First shutdown the writer, this will result in a closing stream element getting send to
// the server
2020-06-16 22:59:26 +02:00
LOGGER . finer ( packetWriter . threadName + " shutdown() " ) ;
2020-05-31 14:15:13 +02:00
packetWriter . shutdown ( instant ) ;
2020-06-16 22:59:26 +02:00
LOGGER . finer ( packetWriter . threadName + " shutdown() returned " ) ;
2020-05-31 14:15:13 +02:00
if ( ! instant ) {
waitForClosingStreamTagFromServer ( ) ;
}
2015-09-25 18:00:32 +02:00
}
2020-06-16 22:59:26 +02:00
LOGGER . finer ( packetReader . threadName + " shutdown() " ) ;
2019-02-09 18:20:55 +01:00
packetReader . shutdown ( ) ;
2020-06-16 22:59:26 +02:00
LOGGER . finer ( packetReader . threadName + " shutdown() returned " ) ;
2013-01-06 15:02:44 +01:00
2018-08-15 17:25:22 +02:00
CloseableUtil . maybeClose ( socket , LOGGER ) ;
2013-01-06 15:02:44 +01:00
2014-11-09 18:30:16 +01:00
setWasAuthenticated ( ) ;
2019-03-28 13:52:17 +01:00
2020-05-26 21:39:08 +02:00
try {
2020-06-17 20:29:21 +02:00
boolean readerAndWriterThreadsTermianted = waitFor ( ( ) - > ! packetWriter . running & & ! packetReader . running ) ;
2020-05-26 21:39:08 +02:00
if ( ! readerAndWriterThreadsTermianted ) {
2020-06-16 22:59:26 +02:00
LOGGER . severe ( " Reader and/or writer threads did not terminate timely. Writer running: "
+ packetWriter . running + " , Reader running: " + packetReader . running ) ;
} else {
LOGGER . fine ( " Reader and writer threads terminated " ) ;
2020-05-26 21:39:08 +02:00
}
} catch ( InterruptedException e ) {
LOGGER . log ( Level . FINE , " Interrupted while waiting for reader and writer threads to terminate " , e ) ;
}
2019-03-28 13:52:17 +01:00
if ( disconnectedButResumeable ) {
return ;
}
2014-09-11 09:49:16 +02:00
// If we are able to resume the stream, then don't set
// connected/authenticated/usingTLS to false since we like behave like we are still
2015-03-04 21:44:43 +01:00
// connected (e.g. sendStanza should not throw a NotConnectedException).
2014-09-11 09:49:16 +02:00
if ( isSmResumptionPossible ( ) & & instant ) {
disconnectedButResumeable = true ;
} else {
disconnectedButResumeable = false ;
2015-03-10 13:17:18 +01:00
// Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
// stream tag, there is no longer a stream to resume.
smSessionId = null ;
2019-03-09 19:19:20 +01:00
// Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
// information is available in the connectionClosedOnError() listeners.
2014-09-11 09:49:16 +02:00
}
2014-11-16 18:53:31 +01:00
authenticated = false ;
connected = false ;
2016-11-20 19:32:26 +01:00
secureSocket = null ;
2013-06-22 19:02:09 +02:00
reader = null ;
writer = null ;
2015-02-08 14:10:47 +01:00
2019-03-09 17:13:54 +01:00
initState ( ) ;
}
2014-09-11 09:49:16 +02:00
@Override
2015-09-25 18:00:32 +02:00
public void sendNonza ( Nonza element ) throws NotConnectedException , InterruptedException {
2014-09-11 09:49:16 +02:00
packetWriter . sendStreamElement ( element ) ;
2006-09-14 21:21:38 +02:00
}
2006-09-16 00:42:06 +02:00
2014-05-15 15:04:46 +02:00
@Override
2015-09-25 18:00:32 +02:00
protected void sendStanzaInternal ( Stanza packet ) throws NotConnectedException , InterruptedException {
2014-09-11 09:49:16 +02:00
packetWriter . sendStreamElement ( packet ) ;
if ( isSmEnabled ( ) ) {
2015-02-26 18:41:17 +01:00
for ( StanzaFilter requestAckPredicate : requestAckPredicates ) {
2014-09-11 09:49:16 +02:00
if ( requestAckPredicate . accept ( packet ) ) {
requestSmAcknowledgementInternal ( ) ;
break ;
}
}
}
2003-01-13 17:58:47 +01:00
}
2018-12-17 21:16:03 +01:00
private void connectUsingConfiguration ( ) throws ConnectionException , IOException , InterruptedException {
2020-04-04 13:03:31 +02:00
RemoteXmppTcpConnectionEndpoints . Result < Rfc6120TcpRemoteConnectionEndpoint > result = RemoteXmppTcpConnectionEndpoints . lookup ( config ) ;
List < RemoteConnectionException < Rfc6120TcpRemoteConnectionEndpoint > > connectionExceptions = new ArrayList < > ( ) ;
2015-01-04 16:38:45 +01:00
SocketFactory socketFactory = config . getSocketFactory ( ) ;
2015-09-25 18:00:32 +02:00
ProxyInfo proxyInfo = config . getProxyInfo ( ) ;
int timeout = config . getConnectTimeout ( ) ;
2015-01-04 16:38:45 +01:00
if ( socketFactory = = null ) {
socketFactory = SocketFactory . getDefault ( ) ;
}
2020-04-04 13:03:31 +02:00
for ( Rfc6120TcpRemoteConnectionEndpoint endpoint : result . discoveredRemoteConnectionEndpoints ) {
Iterator < ? extends InetAddress > inetAddresses ;
String host = endpoint . getHost ( ) . toString ( ) ;
UInt16 portUint16 = endpoint . getPort ( ) ;
int port = portUint16 . intValue ( ) ;
2016-05-03 09:59:20 +02:00
if ( proxyInfo = = null ) {
2020-04-04 13:03:31 +02:00
inetAddresses = endpoint . getInetAddresses ( ) . iterator ( ) ;
2019-07-24 09:18:39 +02:00
assert inetAddresses . hasNext ( ) ;
2016-10-31 10:45:38 +01:00
2016-05-03 09:59:20 +02:00
innerloop : while ( inetAddresses . hasNext ( ) ) {
2016-06-19 12:06:21 +02:00
// Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
// re-usable after a failed connection attempt. See also SMACK-724.
2018-12-17 21:16:03 +01:00
SmackFuture . SocketFuture socketFuture = new SmackFuture . SocketFuture ( socketFactory ) ;
2016-06-19 12:06:21 +02:00
2016-05-03 09:59:20 +02:00
final InetAddress inetAddress = inetAddresses . next ( ) ;
2018-12-17 21:16:03 +01:00
final InetSocketAddress inetSocketAddress = new InetSocketAddress ( inetAddress , port ) ;
2018-12-21 12:05:14 +01:00
LOGGER . finer ( " Trying to establish TCP connection to " + inetSocketAddress ) ;
2018-12-17 21:16:03 +01:00
socketFuture . connectAsync ( inetSocketAddress , timeout ) ;
2016-05-03 09:59:20 +02:00
try {
2018-12-17 21:16:03 +01:00
socket = socketFuture . getOrThrow ( ) ;
} catch ( IOException e ) {
2020-04-04 13:03:31 +02:00
RemoteConnectionException < Rfc6120TcpRemoteConnectionEndpoint > rce = new RemoteConnectionException < > (
endpoint , inetAddress , e ) ;
connectionExceptions . add ( rce ) ;
2016-05-03 09:59:20 +02:00
if ( inetAddresses . hasNext ( ) ) {
continue innerloop ;
} else {
break innerloop ;
}
2016-01-07 19:22:03 +01:00
}
2018-12-21 12:05:14 +01:00
LOGGER . finer ( " Established TCP connection to " + inetSocketAddress ) ;
2016-05-03 09:59:20 +02:00
// We found a host to connect to, return here
this . host = host ;
2020-04-04 13:03:31 +02:00
this . port = portUint16 ;
2016-05-03 09:59:20 +02:00
return ;
}
} else {
2020-04-04 13:03:31 +02:00
// TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
// connection to every inet address of each connection endpoint.
2017-01-08 22:58:23 +01:00
socket = socketFactory . createSocket ( ) ;
2020-04-04 13:03:31 +02:00
StringUtils . requireNotNullNorEmpty ( host , " Host of endpoint " + endpoint + " must not be null when using a Proxy " ) ;
2016-05-03 09:59:20 +02:00
final String hostAndPort = host + " at port " + port ;
LOGGER . finer ( " Trying to establish TCP connection via Proxy to " + hostAndPort ) ;
try {
proxyInfo . getProxySocketConnection ( ) . connect ( socket , host , port , timeout ) ;
} catch ( IOException e ) {
2019-11-18 17:44:08 +01:00
CloseableUtil . maybeClose ( socket , LOGGER ) ;
2020-04-04 13:03:31 +02:00
RemoteConnectionException < Rfc6120TcpRemoteConnectionEndpoint > rce = new RemoteConnectionException < > ( endpoint , null , e ) ;
connectionExceptions . add ( rce ) ;
2016-05-03 09:59:20 +02:00
continue ;
2016-01-07 19:22:03 +01:00
}
2016-05-03 09:59:20 +02:00
LOGGER . finer ( " Established TCP connection to " + hostAndPort ) ;
2016-01-07 19:22:03 +01:00
// We found a host to connect to, return here
this . host = host ;
2020-04-04 13:03:31 +02:00
this . port = portUint16 ;
2016-01-07 19:22:03 +01:00
return ;
2006-09-15 23:51:08 +02:00
}
2003-12-19 18:54:38 +01:00
}
2020-04-04 13:03:31 +02:00
2015-01-25 00:05:06 +01:00
// There are no more host addresses to try
// throw an exception and report all tried
// HostAddresses in the exception
2020-04-04 13:03:31 +02:00
throw EndpointConnectionException . from ( result . lookupFailures , connectionExceptions ) ;
2003-12-19 18:54:38 +01:00
}
2003-01-13 17:58:47 +01:00
/ * *
2018-03-31 14:17:30 +02:00
* Initializes the connection by creating a stanza reader and writer and opening a
2003-04-18 06:38:27 +02:00
* XMPP stream to the server .
2003-01-13 17:58:47 +01:00
*
* @throws XMPPException if establishing a connection to the server fails .
2017-12-13 23:10:11 +01:00
* @throws SmackException if the server fails to respond back or if there is anther error .
2019-10-30 12:02:36 +01:00
* @throws IOException if an I / O error occurred .
2019-08-30 12:08:30 +02:00
* @throws InterruptedException if the calling thread was interrupted .
2003-01-13 17:58:47 +01:00
* /
2019-02-06 21:49:58 +01:00
private void initConnection ( ) throws IOException , InterruptedException {
2013-03-18 20:58:48 +01:00
compressionHandler = null ;
2006-11-10 20:06:33 +01:00
2005-08-27 04:33:08 +02:00
// Set the reader and writer instance variables
initReaderAndWriter ( ) ;
2003-01-13 17:58:47 +01:00
2018-02-09 10:26:42 +01:00
// Start the writer thread. This will open an XMPP stream to the server
2015-01-25 00:21:19 +01:00
packetWriter . init ( ) ;
2018-02-09 10:26:42 +01:00
// Start the reader thread. The startup() method will block until we
2015-01-25 00:21:19 +01:00
// get an opening stream packet back from server
packetReader . init ( ) ;
2005-08-27 04:33:08 +02:00
}
2015-01-25 00:21:19 +01:00
private void initReaderAndWriter ( ) throws IOException {
InputStream is = socket . getInputStream ( ) ;
OutputStream os = socket . getOutputStream ( ) ;
if ( compressionHandler ! = null ) {
is = compressionHandler . getInputStream ( is ) ;
os = compressionHandler . getOutputStream ( os ) ;
2005-08-27 04:33:08 +02:00
}
2015-01-25 00:21:19 +01:00
// OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
writer = new OutputStreamWriter ( os , " UTF-8 " ) ;
reader = new BufferedReader ( new InputStreamReader ( is , " UTF-8 " ) ) ;
2005-08-27 04:33:08 +02:00
// If debugging is enabled, we open a window and write out all network traffic.
2010-02-09 12:55:56 +01:00
initDebugger ( ) ;
2003-12-19 18:54:38 +01:00
}
2005-08-27 04:33:08 +02:00
/ * *
* The server has indicated that TLS negotiation can start . We now need to secure the
* existing plain connection and perform a handshake . This method won ' t return until the
2014-04-22 22:45:20 +02:00
* connection has finished the handshake or an error occurred while securing the connection .
2019-10-30 12:02:36 +01:00
* @throws IOException if an I / O error occurred .
2020-05-26 21:39:08 +02:00
* @throws SecurityNotPossibleException if TLS is not possible .
* @throws CertificateException if there is an issue with the certificate .
2005-08-27 04:33:08 +02:00
* /
2017-03-06 17:17:15 +01:00
@SuppressWarnings ( " LiteralClassName " )
2020-05-26 21:39:08 +02:00
private void proceedTLSReceived ( ) throws IOException , SecurityNotPossibleException , CertificateException {
2019-02-04 08:59:39 +01:00
SmackTlsContext smackTlsContext = getSmackTlsContext ( ) ;
2016-01-19 17:04:11 +01:00
2005-09-06 00:06:40 +02:00
Socket plain = socket ;
// Secure the plain connection
2019-02-04 08:59:39 +01:00
socket = smackTlsContext . sslContext . getSocketFactory ( ) . createSocket ( plain ,
2018-04-09 08:18:13 +02:00
config . getXMPPServiceDomain ( ) . toString ( ) , plain . getPort ( ) , true ) ;
2014-04-28 08:29:12 +02:00
2014-05-29 09:21:04 +02:00
final SSLSocket sslSocket = ( SSLSocket ) socket ;
2016-02-08 10:07:14 +01:00
// Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
// important (at least on certain platforms) and it seems to be a good idea anyways to
// prevent an accidental implicit handshake.
2014-05-29 09:21:04 +02:00
TLSUtils . setEnabledProtocolsAndCiphers ( sslSocket , config . getEnabledSSLProtocols ( ) , config . getEnabledSSLCiphers ( ) ) ;
2016-02-08 10:07:14 +01:00
// Initialize the reader and writer with the new secured version
initReaderAndWriter ( ) ;
2014-05-29 09:21:04 +02:00
// Proceed to do the handshake
sslSocket . startHandshake ( ) ;
2019-02-04 08:59:39 +01:00
if ( smackTlsContext . daneVerifier ! = null ) {
2019-02-04 09:47:59 +01:00
smackTlsContext . daneVerifier . finish ( sslSocket . getSession ( ) ) ;
2016-10-31 10:45:38 +01:00
}
2014-07-21 18:42:44 +02:00
final HostnameVerifier verifier = getConfiguration ( ) . getHostnameVerifier ( ) ;
2014-07-26 21:55:52 +02:00
if ( verifier = = null ) {
throw new IllegalStateException ( " No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure. " ) ;
2019-05-16 14:09:05 +02:00
}
final String verifierHostname ;
{
DnsName xmppServiceDomainDnsName = getConfiguration ( ) . getXmppServiceDomainAsDnsNameIfPossible ( ) ;
// Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
// Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
// See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
if ( xmppServiceDomainDnsName ! = null ) {
verifierHostname = xmppServiceDomainDnsName . ace ;
}
else {
LOGGER . log ( Level . WARNING , " XMPP service domain name ' " + getXMPPServiceDomain ( )
+ " ' can not be represented as DNS name. TLS X.509 certificate validiation may fail. " ) ;
verifierHostname = getXMPPServiceDomain ( ) . toString ( ) ;
}
}
2019-05-16 16:50:47 +02:00
final boolean verificationSuccessful ;
// Verify the TLS session.
verificationSuccessful = verifier . verify ( verifierHostname , sslSocket . getSession ( ) ) ;
if ( ! verificationSuccessful ) {
2019-05-16 14:09:05 +02:00
throw new CertificateException (
" Hostname verification of certificate failed. Certificate does not authenticate "
+ getXMPPServiceDomain ( ) ) ;
2014-07-21 18:42:44 +02:00
}
2005-09-06 00:06:40 +02:00
// Set that TLS was successful
2016-11-20 19:32:26 +01:00
secureSocket = sslSocket ;
2006-01-16 18:34:56 +01:00
}
/ * *
2013-02-26 10:26:41 +01:00
* Returns the compression handler that can be used for one compression methods offered by the server .
2018-05-09 23:06:12 +02:00
*
2013-02-26 10:26:41 +01:00
* @return a instance of XMPPInputOutputStream or null if no suitable instance was found
2018-05-09 23:06:12 +02:00
*
2006-01-16 18:34:56 +01:00
* /
2015-09-25 18:00:32 +02:00
private static XMPPInputOutputStream maybeGetCompressionHandler ( Compress . Feature compression ) {
2017-12-13 23:10:11 +01:00
for ( XMPPInputOutputStream handler : SmackConfiguration . getCompressionHandlers ( ) ) {
2013-02-26 10:26:41 +01:00
String method = handler . getCompressionMethod ( ) ;
2014-09-11 09:49:16 +02:00
if ( compression . getMethods ( ) . contains ( method ) )
2013-02-26 10:26:41 +01:00
return handler ;
}
return null ;
2006-01-16 18:34:56 +01:00
}
2014-05-26 22:02:04 +02:00
@Override
2006-01-16 18:34:56 +01:00
public boolean isUsingCompression ( ) {
2020-05-26 21:39:08 +02:00
return compressionHandler ! = null & & compressSyncPoint ;
2006-01-16 18:34:56 +01:00
}
/ * *
2014-09-11 09:49:16 +02:00
* < p >
2006-01-16 18:34:56 +01:00
* Starts using stream compression that will compress network traffic . Traffic can be
* reduced up to 90 % . Therefore , stream compression is ideal when using a slow speed network
* connection . However , the server and the client will need to use more CPU time in order to
2014-04-23 09:50:39 +02:00
* un / compress network data so under high load the server performance might be affected .
2014-09-11 09:49:16 +02:00
* < / p >
2014-04-23 09:50:39 +02:00
* < p >
2006-01-16 18:34:56 +01:00
* Stream compression has to have been previously offered by the server . Currently only the
* zlib method is supported by the client . Stream compression negotiation has to be done
2014-09-11 09:49:16 +02:00
* before authentication took place .
* < / p >
2006-01-16 18:34:56 +01:00
*
2019-08-30 12:08:30 +02:00
* @throws NotConnectedException if the XMPP connection is not connected .
* @throws SmackException if Smack detected an exceptional situation .
* @throws InterruptedException if the calling thread was interrupted .
2020-05-26 21:39:08 +02:00
* @throws XMPPException if an XMPP protocol error was received .
2006-01-16 18:34:56 +01:00
* /
2020-05-26 21:39:08 +02:00
private void maybeEnableCompression ( ) throws SmackException , InterruptedException , XMPPException {
2015-09-25 18:00:32 +02:00
if ( ! config . isCompressionEnabled ( ) ) {
return ;
}
2018-12-20 16:52:44 +01:00
2020-07-23 14:32:14 +02:00
Compress . Feature compression = getFeature ( Compress . Feature . class ) ;
2015-09-25 18:00:32 +02:00
if ( compression = = null ) {
// Server does not support compression
return ;
}
2006-01-16 18:34:56 +01:00
// If stream compression was offered by the server and we want to use
// compression then send compression request to the server
2015-09-25 18:00:32 +02:00
if ( ( compressionHandler = maybeGetCompressionHandler ( compression ) ) ! = null ) {
2020-06-12 18:15:24 +02:00
compressSyncPoint = false ;
2020-05-26 21:39:08 +02:00
sendNonza ( new Compress ( compressionHandler . getCompressionMethod ( ) ) ) ;
waitForConditionOrThrowConnectionException ( ( ) - > compressSyncPoint , " establishing stream compression " ) ;
2014-09-11 09:49:16 +02:00
} else {
LOGGER . warning ( " Could not enable compression because no matching handler/method pair was found " ) ;
2014-04-27 12:27:12 +02:00
}
2006-01-16 18:34:56 +01:00
}
2007-03-20 23:09:15 +01:00
2006-09-16 00:42:06 +02:00
/ * *
2015-09-25 18:00:32 +02:00
* Establishes a connection to the XMPP server . It basically
* creates and maintains a socket connection to the server .
* < p >
2006-09-14 21:21:38 +02:00
* Listeners will be preserved from a previous connection if the reconnection
* occurs after an abrupt termination .
2015-09-25 18:00:32 +02:00
* < / p >
2006-09-16 00:42:06 +02:00
*
2006-09-14 21:21:38 +02:00
* @throws XMPPException if an error occurs while trying to establish the connection .
2019-08-30 12:08:30 +02:00
* @throws SmackException if Smack detected an exceptional situation .
2019-10-30 12:02:36 +01:00
* @throws IOException if an I / O error occurred .
2019-08-30 12:08:30 +02:00
* @throws InterruptedException if the calling thread was interrupted .
2006-09-14 21:21:38 +02:00
* /
2014-04-27 12:27:12 +02:00
@Override
2015-09-25 18:00:32 +02:00
protected void connectInternal ( ) throws SmackException , IOException , XMPPException , InterruptedException {
2015-01-25 16:25:22 +01:00
// Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
// there is an error establishing the connection
2015-01-25 00:21:19 +01:00
connectUsingConfiguration ( ) ;
2015-01-25 16:25:22 +01:00
2020-08-06 10:28:07 +02:00
connected = true ;
2015-01-25 16:25:22 +01:00
// We connected successfully to the servers TCP port
2015-01-25 00:05:06 +01:00
initConnection ( ) ;
2018-02-20 09:01:11 +01:00
2020-05-26 21:39:08 +02:00
// TLS handled will be true either if TLS was established, or if it was not mandatory.
waitForConditionOrThrowConnectionException ( ( ) - > tlsHandled , " establishing TLS " ) ;
2018-02-20 09:01:11 +01:00
// Wait with SASL auth until the SASL mechanisms have been received
2020-05-26 21:39:08 +02:00
waitForConditionOrThrowConnectionException ( ( ) - > saslFeatureReceived , " SASL mechanisms stream feature from server " ) ;
2006-09-14 21:21:38 +02:00
}
2006-09-16 00:42:06 +02:00
2014-09-11 09:49:16 +02:00
/ * *
* For unit testing purposes
*
2019-08-30 12:08:30 +02:00
* @param writer TODO javadoc me please
2014-09-11 09:49:16 +02:00
* /
protected void setWriter ( Writer writer ) {
this . writer = writer ;
}
2014-05-15 15:04:46 +02:00
2014-09-11 09:49:16 +02:00
@Override
2019-03-07 09:49:16 +01:00
protected void afterFeaturesReceived ( ) throws NotConnectedException , InterruptedException , SecurityRequiredByServerException {
2020-07-23 14:32:14 +02:00
StartTls startTlsFeature = getFeature ( StartTls . class ) ;
2014-09-11 09:49:16 +02:00
if ( startTlsFeature ! = null ) {
if ( startTlsFeature . required ( ) & & config . getSecurityMode ( ) = = SecurityMode . disabled ) {
2019-03-07 09:49:16 +01:00
SecurityRequiredByServerException smackException = new SecurityRequiredByServerException ( ) ;
2020-05-26 21:39:08 +02:00
currentSmackException = smackException ;
notifyWaitingThreads ( ) ;
2019-03-07 09:49:16 +01:00
throw smackException ;
2014-09-11 09:49:16 +02:00
}
2014-05-15 15:04:46 +02:00
2015-06-25 11:07:25 +02:00
if ( config . getSecurityMode ( ) ! = ConnectionConfiguration . SecurityMode . disabled ) {
2015-09-25 18:00:32 +02:00
sendNonza ( new StartTls ( ) ) ;
2017-01-06 15:03:28 +01:00
} else {
2020-05-26 21:39:08 +02:00
tlsHandled = true ;
notifyWaitingThreads ( ) ;
2014-09-11 09:49:16 +02:00
}
2017-01-06 15:03:28 +01:00
} else {
2020-05-26 21:39:08 +02:00
tlsHandled = true ;
notifyWaitingThreads ( ) ;
2014-09-11 09:49:16 +02:00
}
2014-05-15 15:04:46 +02:00
2019-09-24 23:32:08 +02:00
if ( isSaslAuthenticated ( ) ) {
2014-09-11 09:49:16 +02:00
// If we have received features after the SASL has been successfully completed, then we
// have also *maybe* received, as it is an optional feature, the compression feature
// from the server.
2020-05-26 21:39:08 +02:00
streamFeaturesAfterAuthenticationReceived = true ;
notifyWaitingThreads ( ) ;
2014-09-11 09:49:16 +02:00
}
}
2014-05-15 15:04:46 +02:00
2019-09-24 23:32:08 +02:00
private void resetParser ( ) throws IOException {
try {
packetReader . parser = SmackXmlParser . newXmlParser ( reader ) ;
} catch ( XmlPullParserException e ) {
throw new IOException ( e ) ;
}
}
private void openStreamAndResetParser ( ) throws IOException , NotConnectedException , InterruptedException {
2019-02-04 08:59:39 +01:00
sendStreamOpen ( ) ;
2019-09-24 23:32:08 +02:00
resetParser ( ) ;
2014-09-11 09:49:16 +02:00
}
protected class PacketReader {
2019-03-25 13:06:12 +01:00
private final String threadName = " Smack Reader ( " + getConnectionCounter ( ) + ')' ;
2014-09-11 09:49:16 +02:00
XmlPullParser parser ;
private volatile boolean done ;
2014-05-15 15:04:46 +02:00
2020-05-26 21:39:08 +02:00
private boolean running ;
2014-05-26 22:02:04 +02:00
/ * *
* Initializes the reader in order to be used . The reader is initialized during the
* first connection and when reconnecting due to an abruptly disconnection .
* /
2015-01-25 00:21:19 +01:00
void init ( ) {
2014-05-26 22:02:04 +02:00
done = false ;
2020-06-17 21:55:24 +02:00
running = true ;
2015-01-15 16:12:41 +01:00
Async . go ( new Runnable ( ) {
2017-02-11 16:16:41 +01:00
@Override
2014-05-26 22:02:04 +02:00
public void run ( ) {
2019-03-25 13:06:12 +01:00
LOGGER . finer ( threadName + " start " ) ;
2019-02-06 21:49:58 +01:00
try {
parsePackets ( ) ;
} finally {
2019-03-25 13:06:12 +01:00
LOGGER . finer ( threadName + " exit " ) ;
2020-05-26 21:39:08 +02:00
running = false ;
notifyWaitingThreads ( ) ;
2019-02-06 21:49:58 +01:00
}
2014-05-26 22:02:04 +02:00
}
2019-03-25 13:06:12 +01:00
} , threadName ) ;
2014-09-11 09:49:16 +02:00
}
2014-05-15 15:04:46 +02:00
2014-05-26 22:02:04 +02:00
/ * *
2018-03-31 14:17:30 +02:00
* Shuts the stanza reader down . This method simply sets the ' done ' flag to true .
2014-05-26 22:02:04 +02:00
* /
void shutdown ( ) {
done = true ;
}
/ * *
* Parse top - level packets in order to process them further .
* /
2014-09-11 09:49:16 +02:00
private void parsePackets ( ) {
2014-05-26 22:02:04 +02:00
try {
2019-09-24 23:32:08 +02:00
openStreamAndResetParser ( ) ;
2019-05-06 22:06:13 +02:00
XmlPullParser . Event eventType = parser . getEventType ( ) ;
2015-01-14 17:07:35 +01:00
while ( ! done ) {
2014-10-21 23:37:37 +02:00
switch ( eventType ) {
2019-05-06 22:06:13 +02:00
case START_ELEMENT :
2014-09-11 09:49:16 +02:00
final String name = parser . getName ( ) ;
switch ( name ) {
case Message . ELEMENT :
2015-01-11 21:52:06 +01:00
case IQ . IQ_ELEMENT :
2014-09-11 09:49:16 +02:00
case Presence . ELEMENT :
try {
2014-12-28 17:43:39 +01:00
parseAndProcessStanza ( parser ) ;
2014-09-11 09:49:16 +02:00
} finally {
clientHandledStanzasCount = SMUtils . incrementHeight ( clientHandledStanzasCount ) ;
2014-05-26 22:02:04 +02:00
}
2014-09-11 09:49:16 +02:00
break ;
case " stream " :
2019-02-04 08:59:39 +01:00
onStreamOpen ( parser ) ;
2014-09-11 09:49:16 +02:00
break ;
case " error " :
2017-01-06 15:03:28 +01:00
StreamError streamError = PacketParserUtils . parseStreamError ( parser ) ;
2020-05-31 19:44:53 +02:00
// Stream errors are non recoverable, throw this exceptions. Also note that this will set
// this exception as current connection exceptions and notify any waiting threads.
2017-01-06 15:03:28 +01:00
throw new StreamErrorException ( streamError ) ;
2014-09-11 09:49:16 +02:00
case " features " :
2019-02-04 08:59:39 +01:00
parseFeaturesAndNotify ( parser ) ;
2014-09-11 09:49:16 +02:00
break ;
case " proceed " :
2020-05-26 21:39:08 +02:00
// Secure the connection by negotiating TLS
proceedTLSReceived ( ) ;
// Send a new opening stream to the server
openStreamAndResetParser ( ) ;
2014-09-11 09:49:16 +02:00
break ;
case " failure " :
2014-05-26 22:02:04 +02:00
String namespace = parser . getNamespace ( null ) ;
2014-09-11 09:49:16 +02:00
switch ( namespace ) {
case " urn:ietf:params:xml:ns:xmpp-tls " :
2014-05-26 22:02:04 +02:00
// TLS negotiation has failed. The server will close the connection
2014-09-11 09:49:16 +02:00
// TODO Parse failure stanza
2019-02-10 21:39:48 +01:00
throw new SmackException . SmackMessageException ( " TLS negotiation has failed " ) ;
2014-09-11 09:49:16 +02:00
case " http://jabber.org/protocol/compress " :
2014-05-26 22:02:04 +02:00
// Stream compression has been denied. This is a recoverable
// situation. It is still possible to authenticate and
// use the connection but using an uncompressed connection
2014-09-11 09:49:16 +02:00
// TODO Parse failure stanza
2020-05-26 21:39:08 +02:00
currentSmackException = new SmackException . SmackMessageException ( " Could not establish compression " ) ;
notifyWaitingThreads ( ) ;
2014-09-11 09:49:16 +02:00
break ;
2019-09-24 23:32:08 +02:00
default :
parseAndProcessNonza ( parser ) ;
2014-05-26 22:02:04 +02:00
}
2014-09-11 09:49:16 +02:00
break ;
case Compressed . ELEMENT :
2014-05-26 22:02:04 +02:00
// Server confirmed that it's possible to use stream compression. Start
// stream compression
2014-09-11 09:49:16 +02:00
// Initialize the reader and writer with the new compressed version
initReaderAndWriter ( ) ;
// Send a new opening stream to the server
2019-09-24 23:32:08 +02:00
openStreamAndResetParser ( ) ;
2014-09-11 09:49:16 +02:00
// Notify that compression is being used
2020-05-26 21:39:08 +02:00
compressSyncPoint = true ;
notifyWaitingThreads ( ) ;
2014-09-11 09:49:16 +02:00
break ;
case Enabled . ELEMENT :
Enabled enabled = ParseStreamManagement . enabled ( parser ) ;
if ( enabled . isResumeSet ( ) ) {
smSessionId = enabled . getId ( ) ;
if ( StringUtils . isNullOrEmpty ( smSessionId ) ) {
2019-02-10 21:39:48 +01:00
SmackException xmppException = new SmackException . SmackMessageException ( " Stream Management 'enabled' element with resume attribute but without session id received " ) ;
2020-05-26 21:39:08 +02:00
setCurrentConnectionExceptionAndNotify ( xmppException ) ;
2014-09-11 09:49:16 +02:00
throw xmppException ;
}
2017-12-13 23:10:11 +01:00
smServerMaxResumptionTime = enabled . getMaxResumptionTime ( ) ;
2014-09-11 09:49:16 +02:00
} else {
2015-01-15 16:11:49 +01:00
// Mark this a non-resumable stream by setting smSessionId to null
2014-09-11 09:49:16 +02:00
smSessionId = null ;
}
2015-01-15 14:42:08 +01:00
clientHandledStanzasCount = 0 ;
2014-10-29 10:06:33 +01:00
smWasEnabledAtLeastOnce = true ;
2020-05-26 21:39:08 +02:00
smEnabledSyncPoint = true ;
notifyWaitingThreads ( ) ;
2014-09-11 09:49:16 +02:00
break ;
case Failed . ELEMENT :
Failed failed = ParseStreamManagement . failed ( parser ) ;
2020-05-26 21:39:08 +02:00
if ( smResumedSyncPoint = = SyncPointState . request_sent ) {
// This is a <failed/> nonza in a response to resuming a previous stream, failure to do
// so is non-fatal as we can simply continue with resource binding in this case.
smResumptionFailed = failed ;
2020-06-12 16:35:32 +02:00
notifyWaitingThreads ( ) ;
2020-05-26 21:39:08 +02:00
} else {
FailedNonzaException xmppException = new FailedNonzaException ( failed , failed . getStanzaErrorCondition ( ) ) ;
setCurrentConnectionExceptionAndNotify ( xmppException ) ;
2014-09-11 09:49:16 +02:00
}
break ;
case Resumed . ELEMENT :
Resumed resumed = ParseStreamManagement . resumed ( parser ) ;
if ( ! smSessionId . equals ( resumed . getPrevId ( ) ) ) {
throw new StreamIdDoesNotMatchException ( smSessionId , resumed . getPrevId ( ) ) ;
}
2017-11-16 15:30:16 +01:00
// Mark SM as enabled
2020-05-26 21:39:08 +02:00
smEnabledSyncPoint = true ;
2014-09-11 09:49:16 +02:00
// First, drop the stanzas already handled by the server
processHandledCount ( resumed . getHandledCount ( ) ) ;
// Then re-send what is left in the unacknowledged queue
2015-10-21 07:42:47 +02:00
List < Stanza > stanzasToResend = new ArrayList < > ( unacknowledgedStanzas . size ( ) ) ;
unacknowledgedStanzas . drainTo ( stanzasToResend ) ;
2015-02-05 11:17:27 +01:00
for ( Stanza stanza : stanzasToResend ) {
2015-10-21 07:42:47 +02:00
sendStanzaInternal ( stanza ) ;
2014-09-11 09:49:16 +02:00
}
2015-05-19 14:21:52 +02:00
// If there where stanzas resent, then request a SM ack for them.
// Writer's sendStreamElement() won't do it automatically based on
// predicates.
if ( ! stanzasToResend . isEmpty ( ) ) {
requestSmAcknowledgementInternal ( ) ;
}
2017-11-16 15:30:16 +01:00
// Mark SM resumption as successful
2020-05-26 21:39:08 +02:00
smResumedSyncPoint = SyncPointState . successful ;
notifyWaitingThreads ( ) ;
2014-09-11 09:49:16 +02:00
break ;
case AckAnswer . ELEMENT :
AckAnswer ackAnswer = ParseStreamManagement . ackAnswer ( parser ) ;
processHandledCount ( ackAnswer . getHandledCount ( ) ) ;
break ;
case AckRequest . ELEMENT :
2014-10-28 12:22:05 +01:00
ParseStreamManagement . ackRequest ( parser ) ;
2020-05-26 21:39:08 +02:00
if ( smEnabledSyncPoint ) {
2014-10-28 12:22:05 +01:00
sendSmAcknowledgementInternal ( ) ;
2014-09-11 09:49:16 +02:00
} else {
LOGGER . warning ( " SM Ack Request received while SM is not enabled " ) ;
}
break ;
default :
2019-09-24 23:32:08 +02:00
parseAndProcessNonza ( parser ) ;
2014-09-11 09:49:16 +02:00
break ;
2014-05-26 22:02:04 +02:00
}
2014-10-21 23:37:37 +02:00
break ;
2019-05-06 22:06:13 +02:00
case END_ELEMENT :
2017-10-27 17:44:48 +02:00
final String endTagName = parser . getName ( ) ;
if ( " stream " . equals ( endTagName ) ) {
2015-09-25 18:00:32 +02:00
if ( ! parser . getNamespace ( ) . equals ( " http://etherx.jabber.org/streams " ) ) {
LOGGER . warning ( XMPPTCPConnection . this + " </stream> but different namespace " + parser . getNamespace ( ) ) ;
break ;
}
// Check if the queue was already shut down before reporting success on closing stream tag
// received. This avoids a race if there is a disconnect(), followed by a connect(), which
// did re-start the queue again, causing this writer to assume that the queue is not
// shutdown, which results in a call to disconnect().
final boolean queueWasShutdown = packetWriter . queue . isShutdown ( ) ;
2020-05-26 21:39:08 +02:00
closingStreamReceived = true ;
notifyWaitingThreads ( ) ;
2015-09-25 18:00:32 +02:00
if ( queueWasShutdown ) {
// We received a closing stream element *after* we initiated the
// termination of the session by sending a closing stream element to
// the server first
return ;
} else {
// We received a closing stream element from the server without us
// sending a closing stream element first. This means that the
// server wants to terminate the session, therefore disconnect
// the connection
LOGGER . info ( XMPPTCPConnection . this
+ " received closing </stream> element. "
+ " Server wants to terminate the connection, calling disconnect() " ) ;
2019-03-06 22:11:45 +01:00
ASYNC_BUT_ORDERED . performAsyncButOrdered ( XMPPTCPConnection . this , new Runnable ( ) {
@Override
public void run ( ) {
disconnect ( ) ;
} } ) ;
2015-09-25 18:00:32 +02:00
}
2014-05-26 22:02:04 +02:00
}
2014-10-21 23:37:37 +02:00
break ;
2019-05-06 22:06:13 +02:00
case END_DOCUMENT :
2015-01-14 17:07:35 +01:00
// END_DOCUMENT only happens in an error case, as otherwise we would see a
// closing stream element before.
2019-02-10 21:39:48 +01:00
throw new SmackException . SmackMessageException (
2015-01-14 17:07:35 +01:00
" Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element " ) ;
2019-05-06 22:06:13 +02:00
default :
// Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
break ;
2014-05-26 22:02:04 +02:00
}
eventType = parser . next ( ) ;
2014-10-21 23:37:37 +02:00
}
2013-03-18 20:58:48 +01:00
}
catch ( Exception e ) {
2014-05-26 22:02:04 +02:00
// The exception can be ignored if the the connection is 'done'
2020-05-26 21:39:08 +02:00
// or if the it was caused because the socket got closed.
if ( ! done ) {
2020-06-17 20:34:36 +02:00
// Set running to false since this thread will exit here and notifyConnectionError() will wait until
// the reader and writer thread's 'running' value is false.
running = false ;
2014-05-26 22:02:04 +02:00
// Close the connection and notify connection listeners of the
// error.
notifyConnectionError ( e ) ;
}
2013-03-18 20:58:48 +01:00
}
}
2014-05-26 22:02:04 +02:00
}
protected class PacketWriter {
2014-09-11 09:49:16 +02:00
public static final int QUEUE_SIZE = XMPPTCPConnection . QUEUE_SIZE ;
2020-04-07 23:10:35 +02:00
public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024 ;
public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = ( int ) ( 0 . 3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE ) ;
2014-05-26 22:02:04 +02:00
2019-03-25 13:06:12 +01:00
private final String threadName = " Smack Writer ( " + getConnectionCounter ( ) + ')' ;
2017-12-13 23:10:11 +01:00
private final ArrayBlockingQueueWithShutdown < Element > queue = new ArrayBlockingQueueWithShutdown < > (
2014-09-11 09:49:16 +02:00
QUEUE_SIZE , true ) ;
2014-05-26 22:02:04 +02:00
/ * *
2018-03-31 14:17:30 +02:00
* If set , the stanza writer is shut down
2014-05-26 22:02:04 +02:00
* /
2014-09-11 09:49:16 +02:00
protected volatile Long shutdownTimestamp = null ;
private volatile boolean instantShutdown ;
2014-05-26 22:02:04 +02:00
2015-02-22 10:44:22 +01:00
/ * *
* True if some preconditions are given to start the bundle and defer mechanism .
* < p >
* This will likely get set to true right after the start of the writer thread , because
* { @link # nextStreamElement ( ) } will check if { @link queue } is empty , which is probably the case , and then set
* this field to true .
* < / p >
* /
private boolean shouldBundleAndDefer ;
2020-05-26 21:39:08 +02:00
private boolean running ;
2018-05-09 23:06:12 +02:00
/ * *
* Initializes the writer in order to be used . It is called at the first connection and also
2014-05-26 22:02:04 +02:00
* is invoked if the connection is disconnected by an error .
2018-05-09 23:06:12 +02:00
* /
2014-05-26 22:02:04 +02:00
void init ( ) {
2014-09-11 09:49:16 +02:00
shutdownTimestamp = null ;
if ( unacknowledgedStanzas ! = null ) {
// It's possible that there are new stanzas in the writer queue that
// came in while we were disconnected but resumable, drain those into
// the unacknowledged queue so that they get resent now
drainWriterQueueToUnacknowledgedStanzas ( ) ;
}
2014-05-26 22:02:04 +02:00
queue . start ( ) ;
2020-06-17 21:55:24 +02:00
running = true ;
2015-01-25 00:21:19 +01:00
Async . go ( new Runnable ( ) {
@Override
2014-05-26 22:02:04 +02:00
public void run ( ) {
2019-03-25 13:06:12 +01:00
LOGGER . finer ( threadName + " start " ) ;
2019-02-06 21:49:58 +01:00
try {
writePackets ( ) ;
} finally {
2019-03-25 13:06:12 +01:00
LOGGER . finer ( threadName + " exit " ) ;
2020-05-26 21:39:08 +02:00
running = false ;
notifyWaitingThreads ( ) ;
2019-02-06 21:49:58 +01:00
}
2014-05-26 22:02:04 +02:00
}
2019-03-25 13:06:12 +01:00
} , threadName ) ;
2014-09-11 09:49:16 +02:00
}
private boolean done ( ) {
return shutdownTimestamp ! = null ;
2014-05-26 22:02:04 +02:00
}
2014-11-09 18:30:16 +01:00
protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible ( ) throws NotConnectedException {
2015-09-25 18:00:32 +02:00
final boolean done = done ( ) ;
if ( done ) {
2017-12-13 23:10:11 +01:00
final boolean smResumptionPossible = isSmResumptionPossible ( ) ;
2014-10-28 13:50:13 +01:00
// Don't throw a NotConnectedException is there is an resumable stream available
2017-12-13 23:10:11 +01:00
if ( ! smResumptionPossible ) {
2015-09-25 18:00:32 +02:00
throw new NotConnectedException ( XMPPTCPConnection . this , " done= " + done
2017-12-13 23:10:11 +01:00
+ " smResumptionPossible= " + smResumptionPossible ) ;
2015-09-25 18:00:32 +02:00
}
2014-10-28 13:50:13 +01:00
}
}
2014-05-26 22:02:04 +02:00
/ * *
2014-09-11 09:49:16 +02:00
* Sends the specified element to the server .
2014-05-26 22:02:04 +02:00
*
2014-09-11 09:49:16 +02:00
* @param element the element to send .
2019-08-30 12:08:30 +02:00
* @throws NotConnectedException if the XMPP connection is not connected .
* @throws InterruptedException if the calling thread was interrupted .
2014-05-26 22:02:04 +02:00
* /
2015-09-25 18:00:32 +02:00
protected void sendStreamElement ( Element element ) throws NotConnectedException , InterruptedException {
2014-10-28 13:50:13 +01:00
throwNotConnectedExceptionIfDoneAndResumptionNotPossible ( ) ;
2015-09-25 18:00:32 +02:00
try {
queue . put ( element ) ;
}
catch ( InterruptedException e ) {
// put() may throw an InterruptedException for two reasons:
// 1. If the queue was shut down
// 2. If the thread was interrupted
// so we have to check which is the case
throwNotConnectedExceptionIfDoneAndResumptionNotPossible ( ) ;
// If the method above did not throw, then the sending thread was interrupted
throw e ;
2014-05-26 22:02:04 +02:00
}
}
/ * *
2018-03-31 14:17:30 +02:00
* Shuts down the stanza writer . Once this method has been called , no further
2014-05-26 22:02:04 +02:00
* packets will be written to the server .
* /
2014-09-11 09:49:16 +02:00
void shutdown ( boolean instant ) {
instantShutdown = instant ;
2014-05-26 22:02:04 +02:00
queue . shutdown ( ) ;
2015-09-25 18:00:32 +02:00
shutdownTimestamp = System . currentTimeMillis ( ) ;
2014-05-26 22:02:04 +02:00
}
/ * *
2014-10-28 13:50:13 +01:00
* Maybe return the next available element from the queue for writing . If the queue is shut down < b > or < / b > a
* spurious interrupt occurs , < code > null < / code > is returned . So it is important to check the ' done ' condition in
* that case .
2014-05-26 22:02:04 +02:00
*
2014-10-28 13:50:13 +01:00
* @return the next element for writing or null .
2014-05-26 22:02:04 +02:00
* /
2014-09-11 09:49:16 +02:00
private Element nextStreamElement ( ) {
2015-02-22 10:44:22 +01:00
// It is important the we check if the queue is empty before removing an element from it
if ( queue . isEmpty ( ) ) {
shouldBundleAndDefer = true ;
}
2014-09-11 09:49:16 +02:00
Element packet = null ;
2014-05-26 22:02:04 +02:00
try {
packet = queue . take ( ) ;
}
catch ( InterruptedException e ) {
2014-10-28 13:50:13 +01:00
if ( ! queue . isShutdown ( ) ) {
2015-01-16 15:55:06 +01:00
// Users shouldn't try to interrupt the packet writer thread
2018-02-09 10:26:42 +01:00
LOGGER . log ( Level . WARNING , " Writer thread was interrupted. Don't do that. Use disconnect() instead. " , e ) ;
2014-10-28 13:50:13 +01:00
}
2014-05-26 22:02:04 +02:00
}
return packet ;
}
2014-09-11 09:49:16 +02:00
private void writePackets ( ) {
2014-05-26 22:02:04 +02:00
try {
// Write out packets from the queue.
2014-09-11 09:49:16 +02:00
while ( ! done ( ) ) {
2014-10-25 11:39:16 +02:00
Element element = nextStreamElement ( ) ;
if ( element = = null ) {
continue ;
}
2015-02-18 14:38:56 +01:00
// Get a local version of the bundle and defer callback, in case it's unset
// between the null check and the method invocation
final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback ;
// If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
// empty), then we could wait a bit for further stanzas attempting to decrease
// our energy consumption
2015-02-22 10:44:22 +01:00
if ( localBundleAndDeferCallback ! = null & & isAuthenticated ( ) & & shouldBundleAndDefer ) {
// Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
// queue is empty again.
shouldBundleAndDefer = false ;
2015-02-18 14:38:56 +01:00
final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean ( ) ;
final int bundleAndDeferMillis = localBundleAndDeferCallback . getBundleAndDeferMillis ( new BundleAndDefer (
bundlingAndDeferringStopped ) ) ;
if ( bundleAndDeferMillis > 0 ) {
long remainingWait = bundleAndDeferMillis ;
final long waitStart = System . currentTimeMillis ( ) ;
synchronized ( bundlingAndDeferringStopped ) {
while ( ! bundlingAndDeferringStopped . get ( ) & & remainingWait > 0 ) {
bundlingAndDeferringStopped . wait ( remainingWait ) ;
remainingWait = bundleAndDeferMillis
- ( System . currentTimeMillis ( ) - waitStart ) ;
}
}
}
}
2015-02-05 11:17:27 +01:00
Stanza packet = null ;
if ( element instanceof Stanza ) {
packet = ( Stanza ) element ;
2014-10-25 11:39:16 +02:00
}
2015-03-02 15:12:19 +01:00
else if ( element instanceof Enable ) {
// The client needs to add messages to the unacknowledged stanzas queue
// right after it sent 'enabled'. Stanza will be added once
// unacknowledgedStanzas is not null.
2020-04-07 23:10:35 +02:00
unacknowledgedStanzas = new ArrayBlockingQueue < > ( UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE ) ;
2015-03-02 15:12:19 +01:00
}
2017-03-18 17:41:30 +01:00
maybeAddToUnacknowledgedStanzas ( packet ) ;
2015-09-25 18:00:32 +02:00
2019-09-21 23:00:17 +02:00
CharSequence elementXml = element . toXML ( outgoingStreamXmlEnvironment ) ;
2015-09-25 18:00:32 +02:00
if ( elementXml instanceof XmlStringBuilder ) {
2020-06-15 17:52:28 +02:00
try {
( ( XmlStringBuilder ) elementXml ) . write ( writer , outgoingStreamXmlEnvironment ) ;
} catch ( NullPointerException npe ) {
LOGGER . log ( Level . FINE , " NPE in XmlStringBuilder of " + element . getClass ( ) + " : " + element , npe ) ;
throw npe ;
}
2015-09-25 18:00:32 +02:00
}
else {
writer . write ( elementXml . toString ( ) ) ;
}
2014-10-25 11:39:16 +02:00
if ( queue . isEmpty ( ) ) {
writer . flush ( ) ;
}
if ( packet ! = null ) {
firePacketSendingListeners ( packet ) ;
2014-05-26 22:02:04 +02:00
}
}
2014-09-11 09:49:16 +02:00
if ( ! instantShutdown ) {
2014-10-28 13:50:13 +01:00
// Flush out the rest of the queue.
2014-09-11 09:49:16 +02:00
try {
while ( ! queue . isEmpty ( ) ) {
Element packet = queue . remove ( ) ;
2017-03-18 17:41:30 +01:00
if ( packet instanceof Stanza ) {
Stanza stanza = ( Stanza ) packet ;
maybeAddToUnacknowledgedStanzas ( stanza ) ;
}
2019-02-04 13:27:41 +01:00
writer . write ( packet . toXML ( ) . toString ( ) ) ;
2014-09-11 09:49:16 +02:00
}
}
catch ( Exception e ) {
LOGGER . log ( Level . WARNING ,
" Exception flushing queue during shutdown, ignore and continue " ,
e ) ;
2014-05-26 22:02:04 +02:00
}
2014-09-11 09:49:16 +02:00
// Close the stream.
2014-05-26 22:02:04 +02:00
try {
2014-09-11 09:49:16 +02:00
writer . write ( " </stream:stream> " ) ;
writer . flush ( ) ;
2014-05-26 22:02:04 +02:00
}
catch ( Exception e ) {
2014-09-11 09:49:16 +02:00
LOGGER . log ( Level . WARNING , " Exception writing closing stream element " , e ) ;
2014-05-26 22:02:04 +02:00
}
2015-09-25 18:00:32 +02:00
2014-09-11 09:49:16 +02:00
// Delete the queue contents (hopefully nothing is left).
queue . clear ( ) ;
} else if ( instantShutdown & & isSmEnabled ( ) ) {
// This was an instantShutdown and SM is enabled, drain all remaining stanzas
// into the unacknowledgedStanzas queue
drainWriterQueueToUnacknowledgedStanzas ( ) ;
2014-05-26 22:02:04 +02:00
}
2015-09-25 18:00:32 +02:00
// Do *not* close the writer here, as it will cause the socket
// to get closed. But we may want to receive further stanzas
// until the closing stream tag is received. The socket will be
// closed in shutdown().
2014-05-26 22:02:04 +02:00
}
2014-09-11 09:49:16 +02:00
catch ( Exception e ) {
2014-05-26 22:02:04 +02:00
// The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed
2015-09-25 18:00:32 +02:00
if ( ! ( done ( ) | | queue . isShutdown ( ) ) ) {
2020-06-17 20:34:36 +02:00
// Set running to false since this thread will exit here and notifyConnectionError() will wait until
// the reader and writer thread's 'running' value is false.
running = false ;
2020-06-17 20:31:28 +02:00
notifyConnectionError ( e ) ;
2014-09-11 09:49:16 +02:00
} else {
LOGGER . log ( Level . FINE , " Ignoring Exception in writePackets() " , e ) ;
2014-05-26 22:02:04 +02:00
}
}
}
2014-09-11 09:49:16 +02:00
private void drainWriterQueueToUnacknowledgedStanzas ( ) {
2017-12-13 23:10:11 +01:00
List < Element > elements = new ArrayList < > ( queue . size ( ) ) ;
2014-09-11 09:49:16 +02:00
queue . drainTo ( elements ) ;
2018-12-08 22:54:13 +01:00
for ( int i = 0 ; i < elements . size ( ) ; i + + ) {
Element element = elements . get ( i ) ;
// If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
if ( unacknowledgedStanzas . remainingCapacity ( ) = = 0 ) {
StreamManagementException . UnacknowledgedQueueFullException exception = StreamManagementException . UnacknowledgedQueueFullException
. newWith ( i , elements , unacknowledgedStanzas ) ;
LOGGER . log ( Level . WARNING ,
" Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue " , exception ) ;
return ;
}
2015-02-05 11:17:27 +01:00
if ( element instanceof Stanza ) {
unacknowledgedStanzas . add ( ( Stanza ) element ) ;
2014-09-11 09:49:16 +02:00
}
}
}
2017-03-18 17:41:30 +01:00
private void maybeAddToUnacknowledgedStanzas ( Stanza stanza ) throws IOException {
// Check if the stream element should be put to the unacknowledgedStanza
// queue. Note that we can not do the put() in sendStanzaInternal() and the
// packet order is not stable at this point (sendStanzaInternal() can be
// called concurrently).
if ( unacknowledgedStanzas ! = null & & stanza ! = null ) {
2020-04-07 23:10:35 +02:00
// If the unacknowledgedStanza queue reaching its high water mark, request an new ack
2017-03-18 17:41:30 +01:00
// from the server in order to drain it
2020-04-07 23:10:35 +02:00
if ( unacknowledgedStanzas . size ( ) = = UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK ) {
2019-02-04 13:27:41 +01:00
writer . write ( AckRequest . INSTANCE . toXML ( ) . toString ( ) ) ;
2017-03-18 17:41:30 +01:00
}
2020-04-07 23:13:50 +02:00
2017-03-18 17:41:30 +01:00
try {
// It is important the we put the stanza in the unacknowledged stanza
// queue before we put it on the wire
unacknowledgedStanzas . put ( stanza ) ;
}
catch ( InterruptedException e ) {
throw new IllegalStateException ( e ) ;
}
}
}
2014-09-11 09:49:16 +02:00
}
2014-10-29 10:06:33 +01:00
/ * *
* Set if Stream Management should be used by default for new connections .
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @param useSmDefault true to use Stream Management for new connections .
* /
2014-09-11 09:49:16 +02:00
public static void setUseStreamManagementDefault ( boolean useSmDefault ) {
XMPPTCPConnection . useSmDefault = useSmDefault ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Set if Stream Management resumption should be used by default for new connections .
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @param useSmResumptionDefault true to use Stream Management resumption for new connections .
2015-09-25 18:00:32 +02:00
* @deprecated use { @link # setUseStreamManagementResumptionDefault ( boolean ) } instead .
2014-10-29 10:06:33 +01:00
* /
2015-09-25 18:00:32 +02:00
@Deprecated
2014-10-29 10:06:33 +01:00
public static void setUseStreamManagementResumptiodDefault ( boolean useSmResumptionDefault ) {
2015-10-21 11:04:09 +02:00
setUseStreamManagementResumptionDefault ( useSmResumptionDefault ) ;
2015-09-25 18:00:32 +02:00
}
/ * *
* Set if Stream Management resumption should be used by default for new connections .
*
* @param useSmResumptionDefault true to use Stream Management resumption for new connections .
* /
public static void setUseStreamManagementResumptionDefault ( boolean useSmResumptionDefault ) {
2014-10-29 10:06:33 +01:00
if ( useSmResumptionDefault ) {
// Also enable SM is resumption is enabled
setUseStreamManagementDefault ( useSmResumptionDefault ) ;
}
XMPPTCPConnection . useSmResumptionDefault = useSmResumptionDefault ;
2014-09-11 09:49:16 +02:00
}
2014-10-29 10:06:33 +01:00
/ * *
* Set if Stream Management should be used if supported by the server .
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @param useSm true to use Stream Management .
* /
2014-09-11 09:49:16 +02:00
public void setUseStreamManagement ( boolean useSm ) {
this . useSm = useSm ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Set if Stream Management resumption should be used if supported by the server .
*
* @param useSmResumption true to use Stream Management resumption .
* /
2014-09-11 09:49:16 +02:00
public void setUseStreamManagementResumption ( boolean useSmResumption ) {
2014-10-29 10:06:33 +01:00
if ( useSmResumption ) {
// Also enable SM is resumption is enabled
setUseStreamManagement ( useSmResumption ) ;
}
2014-09-11 09:49:16 +02:00
this . useSmResumption = useSmResumption ;
}
/ * *
* Set the preferred resumption time in seconds .
* @param resumptionTime the preferred resumption time in seconds
* /
public void setPreferredResumptionTime ( int resumptionTime ) {
smClientMaxResumptionTime = resumptionTime ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Add a predicate for Stream Management acknowledgment requests .
* < p >
* Those predicates are used to determine when a Stream Management acknowledgement request is send to the server .
2015-01-18 11:03:03 +01:00
* Some pre - defined predicates are found in the < code > org . jivesoftware . smack . sm . predicates < / code > package .
2014-10-29 10:06:33 +01:00
* < / p >
* < p >
* If not predicate is configured , the { @link Predicate # forMessagesOrAfter5Stanzas ( ) } will be used .
* < / p >
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @param predicate the predicate to add .
* @return if the predicate was not already active .
* /
2015-02-26 18:41:17 +01:00
public boolean addRequestAckPredicate ( StanzaFilter predicate ) {
2014-09-11 09:49:16 +02:00
synchronized ( requestAckPredicates ) {
return requestAckPredicates . add ( predicate ) ;
}
}
2014-10-29 10:06:33 +01:00
/ * *
* Remove the given predicate for Stream Management acknowledgment request .
* @param predicate the predicate to remove .
* @return true if the predicate was removed .
* /
2015-02-26 18:41:17 +01:00
public boolean removeRequestAckPredicate ( StanzaFilter predicate ) {
2014-09-11 09:49:16 +02:00
synchronized ( requestAckPredicates ) {
return requestAckPredicates . remove ( predicate ) ;
}
}
2014-10-29 10:06:33 +01:00
/ * *
* Remove all predicates for Stream Management acknowledgment requests .
* /
2014-09-11 09:49:16 +02:00
public void removeAllRequestAckPredicates ( ) {
synchronized ( requestAckPredicates ) {
requestAckPredicates . clear ( ) ;
}
}
2014-10-29 10:06:33 +01:00
/ * *
* Send an unconditional Stream Management acknowledgement request to the server .
*
2017-12-13 23:10:11 +01:00
* @throws StreamManagementNotEnabledException if Stream Management is not enabled .
2014-10-29 10:06:33 +01:00
* @throws NotConnectedException if the connection is not connected .
2019-08-30 12:08:30 +02:00
* @throws InterruptedException if the calling thread was interrupted .
2014-10-29 10:06:33 +01:00
* /
2015-09-25 18:00:32 +02:00
public void requestSmAcknowledgement ( ) throws StreamManagementNotEnabledException , NotConnectedException , InterruptedException {
2014-09-11 09:49:16 +02:00
if ( ! isSmEnabled ( ) ) {
throw new StreamManagementException . StreamManagementNotEnabledException ( ) ;
}
requestSmAcknowledgementInternal ( ) ;
}
2015-09-25 18:00:32 +02:00
private void requestSmAcknowledgementInternal ( ) throws NotConnectedException , InterruptedException {
2014-09-11 09:49:16 +02:00
packetWriter . sendStreamElement ( AckRequest . INSTANCE ) ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Send a unconditional Stream Management acknowledgment to the server .
* < p >
* See < a href = " http://xmpp.org/extensions/xep-0198.html#acking " > XEP - 198 : Stream Management § 4 . Acks < / a > :
2017-12-25 12:51:41 +01:00
* " Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas,
* or after a certain period of time ) , even if it has not received an & lt ; r / & gt ; element from the other party . "
2014-10-29 10:06:33 +01:00
* < / p >
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @throws StreamManagementNotEnabledException if Stream Management is not enabled .
* @throws NotConnectedException if the connection is not connected .
2019-08-30 12:08:30 +02:00
* @throws InterruptedException if the calling thread was interrupted .
2014-10-29 10:06:33 +01:00
* /
2015-09-25 18:00:32 +02:00
public void sendSmAcknowledgement ( ) throws StreamManagementNotEnabledException , NotConnectedException , InterruptedException {
2014-10-28 12:22:05 +01:00
if ( ! isSmEnabled ( ) ) {
throw new StreamManagementException . StreamManagementNotEnabledException ( ) ;
}
sendSmAcknowledgementInternal ( ) ;
}
2015-09-25 18:00:32 +02:00
private void sendSmAcknowledgementInternal ( ) throws NotConnectedException , InterruptedException {
2014-10-28 12:22:05 +01:00
packetWriter . sendStreamElement ( new AckAnswer ( clientHandledStanzasCount ) ) ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Add a Stanza acknowledged listener .
* < p >
* Those listeners will be invoked every time a Stanza has been acknowledged by the server . The will not get
2015-03-01 10:28:15 +01:00
* automatically removed . Consider using { @link # addStanzaIdAcknowledgedListener ( String , StanzaListener ) } when
2014-10-29 10:06:33 +01:00
* possible .
* < / p >
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @param listener the listener to add .
* /
2015-03-01 10:28:15 +01:00
public void addStanzaAcknowledgedListener ( StanzaListener listener ) {
2014-09-11 09:49:16 +02:00
stanzaAcknowledgedListeners . add ( listener ) ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Remove the given Stanza acknowledged listener .
*
* @param listener the listener .
* @return true if the listener was removed .
* /
2015-03-01 10:28:15 +01:00
public boolean removeStanzaAcknowledgedListener ( StanzaListener listener ) {
2014-09-11 09:49:16 +02:00
return stanzaAcknowledgedListeners . remove ( listener ) ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Remove all stanza acknowledged listeners .
* /
2014-09-11 09:49:16 +02:00
public void removeAllStanzaAcknowledgedListeners ( ) {
stanzaAcknowledgedListeners . clear ( ) ;
}
2018-11-28 17:55:46 +01:00
/ * *
* Add a Stanza dropped listener .
* < p >
* Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume . They will not get
* automatically removed . If at least one StanzaDroppedListener is configured , no attempt will be made to retransmit
* the Stanzas .
* < / p >
*
* @param listener the listener to add .
* @since 4 . 3 . 3
* /
public void addStanzaDroppedListener ( StanzaListener listener ) {
stanzaDroppedListeners . add ( listener ) ;
}
/ * *
* Remove the given Stanza dropped listener .
*
* @param listener the listener .
* @return true if the listener was removed .
* @since 4 . 3 . 3
* /
public boolean removeStanzaDroppedListener ( StanzaListener listener ) {
return stanzaDroppedListeners . remove ( listener ) ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Add a new Stanza ID acknowledged listener for the given ID .
* < p >
* The listener will be invoked if the stanza with the given ID was acknowledged by the server . It will
* automatically be removed after the listener was run .
* < / p >
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @param id the stanza ID .
* @param listener the listener to invoke .
* @return the previous listener for this stanza ID or null .
* @throws StreamManagementNotEnabledException if Stream Management is not enabled .
* /
2017-03-06 17:17:15 +01:00
@SuppressWarnings ( " FutureReturnValueIgnored " )
2015-03-01 10:28:15 +01:00
public StanzaListener addStanzaIdAcknowledgedListener ( final String id , StanzaListener listener ) throws StreamManagementNotEnabledException {
2014-10-29 10:06:33 +01:00
// Prevent users from adding callbacks that will never get removed
if ( ! smWasEnabledAtLeastOnce ) {
throw new StreamManagementException . StreamManagementNotEnabledException ( ) ;
}
2018-02-23 18:53:47 +01:00
// Remove the listener after max. 3 hours
final int removeAfterSeconds = Math . min ( getMaxSmResumptionTime ( ) , 3 * 60 * 60 ) ;
2015-01-15 21:36:23 +01:00
schedule ( new Runnable ( ) {
@Override
public void run ( ) {
stanzaIdAcknowledgedListeners . remove ( id ) ;
}
} , removeAfterSeconds , TimeUnit . SECONDS ) ;
2014-10-29 10:06:33 +01:00
return stanzaIdAcknowledgedListeners . put ( id , listener ) ;
2014-09-11 09:49:16 +02:00
}
2014-10-29 10:06:33 +01:00
/ * *
* Remove the Stanza ID acknowledged listener for the given ID .
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @param id the stanza ID .
* @return true if the listener was found and removed , false otherwise .
* /
2015-03-01 10:28:15 +01:00
public StanzaListener removeStanzaIdAcknowledgedListener ( String id ) {
2014-10-29 10:06:33 +01:00
return stanzaIdAcknowledgedListeners . remove ( id ) ;
2014-09-11 09:49:16 +02:00
}
2014-10-29 10:06:33 +01:00
/ * *
* Removes all Stanza ID acknowledged listeners .
* /
public void removeAllStanzaIdAcknowledgedListeners ( ) {
stanzaIdAcknowledgedListeners . clear ( ) ;
2014-09-11 09:49:16 +02:00
}
2014-10-29 10:06:33 +01:00
/ * *
* Returns true if Stream Management is supported by the server .
*
* @return true if Stream Management is supported by the server .
* /
2014-09-11 09:49:16 +02:00
public boolean isSmAvailable ( ) {
return hasFeature ( StreamManagementFeature . ELEMENT , StreamManagement . NAMESPACE ) ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Returns true if Stream Management was successfully negotiated with the server .
*
* @return true if Stream Management was negotiated .
* /
2014-09-11 09:49:16 +02:00
public boolean isSmEnabled ( ) {
2020-05-26 21:39:08 +02:00
return smEnabledSyncPoint ;
2014-09-11 09:49:16 +02:00
}
2014-12-17 10:34:16 +01:00
/ * *
* Returns true if the stream was successfully resumed with help of Stream Management .
2018-05-09 23:06:12 +02:00
*
2014-12-17 10:34:16 +01:00
* @return true if the stream was resumed .
* /
public boolean streamWasResumed ( ) {
2020-05-26 21:39:08 +02:00
return smResumedSyncPoint = = SyncPointState . successful ;
2014-12-17 10:34:16 +01:00
}
2014-10-29 10:06:33 +01:00
/ * *
* Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible .
2018-05-09 23:06:12 +02:00
*
2014-10-29 10:06:33 +01:00
* @return true if disconnected but resumption possible .
* /
2014-09-11 09:49:16 +02:00
public boolean isDisconnectedButSmResumptionPossible ( ) {
return disconnectedButResumeable & & isSmResumptionPossible ( ) ;
}
2014-10-29 10:06:33 +01:00
/ * *
* Returns true if the stream is resumable .
*
* @return true if the stream is resumable .
* /
2014-09-11 09:49:16 +02:00
public boolean isSmResumptionPossible ( ) {
// There is no resumable stream available
if ( smSessionId = = null )
return false ;
final Long shutdownTimestamp = packetWriter . shutdownTimestamp ;
// Seems like we are already reconnected, report true
if ( shutdownTimestamp = = null ) {
return true ;
}
// See if resumption time is over
long current = System . currentTimeMillis ( ) ;
2015-04-06 21:59:45 +02:00
long maxResumptionMillies = ( ( long ) getMaxSmResumptionTime ( ) ) * 1000 ;
if ( current > shutdownTimestamp + maxResumptionMillies ) {
// Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
// resumption is possible
2014-09-11 09:49:16 +02:00
return false ;
} else {
return true ;
}
}
2015-08-26 18:47:05 +02:00
/ * *
* Drop the stream management state . Sets { @link # smSessionId } and
* { @link # unacknowledgedStanzas } to < code > null < / code > .
* /
private void dropSmState ( ) {
// clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
// respective. No need to reset them here.
smSessionId = null ;
unacknowledgedStanzas = null ;
}
2015-01-15 21:33:44 +01:00
/ * *
* Get the maximum resumption time in seconds after which a managed stream can be resumed .
2015-04-06 21:55:20 +02:00
* < p >
* This method will return { @link Integer # MAX_VALUE } if neither the client nor the server specify a maximum
* resumption time . Be aware of integer overflows when using this value , e . g . do not add arbitrary values to it
* without checking for overflows before .
* < / p >
2015-01-15 21:33:44 +01:00
*
2015-04-06 21:55:20 +02:00
* @return the maximum resumption time in seconds or { @link Integer # MAX_VALUE } if none set .
2015-01-15 21:33:44 +01:00
* /
public int getMaxSmResumptionTime ( ) {
int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer . MAX_VALUE ;
2017-12-13 23:10:11 +01:00
int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer . MAX_VALUE ;
2015-01-15 21:33:44 +01:00
return Math . min ( clientResumptionTime , serverResumptionTime ) ;
}
2015-09-25 18:01:59 +02:00
private void processHandledCount ( long handledCount ) throws StreamManagementCounterError {
2014-09-11 09:49:16 +02:00
long ackedStanzasCount = SMUtils . calculateDelta ( handledCount , serverHandledStanzasCount ) ;
2017-12-13 23:10:11 +01:00
final List < Stanza > ackedStanzas = new ArrayList < > (
2015-10-21 16:03:45 +02:00
ackedStanzasCount < = Integer . MAX_VALUE ? ( int ) ackedStanzasCount
2014-09-11 09:49:16 +02:00
: Integer . MAX_VALUE ) ;
for ( long i = 0 ; i < ackedStanzasCount ; i + + ) {
2015-02-05 11:17:27 +01:00
Stanza ackedStanza = unacknowledgedStanzas . poll ( ) ;
2014-09-11 09:49:16 +02:00
// If the server ack'ed a stanza, then it must be in the
// unacknowledged stanza queue. There can be no exception.
2015-02-18 12:01:55 +01:00
if ( ackedStanza = = null ) {
throw new StreamManagementCounterError ( handledCount , serverHandledStanzasCount ,
ackedStanzasCount , ackedStanzas ) ;
}
2014-09-11 09:49:16 +02:00
ackedStanzas . add ( ackedStanza ) ;
}
2014-10-29 10:15:57 +01:00
2015-01-13 11:15:55 +01:00
boolean atLeastOneStanzaAcknowledgedListener = false ;
if ( ! stanzaAcknowledgedListeners . isEmpty ( ) ) {
// If stanzaAcknowledgedListeners is not empty, the we have at least one
atLeastOneStanzaAcknowledgedListener = true ;
}
else {
// Otherwise we look for a matching id in the stanza *id* acknowledged listeners
2015-02-05 11:17:27 +01:00
for ( Stanza ackedStanza : ackedStanzas ) {
String id = ackedStanza . getStanzaId ( ) ;
2015-01-13 11:15:55 +01:00
if ( id ! = null & & stanzaIdAcknowledgedListeners . containsKey ( id ) ) {
atLeastOneStanzaAcknowledgedListener = true ;
break ;
}
2014-09-11 09:49:16 +02:00
}
2014-05-26 22:02:04 +02:00
}
2014-10-29 10:15:57 +01:00
// Only spawn a new thread if there is a chance that some listener is invoked
2015-01-13 11:15:55 +01:00
if ( atLeastOneStanzaAcknowledgedListener ) {
2014-12-17 18:21:54 +01:00
asyncGo ( new Runnable ( ) {
2014-10-29 10:15:57 +01:00
@Override
public void run ( ) {
2015-02-05 11:17:27 +01:00
for ( Stanza ackedStanza : ackedStanzas ) {
2015-03-01 10:28:15 +01:00
for ( StanzaListener listener : stanzaAcknowledgedListeners ) {
2015-01-13 11:21:47 +01:00
try {
2017-01-03 11:12:34 +01:00
listener . processStanza ( ackedStanza ) ;
2014-10-29 10:15:57 +01:00
}
2017-11-06 22:38:13 +01:00
catch ( InterruptedException | NotConnectedException | NotLoggedInException e ) {
2015-09-25 18:00:32 +02:00
LOGGER . log ( Level . FINER , " Received exception " , e ) ;
2015-01-13 11:21:47 +01:00
}
}
2015-02-05 11:17:27 +01:00
String id = ackedStanza . getStanzaId ( ) ;
2015-01-17 14:11:21 +01:00
if ( StringUtils . isNullOrEmpty ( id ) ) {
2015-02-19 23:01:38 +01:00
continue ;
2015-01-13 11:21:47 +01:00
}
2015-03-01 10:28:15 +01:00
StanzaListener listener = stanzaIdAcknowledgedListeners . remove ( id ) ;
2015-01-13 11:21:47 +01:00
if ( listener ! = null ) {
try {
2017-01-03 11:12:34 +01:00
listener . processStanza ( ackedStanza ) ;
2015-01-13 11:21:47 +01:00
}
2017-11-06 22:38:13 +01:00
catch ( InterruptedException | NotConnectedException | NotLoggedInException e ) {
2015-09-25 18:00:32 +02:00
LOGGER . log ( Level . FINER , " Received exception " , e ) ;
2014-10-29 10:15:57 +01:00
}
}
}
}
2014-12-17 18:21:54 +01:00
} ) ;
2014-10-29 10:15:57 +01:00
}
2014-09-11 09:49:16 +02:00
serverHandledStanzasCount = handledCount ;
2013-03-18 20:58:48 +01:00
}
2015-02-18 14:38:56 +01:00
/ * *
* Set the default bundle and defer callback used for new connections .
*
2019-08-30 12:08:30 +02:00
* @param defaultBundleAndDeferCallback TODO javadoc me please
2015-02-18 14:38:56 +01:00
* @see BundleAndDeferCallback
* @since 4 . 1
* /
public static void setDefaultBundleAndDeferCallback ( BundleAndDeferCallback defaultBundleAndDeferCallback ) {
XMPPTCPConnection . defaultBundleAndDeferCallback = defaultBundleAndDeferCallback ;
}
/ * *
* Set the bundle and defer callback used for this connection .
* < p >
* You can use < code > null < / code > as argument to reset the callback . Outgoing stanzas will then
* no longer get deferred .
* < / p >
*
* @param bundleAndDeferCallback the callback or < code > null < / code > .
* @see BundleAndDeferCallback
* @since 4 . 1
* /
public void setBundleandDeferCallback ( BundleAndDeferCallback bundleAndDeferCallback ) {
this . bundleAndDeferCallback = bundleAndDeferCallback ;
}
2007-11-14 17:27:47 +01:00
}