Fixed AbstractXMPPConnection.cachedExecutorService

the combination with concurrencyLevel and LinkedBlockingQueue never
worked as intented. The idea was that the cachedExecutorService would
spawn new threads until maximumPoolSize (=concurrencyLevel) is reached,
and then start queing the Runnables.

But this was not the case, since ThreadPoolExecutor does not take into
consideration if the worker threads is busy, i.e. executing a Runnable,
or idle, i.e. waiting for a Runnable.

This means that if a busy Worker would execute a Runnable, which would
block, because it's waiting for an event (e.g. an incoming IQ
request), then the handling of those incoming IQ request would be
queued by ThreadPoolExecutor, because no fewer threads then corePoolSize
are running and the task can be queued (since the LinkedBlockingQueue is
unbounded).
This commit is contained in:
Florian Schmaus 2015-02-27 23:38:13 +01:00
parent caa7b9acb8
commit 21c0be5e2a
1 changed files with 2 additions and 37 deletions

View File

@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
@ -247,46 +246,12 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
private final ScheduledExecutorService removeCallbacksService = Executors.newSingleThreadScheduledExecutor(
new SmackExecutorThreadFactory(connectionCounterValue, "Remove Callbacks"));
private static int concurrencyLevel = Runtime.getRuntime().availableProcessors() + 1;
/**
* Set the concurrency level used by newly created connections.
* <p>
* The concurrency level determines the maximum pool size of the executor service that is used to e.g. invoke
* callbacks and IQ request handlers.
* </p>
* <p>
* The default value is <code>Runtime.getRuntime().availableProcessors() + 1</code>. Note that the number of
* available processors may change at runtime. So you may need to adjust it to your enviornment, although in most
* cases this should not be necessary.
* </p>
*
* @param concurrencyLevel the concurrency level used by new connections.
*/
public static void setConcurrencyLevel(int concurrencyLevel) {
if (concurrencyLevel < 1) {
throw new IllegalArgumentException("concurrencyLevel must be greater than zero");
}
AbstractXMPPConnection.concurrencyLevel = concurrencyLevel;
}
/**
* The constant long '120'.
*/
private static final long THREAD_KEEP_ALIVE_SECONDS = 60L * 2;
/**
* Creates an executor service just as {@link Executors#newCachedThreadPool()} would do, but with a keep alive time
* of 2 minutes instead of 60 seconds. And a custom thread factory to set meaningful names on the threads and set
* A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set
* them 'daemon'.
*/
private final ExecutorService cachedExecutorService = new ThreadPoolExecutor(
private final ExecutorService cachedExecutorService = Executors.newCachedThreadPool(
// @formatter:off
0, // corePoolSize
concurrencyLevel, // maximumPoolSize
THREAD_KEEP_ALIVE_SECONDS, // keepAliveTime
TimeUnit.SECONDS, // keepAliveTime unit, note that MINUTES is Android API 9
new LinkedBlockingQueue<Runnable>(), // workQueue
new SmackExecutorThreadFactory( // threadFactory
connectionCounterValue,
"Cached Executor"