diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 78855474a..6bbf7c8b0 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -76,7 +76,6 @@ import org.jivesoftware.smack.parsing.ParsingExceptionCallback; import org.jivesoftware.smack.provider.ExtensionElementProvider; import org.jivesoftware.smack.provider.ProviderManager; import org.jivesoftware.smack.sasl.core.SASLAnonymous; -import org.jivesoftware.smack.util.BoundedThreadPoolExecutor; import org.jivesoftware.smack.util.DNSUtil; import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smack.util.PacketParserUtils; @@ -233,14 +232,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); - /** - * ExecutorService used to invoke the PacketListeners on newly arrived and parsed stanzas. It is - * important that we use a single threaded ExecutorService in order to guarantee that the - * PacketListeners are invoked in the same order the stanzas arrived. - */ - private final BoundedThreadPoolExecutor executorService = new BoundedThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, - 100, new SmackExecutorThreadFactory(this, "Incoming Processor")); - /** * This scheduled thread pool executor is used to remove pending callbacks. */ @@ -1081,17 +1072,17 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { assert (stanza != null); lastStanzaReceived = System.currentTimeMillis(); // Deliver the incoming packet to listeners. - executorService.executeBlocking(new Runnable() { - @Override - public void run() { - invokeStanzaCollectorsAndNotifyRecvListeners(stanza); - } - }); + invokeStanzaCollectorsAndNotifyRecvListeners(stanza); } /** * Invoke {@link StanzaCollector#processStanza(Stanza)} for every * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza(/packet) filter about the packet. + *

+ * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and + * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason, + * this method must not block for an extended period of time. + *

* * @param packet the stanza(/packet) to notify the StanzaCollectors and receive listeners about. */ @@ -1413,7 +1404,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // reference to their ExecutorService which prevents the ExecutorService from being // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the // listenerExecutor ExecutorService call not be gc'ed until it got shut down. - executorService.shutdownNow(); cachedExecutorService.shutdown(); removeCallbacksService.shutdownNow(); singleThreadedExecutorService.shutdownNow(); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java b/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java deleted file mode 100644 index c09ddd5d5..000000000 --- a/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * - * Copyright 2015 Florian Schmaus - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jivesoftware.smack.util; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { - - private final Semaphore semaphore; - - public BoundedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, int bound, ThreadFactory threadFactory) { - // One could think that the array blocking queue bound should be "bound - 1" because the bound protected by the - // Semaphore also includes the "slot" in the worker thread executing the Runnable. But using that as bound could - // actually cause a RejectedExecutionException as the queue could fill up while the worker thread remains - // unscheduled and is thus not removing any entries. - super(corePoolSize, maximumPoolSize, keepAliveTime, - unit, new ArrayBlockingQueue(bound), threadFactory); - semaphore = new Semaphore(bound); - } - - public void executeBlocking(final Runnable command) throws InterruptedException { - semaphore.acquire(); - try { - execute(new Runnable() { - @Override - public void run() { - try { - command.run(); - } finally { - semaphore.release(); - } - } - }); - } catch (Exception e) { - semaphore.release(); - if (e instanceof RejectedExecutionException) { - throw (RejectedExecutionException) e; - } else { - throw new RuntimeException(e); - } - } - } -}