diff --git a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java index f92a6dd9f..03c180a15 100644 --- a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java +++ b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java @@ -193,9 +193,6 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { + getHost() + ":" + getPort() + "."; throw new SmackException(errorMessage); } - - tlsHandled.reportSuccess(); - saslFeatureReceived.reportSuccess(); } @Override diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 19f07e7b9..6c2daf59a 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -31,8 +31,11 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -78,7 +81,6 @@ import org.jivesoftware.smack.parsing.ParsingExceptionCallback; import org.jivesoftware.smack.provider.ExtensionElementProvider; import org.jivesoftware.smack.provider.ProviderManager; import org.jivesoftware.smack.sasl.core.SASLAnonymous; -import org.jivesoftware.smack.util.BoundedThreadPoolExecutor; import org.jivesoftware.smack.util.DNSUtil; import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smack.util.PacketParserUtils; @@ -235,14 +237,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); - /** - * ExecutorService used to invoke the PacketListeners on newly arrived and parsed stanzas. It is - * important that we use a single threaded ExecutorService in order to guarantee that the - * PacketListeners are invoked in the same order the stanzas arrived. - */ - private final BoundedThreadPoolExecutor executorService = new BoundedThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, - 100, new SmackExecutorThreadFactory(this, "Incoming Processor")); - /** * This scheduled thread pool executor is used to remove pending callbacks. */ @@ -253,24 +247,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set * them 'daemon'. */ - private final ExecutorService cachedExecutorService = Executors.newCachedThreadPool( - // @formatter:off - // CHECKSTYLE:OFF - new SmackExecutorThreadFactory( - this, - "Cached Executor" - ) - // @formatter:on - // CHECKSTYLE:ON - ); + private static final ExecutorService CACHED_EXECUTOR_SERVICE = Executors.newCachedThreadPool(new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName("Smack Cached Executor"); + thread.setDaemon(true); + return thread; + } + }); /** * A executor service used to invoke the callbacks of synchronous stanza(/packet) listeners. We use a executor service to * decouple incoming stanza processing from callback invocation. It is important that order of callback invocation * is the same as the order of the incoming stanzas. Therefore we use a single threaded executor service. */ - private final ExecutorService singleThreadedExecutorService = Executors.newSingleThreadExecutor(new SmackExecutorThreadFactory( - this, "Single Threaded Executor")); + private final ExecutorService singleThreadedExecutorService = new ThreadPoolExecutor(0, 1, 30L, + TimeUnit.SECONDS, new LinkedBlockingQueue(), + new SmackExecutorThreadFactory(this, "Single Threaded Executor")); /** * The used host to establish the connection to @@ -381,12 +375,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // Perform the actual connection to the XMPP service connectInternal(); - // TLS handled will be successful either if TLS was established, or if it was not mandatory. - tlsHandled.checkIfSuccessOrWaitOrThrow(); - - // Wait with SASL auth until the SASL mechanisms have been received - saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); - // If TLS is required but the server doesn't offer it, disconnect // from the server and throw an error. First check if we've already negotiated TLS // and are secure, however (features get parsed a second time after TLS is established). @@ -702,8 +690,12 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * */ public void disconnect() { + Presence unavailablePresence = null; + if (isAuthenticated()) { + unavailablePresence = new Presence(Presence.Type.unavailable); + } try { - disconnect(new Presence(Presence.Type.unavailable)); + disconnect(unavailablePresence); } catch (NotConnectedException e) { LOGGER.log(Level.FINEST, "Connection is already disconnected", e); @@ -718,15 +710,18 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * stanza(/packet) is set with online information, but most XMPP servers will deliver the full * presence stanza(/packet) with whatever data is set. * - * @param unavailablePresence the presence stanza(/packet) to send during shutdown. + * @param unavailablePresence the optional presence stanza to send during shutdown. * @throws NotConnectedException */ public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { - try { - sendStanza(unavailablePresence); - } - catch (InterruptedException e) { - LOGGER.log(Level.FINE, "Was interrupted while sending unavailable presence. Continuing to disconnect the connection", e); + if (unavailablePresence != null) { + try { + sendStanza(unavailablePresence); + } catch (InterruptedException e) { + LOGGER.log(Level.FINE, + "Was interrupted while sending unavailable presence. Continuing to disconnect the connection", + e); + } } shutdown(); callConnectionClosedListener(); @@ -1015,17 +1010,17 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { lastStanzaReceived = System.currentTimeMillis(); // Deliver the incoming packet to listeners. - executorService.executeBlocking(new Runnable() { - @Override - public void run() { - invokeStanzaCollectorsAndNotifyRecvListeners(stanza); - } - }); + invokeStanzaCollectorsAndNotifyRecvListeners(stanza); } /** * Invoke {@link StanzaCollector#processStanza(Stanza)} for every * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza(/packet) filter about the packet. + *

+ * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and + * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason, + * this method must not block for an extended period of time. + *

* * @param packet the stanza(/packet) to notify the StanzaCollectors and receive listeners about. */ @@ -1082,7 +1077,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { executorService = singleThreadedExecutorService; break; case async: - executorService = cachedExecutorService; + executorService = CACHED_EXECUTOR_SERVICE; break; } final IQRequestHandler finalIqRequestHandler = iqRequestHandler; @@ -1342,8 +1337,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // reference to their ExecutorService which prevents the ExecutorService from being // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the // listenerExecutor ExecutorService call not be gc'ed until it got shut down. - executorService.shutdownNow(); - cachedExecutorService.shutdown(); removeCallbacksService.shutdownNow(); singleThreadedExecutorService.shutdownNow(); } catch (Throwable t) { @@ -1708,7 +1701,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } protected final void asyncGo(Runnable runnable) { - cachedExecutorService.execute(runnable); + CACHED_EXECUTOR_SERVICE.execute(runnable); } protected final ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java b/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java deleted file mode 100644 index c09ddd5d5..000000000 --- a/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * - * Copyright 2015 Florian Schmaus - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jivesoftware.smack.util; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { - - private final Semaphore semaphore; - - public BoundedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, int bound, ThreadFactory threadFactory) { - // One could think that the array blocking queue bound should be "bound - 1" because the bound protected by the - // Semaphore also includes the "slot" in the worker thread executing the Runnable. But using that as bound could - // actually cause a RejectedExecutionException as the queue could fill up while the worker thread remains - // unscheduled and is thus not removing any entries. - super(corePoolSize, maximumPoolSize, keepAliveTime, - unit, new ArrayBlockingQueue(bound), threadFactory); - semaphore = new Semaphore(bound); - } - - public void executeBlocking(final Runnable command) throws InterruptedException { - semaphore.acquire(); - try { - execute(new Runnable() { - @Override - public void run() { - try { - command.run(); - } finally { - semaphore.release(); - } - } - }); - } catch (Exception e) { - semaphore.release(); - if (e instanceof RejectedExecutionException) { - throw (RejectedExecutionException) e; - } else { - throw new RuntimeException(e); - } - } - } -} diff --git a/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java b/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java index e5351835e..ab4a43bff 100644 --- a/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java +++ b/smack-core/src/test/java/org/jivesoftware/smack/DummyConnection.java @@ -92,8 +92,6 @@ public class DummyConnection extends AbstractXMPPConnection { @Override protected void connectInternal() { connected = true; - saslFeatureReceived.reportSuccess(); - tlsHandled.reportSuccess(); streamId = "dummy-" + new Random(new Date().getTime()).nextInt(); // TODO: Remove in Smack 4.3, and likely the suppression of the deprecation warning. diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/xdata/Form.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/xdata/Form.java index 39e639304..63c8e5b5b 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/xdata/Form.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/xdata/Form.java @@ -235,7 +235,7 @@ public class Form { if (field.getType() != FormField.Type.bool) { throw new IllegalArgumentException("This field is not of type boolean."); } - setAnswer(field, (value ? "1" : "0")); + setAnswer(field, Boolean.toString(value)); } /** diff --git a/smack-integration-test/src/main/java/org/jivesoftware/smackx/pubsub/PubSubIntegrationTest.java b/smack-integration-test/src/main/java/org/jivesoftware/smackx/pubsub/PubSubIntegrationTest.java index 403535d9f..a8f19b6c5 100644 --- a/smack-integration-test/src/main/java/org/jivesoftware/smackx/pubsub/PubSubIntegrationTest.java +++ b/smack-integration-test/src/main/java/org/jivesoftware/smackx/pubsub/PubSubIntegrationTest.java @@ -58,6 +58,7 @@ public class PubSubIntegrationTest extends AbstractSmackIntegrationTest { // items do not need payload, to prevent payload-required error responses when // publishing the item. config.setDeliverPayloads(false); + config.setPersistentItems(true); Node node = pubSubManagerOne.createNode(nodename, config); try { LeafNode leafNode = (LeafNode) node; diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 2aed087a5..afa57c24e 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -895,6 +895,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // We connected successfully to the servers TCP port initConnection(); + + // TLS handled will be successful either if TLS was established, or if it was not mandatory. + tlsHandled.checkIfSuccessOrWaitOrThrow(); + + // Wait with SASL auth until the SASL mechanisms have been received + saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); } /**