/** * * Copyright 2009 Jive Software, 2018-2020 Florian Schmaus. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.jivesoftware.smack; import java.io.IOException; import java.io.Reader; import java.io.Writer; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import javax.net.ssl.SSLSession; import javax.xml.namespace.QName; import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; import org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode; import org.jivesoftware.smack.SmackException.AlreadyConnectedException; import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotLoggedInException; import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredException; import org.jivesoftware.smack.SmackException.SmackSaslException; import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; import org.jivesoftware.smack.XMPPException.FailedNonzaException; import org.jivesoftware.smack.XMPPException.StreamErrorException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; import org.jivesoftware.smack.compress.packet.Compress; import org.jivesoftware.smack.compression.XMPPInputOutputStream; import org.jivesoftware.smack.datatypes.UInt16; import org.jivesoftware.smack.debugger.SmackDebugger; import org.jivesoftware.smack.debugger.SmackDebuggerFactory; import org.jivesoftware.smack.filter.IQReplyFilter; import org.jivesoftware.smack.filter.StanzaFilter; import org.jivesoftware.smack.filter.StanzaIdFilter; import org.jivesoftware.smack.internal.SmackTlsContext; import org.jivesoftware.smack.iqrequest.IQRequestHandler; import org.jivesoftware.smack.packet.Bind; import org.jivesoftware.smack.packet.ErrorIQ; import org.jivesoftware.smack.packet.ExtensionElement; import org.jivesoftware.smack.packet.FullyQualifiedElement; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Mechanisms; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.MessageBuilder; import org.jivesoftware.smack.packet.MessageOrPresence; import org.jivesoftware.smack.packet.MessageOrPresenceBuilder; import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Presence; import org.jivesoftware.smack.packet.PresenceBuilder; import org.jivesoftware.smack.packet.Session; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.StanzaError; import org.jivesoftware.smack.packet.StanzaFactory; import org.jivesoftware.smack.packet.StartTls; import org.jivesoftware.smack.packet.StreamError; import org.jivesoftware.smack.packet.StreamOpen; import org.jivesoftware.smack.packet.TopLevelStreamElement; import org.jivesoftware.smack.packet.XmlEnvironment; import org.jivesoftware.smack.packet.id.StanzaIdSource; import org.jivesoftware.smack.parsing.ParsingExceptionCallback; import org.jivesoftware.smack.parsing.SmackParsingException; import org.jivesoftware.smack.provider.ExtensionElementProvider; import org.jivesoftware.smack.provider.NonzaProvider; import org.jivesoftware.smack.provider.ProviderManager; import org.jivesoftware.smack.sasl.SASLErrorException; import org.jivesoftware.smack.sasl.SASLMechanism; import org.jivesoftware.smack.sasl.core.SASLAnonymous; import org.jivesoftware.smack.sasl.packet.SaslNonza; import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.CollectionUtil; import org.jivesoftware.smack.util.Consumer; import org.jivesoftware.smack.util.MultiMap; import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.ParserUtils; import org.jivesoftware.smack.util.Predicate; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.Supplier; import org.jivesoftware.smack.xml.XmlPullParser; import org.jivesoftware.smack.xml.XmlPullParserException; import org.jxmpp.jid.DomainBareJid; import org.jxmpp.jid.EntityBareJid; import org.jxmpp.jid.EntityFullJid; import org.jxmpp.jid.Jid; import org.jxmpp.jid.impl.JidCreate; import org.jxmpp.jid.parts.Resourcepart; import org.jxmpp.stringprep.XmppStringprepException; import org.jxmpp.util.XmppStringUtils; /** * This abstract class is commonly used as super class for XMPP connection mechanisms like TCP and BOSH. Hence it * provides the methods for connection state management, like {@link #connect()}, {@link #login()} and * {@link #disconnect()} (which are deliberately not provided by the {@link XMPPConnection} interface). *
* Note: The default entry point to Smack's documentation is {@link XMPPConnection}. If you are getting started * with Smack, then head over to {@link XMPPConnection} and the come back here. *
** In case a Smack parser (Provider) throws those exceptions are handled over to the {@link ParsingExceptionCallback}. A * common cause for a provider throwing is illegal input, for example a non-numeric String where only Integers are * allowed. Smack's default behavior follows the "fail-hard per default" principle leading to a * termination of the connection on parsing exceptions. This default was chosen to make users eventually aware that they * should configure their own callback and handle those exceptions to prevent the disconnect. Handle a parsing exception * could be as simple as using a non-throwing no-op callback, which would cause the faulty stream element to be taken * out of the stream, i.e., Smack behaves like that element was never received. *
** If the parsing exception is because Smack received illegal input, then please consider informing the authors of the * originating entity about that. If it was thrown because of an bug in a Smack parser, then please consider filling a * bug with Smack. *
** The "fail-hard per default" behavior is achieved by using the * {@link org.jivesoftware.smack.parsing.ExceptionThrowingCallbackWithHint} as default parsing exception callback. You * can change the behavior using {@link #setParsingExceptionCallback(ParsingExceptionCallback)} to set a new callback. * Use {@link org.jivesoftware.smack.SmackConfiguration#setDefaultParsingExceptionCallback(ParsingExceptionCallback)} to * set the default callback. *
*/ public abstract class AbstractXMPPConnection implements XMPPConnection { private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName()); protected static final SmackReactor SMACK_REACTOR; static { SMACK_REACTOR = SmackReactor.getInstance(); } /** * Counter to uniquely identify connections that are created. */ private static final AtomicInteger connectionCounter = new AtomicInteger(0); static { Smack.ensureInitialized(); } protected enum SyncPointState { initial, request_sent, successful, } /** * A collection of ConnectionListeners which listen for connection closing * and reconnection events. */ protected final Set
* We use a ConcurrentLinkedQueue here, because its Iterator is weakly
* consistent and we want {@link #invokeStanzaCollectorsAndNotifyRecvListeners(Stanza)} for-each
* loop to be lock free. As drawback, removing a StanzaCollector is O(n).
* The alternative would be a synchronized HashSet, but this would mean a
* synchronized block around every usage of collectors
.
*
* It is important that we don't infer the user from the login() arguments and the configurations service name, as, * for example, when SASL External is used, the username is not given to login but taken from the 'external' * certificate. *
*/ protected EntityFullJid user; protected boolean connected = false; /** * The stream ID, see RFC 6120 § 4.7.3 */ protected String streamId; /** * The timeout to wait for a reply in milliseconds. */ private long replyTimeout = SmackConfiguration.getDefaultReplyTimeout(); /** * The SmackDebugger allows to log and debug XML traffic. */ protected final SmackDebugger debugger; /** * The Reader which is used for the debugger. */ protected Reader reader; /** * The Writer which is used for the debugger. */ protected Writer writer; protected SmackException currentSmackException; protected XMPPException currentXmppException; protected boolean tlsHandled; /** * Set totrue
if the last features stanza from the server has been parsed. A XMPP connection
* handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature
* stanza is send by the server. This is set to true once the last feature stanza has been
* parsed.
*/
protected boolean lastFeaturesReceived;
/**
* Set to true
if the SASL feature has been received.
*/
protected boolean saslFeatureReceived;
/**
* A synchronization point which is successful if this connection has received the closing
* stream element from the remote end-point, i.e. the server.
*/
protected boolean closingStreamReceived;
/**
* The SASLAuthentication manager that is responsible for authenticating with the server.
*/
private final SASLAuthentication saslAuthentication;
/**
* A number to uniquely identify connections that are created. This is distinct from the
* connection ID, which is a value sent by the server once a connection is made.
*/
protected final int connectionCounterValue = connectionCounter.getAndIncrement();
/**
* Holds the initial configuration used while creating the connection.
*/
protected final ConnectionConfiguration config;
/**
* Defines how the from attribute of outgoing stanzas should be handled.
*/
private FromMode fromMode = FromMode.OMITTED;
protected XMPPInputOutputStream compressionHandler;
private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback();
/**
* A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set
* them 'daemon'.
*/
private static final ExecutorService CACHED_EXECUTOR_SERVICE = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("Smack Cached Executor");
thread.setDaemon(true);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOGGER.log(Level.WARNING, t + " encountered uncaught exception", e);
}
});
return thread;
}
});
protected static final AsyncButOrdered* Listeners will be preserved from a previous connection. *
* * @throws XMPPException if an error occurs on the XMPP protocol level. * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. * @throws IOException if an I/O error occurred. * @return a reference to this object, to chainconnect()
with login()
.
* @throws InterruptedException if the calling thread was interrupted.
*/
public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException {
// Check if not already connected
throwAlreadyConnectedExceptionIfAppropriate();
// Notify connection listeners that we are trying to connect
callConnectionConnectingListener();
// Reset the connection state
initState();
closingStreamReceived = false;
streamId = null;
try {
// Perform the actual connection to the XMPP service
connectInternal();
// If TLS is required but the server doesn't offer it, disconnect
// from the server and throw an error. First check if we've already negotiated TLS
// and are secure, however (features get parsed a second time after TLS is established).
if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
throw new SecurityRequiredByClientException();
}
} catch (SmackException | IOException | XMPPException | InterruptedException e) {
instantShutdown();
throw e;
}
// If connectInternal() did not throw, then this connection must now be marked as connected.
assert connected;
callConnectionConnectedListener();
return this;
}
/**
* Abstract method that concrete subclasses of XMPPConnection need to implement to perform their
* way of XMPP connection establishment. Implementations are required to perform an automatic
* login if the previous connection state was logged (authenticated).
*
* @throws SmackException if Smack detected an exceptional situation.
* @throws IOException if an I/O error occurred.
* @throws XMPPException if an XMPP protocol error was received.
* @throws InterruptedException if the calling thread was interrupted.
*/
protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException;
private String usedUsername, usedPassword;
/**
* The resourcepart used for this connection. May not be the resulting resourcepart if it's null or overridden by the XMPP service.
*/
private Resourcepart usedResource;
/**
* Logs in to the server using the strongest SASL mechanism supported by
* the server. If more than the connection's default stanza timeout elapses in each step of the
* authentication process without a response from the server, a
* {@link SmackException.NoResponseException} will be thrown.
* * Before logging in (i.e. authenticate) to the server the connection must be connected * by calling {@link #connect}. *
** It is possible to log in without sending an initial available presence by using * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}. * Finally, if you want to not pass a password and instead use a more advanced mechanism * while using SASL then you may be interested in using * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}. * For more advanced login settings see {@link ConnectionConfiguration}. *
* * @throws XMPPException if an error occurs on the XMPP protocol level. * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. * @throws IOException if an I/O error occurs during login. * @throws InterruptedException if the calling thread was interrupted. */ public synchronized void login() throws XMPPException, SmackException, IOException, InterruptedException { // The previously used username, password and resource take over precedence over the // ones from the connection configuration CharSequence username = usedUsername != null ? usedUsername : config.getUsername(); String password = usedPassword != null ? usedPassword : config.getPassword(); Resourcepart resource = usedResource != null ? usedResource : config.getResource(); login(username, password, resource); } /** * Same as {@link #login(CharSequence, String, Resourcepart)}, but takes the resource from the connection * configuration. * * @param username TODO javadoc me please * @param password TODO javadoc me please * @throws XMPPException if an XMPP protocol error was received. * @throws SmackException if Smack detected an exceptional situation. * @throws IOException if an I/O error occurred. * @throws InterruptedException if the calling thread was interrupted. * @see #login */ public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException, IOException, InterruptedException { login(username, password, config.getResource()); } /** * Login with the given username (authorization identity). You may omit the password if a callback handler is used. * If resource is null, then the server will generate one. * * @param username TODO javadoc me please * @param password TODO javadoc me please * @param resource TODO javadoc me please * @throws XMPPException if an XMPP protocol error was received. * @throws SmackException if Smack detected an exceptional situation. * @throws IOException if an I/O error occurred. * @throws InterruptedException if the calling thread was interrupted. * @see #login */ public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException, SmackException, IOException, InterruptedException { if (!config.allowNullOrEmptyUsername) { StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty"); } throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?"); throwAlreadyLoggedInExceptionIfAppropriate(); usedUsername = username != null ? username.toString() : null; usedPassword = password; usedResource = resource; loginInternal(usedUsername, usedPassword, usedResource); } protected abstract void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, SmackException, IOException, InterruptedException; @Override public final boolean isConnected() { return connected; } @Override public final boolean isAuthenticated() { return authenticated; } @Override public final EntityFullJid getUser() { return user; } @Override public String getStreamId() { if (!isConnected()) { return null; } return streamId; } protected final void throwCurrentConnectionException() throws SmackException, XMPPException { if (currentSmackException != null) { throw currentSmackException; } else if (currentXmppException != null) { throw currentXmppException; } throw new AssertionError("No current connection exception set, although throwCurrentException() was called"); } protected final boolean hasCurrentConnectionException() { return currentSmackException != null || currentXmppException != null; } protected final void setCurrentConnectionExceptionAndNotify(Exception exception) { if (exception instanceof SmackException) { currentSmackException = (SmackException) exception; } else if (exception instanceof XMPPException) { currentXmppException = (XMPPException) exception; } else { currentSmackException = new SmackException.SmackWrappedException(exception); } notifyWaitingThreads(); } /** * We use an extra object for {@link #notifyWaitingThreads()} and {@link #waitForConditionOrConnectionException(Supplier)}, because all state * changing methods of the connection are synchronized using the connection instance as monitor. If we now would * also use the connection instance for the internal process to wait for a condition, the {@link Object#wait()} * would leave the monitor when it waites, which would allow for another potential call to a state changing function * to proceed. */ private final Object internalMonitor = new Object(); protected final void notifyWaitingThreads() { synchronized (internalMonitor) { internalMonitor.notifyAll(); } } protected final boolean waitFor(Suppliernull
if
* this connection was not authenticated before.
*
* @return the name of the used SASL mechanism.
* @since 4.2
*/
public final String getUsedSaslMechansism() {
return saslAuthentication.getNameOfLastUsedSaslMechansism();
}
private DomainBareJid xmppServiceDomain;
protected Lock getConnectionLock() {
return connectionLock;
}
protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
throwNotConnectedExceptionIfAppropriate(null);
}
protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException {
if (!isConnected()) {
throw new NotConnectedException(optionalHint);
}
}
protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
if (isConnected()) {
throw new AlreadyConnectedException();
}
}
protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
if (isAuthenticated()) {
throw new AlreadyLoggedInException();
}
}
@Override
public final StanzaFactory getStanzaFactory() {
return stanzaFactory;
}
@Override
public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
Objects.requireNonNull(stanza, "Stanza must not be null");
assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ;
throwNotConnectedExceptionIfAppropriate();
switch (fromMode) {
case OMITTED:
stanza.setFrom((Jid) null);
break;
case USER:
stanza.setFrom(getUser());
break;
case UNCHANGED:
default:
break;
}
// Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify
// the content of the stanza.
Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza);
sendStanzaInternal(stanzaAfterInterceptors);
}
/**
* Authenticate a connection.
*
* @param username the username that is authenticating with the server.
* @param password the password to send to the server.
* @param authzid the authorization identifier (typically null).
* @param sslSession the optional SSL/TLS session (if one was established)
* @return the used SASLMechanism.
* @throws XMPPErrorException if there was an XMPP error returned.
* @throws SASLErrorException if a SASL protocol error was returned.
* @throws IOException if an I/O error occurred.
* @throws InterruptedException if the calling thread was interrupted.
* @throws SmackSaslException if a SASL specific error occurred.
* @throws NotConnectedException if the XMPP connection is not connected.
* @throws NoResponseException if there was no response from the remote entity.
* @throws SmackWrappedException in case of an exception.
* @see SASLAuthentication#authenticate(String, String, EntityBareJid, SSLSession)
*/
protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid,
SSLSession sslSession) throws XMPPErrorException, SASLErrorException, SmackSaslException,
NotConnectedException, NoResponseException, IOException, InterruptedException, SmackWrappedException {
SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession);
afterSaslAuthenticationSuccess();
return saslMechanism;
}
/**
* Hook for subclasses right after successful SASL authentication. RFC 6120 § 6.4.6. specifies a that the initiating
* entity, needs to initiate a new stream in this case. But some transports, like BOSH, requires a special handling.
* * Note that we can not reset XMPPTCPConnection's parser here, because this method is invoked by the thread calling * {@link #login()}, but the parser reset has to be done within the reader thread. *
* * @throws NotConnectedException if the XMPP connection is not connected. * @throws InterruptedException if the calling thread was interrupted. * @throws SmackWrappedException in case of an exception. */ protected void afterSaslAuthenticationSuccess() throws NotConnectedException, InterruptedException, SmackWrappedException { sendStreamOpen(); } protected final boolean isSaslAuthenticated() { return saslAuthentication.authenticationSuccessful(); } /** * Closes the connection by setting presence to unavailable then closing the connection to * the XMPP server. The XMPPConnection can still be used for connecting to the server * again. * */ public void disconnect() { Presence unavailablePresence = null; if (isAuthenticated()) { unavailablePresence = getStanzaFactory().buildPresenceStanza() .ofType(Presence.Type.unavailable) .build(); } try { disconnect(unavailablePresence); } catch (NotConnectedException e) { LOGGER.log(Level.FINEST, "Connection is already disconnected", e); } } /** * Closes the connection. A custom unavailable presence is sent to the server, followed * by closing the stream. The XMPPConnection can still be used for connecting to the server * again. A custom unavailable presence is useful for communicating offline presence * information such as "On vacation". Typically, just the status text of the presence * stanza is set with online information, but most XMPP servers will deliver the full * presence stanza with whatever data is set. * * @param unavailablePresence the optional presence stanza to send during shutdown. * @throws NotConnectedException if the XMPP connection is not connected. */ public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { if (unavailablePresence != null) { try { sendStanza(unavailablePresence); } catch (InterruptedException e) { LOGGER.log(Level.FINE, "Was interrupted while sending unavailable presence. Continuing to disconnect the connection", e); } } shutdown(); callConnectionClosedListener(); } private final Object notifyConnectionErrorMonitor = new Object(); /** * Sends out a notification that there was an error with the connection * and closes the connection. * * @param exception the exception that causes the connection close event. */ protected final void notifyConnectionError(final Exception exception) { synchronized (notifyConnectionErrorMonitor) { if (!isConnected()) { LOGGER.log(Level.INFO, "Connection was already disconnected when attempting to handle " + exception, exception); return; } // Note that we first have to set the current connection exception and notify waiting threads, as one of them // could hold the instance lock, which we also need later when calling instantShutdown(). setCurrentConnectionExceptionAndNotify(exception); // Closes the connection temporary. A if the connection supports stream management, then a reconnection is // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in // case the Exception is a StreamErrorException. instantShutdown(); for (StanzaCollector collector : collectors) { collector.notifyConnectionError(exception); } Async.go(() -> { // Notify connection listeners of the error. callConnectionClosedOnErrorListener(exception); }, AbstractXMPPConnection.this + " callConnectionClosedOnErrorListener()"); } } /** * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza. */ public abstract void instantShutdown(); /** * Shuts the current connection down. */ protected abstract void shutdown(); protected final boolean waitForClosingStreamTagFromServer() { try { waitForConditionOrThrowConnectionException(() -> closingStreamReceived, "closing stream tag from the server"); } catch (InterruptedException | SmackException | XMPPException e) { LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); return false; } return true; } @Override public void addConnectionListener(ConnectionListener connectionListener) { if (connectionListener == null) { return; } connectionListeners.add(connectionListener); } @Override public void removeConnectionListener(ConnectionListener connectionListener) { connectionListeners.remove(connectionListener); } @Override public I sendIqRequestAndWaitForResponse(IQ request) throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException { StanzaCollector collector = createStanzaCollectorAndSend(request); IQ resultResponse = collector.nextResultOrThrow(); @SuppressWarnings("unchecked") I concreteResultResponse = (I) resultResponse; return concreteResultResponse; } @Override public StanzaCollector createStanzaCollectorAndSend(IQ packet) throws NotConnectedException, InterruptedException { StanzaFilter packetFilter = new IQReplyFilter(packet, this); // Create the packet collector before sending the packet StanzaCollector packetCollector = createStanzaCollectorAndSend(packetFilter, packet); return packetCollector; } @Override public StanzaCollector createStanzaCollectorAndSend(StanzaFilter packetFilter, Stanza packet) throws NotConnectedException, InterruptedException { StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration() .setStanzaFilter(packetFilter) .setRequest(packet); // Create the packet collector before sending the packet StanzaCollector packetCollector = createStanzaCollector(configuration); try { // Now we can send the packet as the collector has been created sendStanza(packet); } catch (InterruptedException | NotConnectedException | RuntimeException e) { packetCollector.cancel(); throw e; } return packetCollector; } @Override public StanzaCollector createStanzaCollector(StanzaFilter packetFilter) { StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration().setStanzaFilter(packetFilter); return createStanzaCollector(configuration); } @Override public StanzaCollector createStanzaCollector(StanzaCollector.Configuration configuration) { StanzaCollector collector = new StanzaCollector(this, configuration); // Add the collector to the list of active collectors. collectors.add(collector); return collector; } @Override public void removeStanzaCollector(StanzaCollector collector) { collectors.remove(collector); } @Override public final void addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter) { if (stanzaListener == null) { throw new NullPointerException("Given stanza listener must not be null"); } ListenerWrapper wrapper = new ListenerWrapper(stanzaListener, stanzaFilter); synchronized (recvListeners) { recvListeners.put(stanzaListener, wrapper); } } @Override public final boolean removeStanzaListener(StanzaListener stanzaListener) { synchronized (recvListeners) { return recvListeners.remove(stanzaListener) != null; } } @Override public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { if (packetListener == null) { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); synchronized (syncRecvListeners) { syncRecvListeners.put(packetListener, wrapper); } } @Override public boolean removeSyncStanzaListener(StanzaListener packetListener) { synchronized (syncRecvListeners) { return syncRecvListeners.remove(packetListener) != null; } } @Override public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { if (packetListener == null) { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); synchronized (asyncRecvListeners) { asyncRecvListeners.put(packetListener, wrapper); } } @Override public boolean removeAsyncStanzaListener(StanzaListener packetListener) { synchronized (asyncRecvListeners) { return asyncRecvListeners.remove(packetListener) != null; } } @Override public void addStanzaSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) { if (packetListener == null) { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); synchronized (sendListeners) { sendListeners.put(packetListener, wrapper); } } @Override public void removeStanzaSendingListener(StanzaListener packetListener) { synchronized (sendListeners) { sendListeners.remove(packetListener); } } /** * Process all stanza listeners for sending stanzas. ** Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread. *
* * @param sendTopLevelStreamElement the top level stream element which just got send. */ // TODO: Rename to fireElementSendingListeners(). @SuppressWarnings("javadoc") protected void firePacketSendingListeners(final TopLevelStreamElement sendTopLevelStreamElement) { if (debugger != null) { debugger.onOutgoingStreamElement(sendTopLevelStreamElement); } if (!(sendTopLevelStreamElement instanceof Stanza)) { return; } Stanza packet = (Stanza) sendTopLevelStreamElement; final Listsmack.debuggerClass
to the implementation.
*
* @throws IllegalStateException if the reader or writer isn't yet initialized.
* @throws IllegalArgumentException if the SmackDebugger can't be loaded.
*/
protected void initDebugger() {
if (reader == null || writer == null) {
throw new NullPointerException("Reader or writer isn't initialized.");
}
// If debugging is enabled, we open a window and write out all network traffic.
if (debugger != null) {
// Obtain new reader and writer from the existing debugger
reader = debugger.newConnectionReader(reader);
writer = debugger.newConnectionWriter(writer);
}
}
@Override
public long getReplyTimeout() {
return replyTimeout;
}
@Override
public void setReplyTimeout(long timeout) {
if (Long.MAX_VALUE - System.currentTimeMillis() < timeout) {
throw new IllegalArgumentException("Extremely long reply timeout");
}
else {
replyTimeout = timeout;
}
}
private SmackConfiguration.UnknownIqRequestReplyMode unknownIqRequestReplyMode = SmackConfiguration.getUnknownIqRequestReplyMode();
/**
* Set how Smack behaves when an unknown IQ request has been received.
*
* @param unknownIqRequestReplyMode reply mode.
*/
public void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) {
this.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Mode must not be null");
}
protected final NonzaCallback.Builder buildNonzaCallback() {
return new NonzaCallback.Builder(this);
}
protected * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason, * this method must not block for an extended period of time. *
* * @param packet the stanza to notify the StanzaCollectors and receive listeners about. */ protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) { if (packet instanceof IQ) { final IQ iq = (IQ) packet; if (iq.isRequestIQ()) { final IQ iqRequest = iq; final QName key = iqRequest.getChildElementQName(); IQRequestHandler iqRequestHandler; final IQ.Type type = iq.getType(); switch (type) { case set: synchronized (setIqRequestHandler) { iqRequestHandler = setIqRequestHandler.get(key); } break; case get: synchronized (getIqRequestHandler) { iqRequestHandler = getIqRequestHandler.get(key); } break; default: throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'"); } if (iqRequestHandler == null) { StanzaError.Condition replyCondition; switch (unknownIqRequestReplyMode) { case doNotReply: return; case replyFeatureNotImplemented: replyCondition = StanzaError.Condition.feature_not_implemented; break; case replyServiceUnavailable: replyCondition = StanzaError.Condition.service_unavailable; break; default: throw new AssertionError(); } // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an // IQ of type 'error' with condition 'service-unavailable'. final ErrorIQ errorIQ = IQ.createErrorResponse(iq, StanzaError.getBuilder( replyCondition).build()); // Use async sendStanza() here, since if sendStanza() would block, then some connections, e.g. // XmppNioTcpConnection, would deadlock, as this operation is performed in the same thread that is asyncGo(() -> { try { sendStanza(errorIQ); } catch (InterruptedException | NotConnectedException e) { LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e); } }); } else { Executor executorService = null; switch (iqRequestHandler.getMode()) { case sync: executorService = ASYNC_BUT_ORDERED.asExecutorFor(this); break; case async: executorService = this::asyncGoLimited; break; } final IQRequestHandler finalIqRequestHandler = iqRequestHandler; executorService.execute(new Runnable() { @Override public void run() { IQ response = finalIqRequestHandler.handleIQRequest(iq); if (response == null) { // It is not ideal if the IQ request handler does not return an IQ response, because RFC // 6120 § 8.1.2 does specify that a response is mandatory. But some APIs, mostly the // file transfer one, does not always return a result, so we need to handle this case. // Also sometimes a request handler may decide that it's better to not send a response, // e.g. to avoid presence leaks. return; } assert response.isResponseIQ(); response.setTo(iqRequest.getFrom()); response.setStanzaId(iqRequest.getStanzaId()); try { sendStanza(response); } catch (InterruptedException | NotConnectedException e) { LOGGER.log(Level.WARNING, "Exception while sending response to IQ request", e); } } }); } // The following returns makes it impossible for packet listeners and collectors to // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the // desired behavior. return; } } // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below, // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in // their own thread. final Collection