Add AbstractXMPPConnection.setConcurrencyLevel(int)

Also limit the max pool size of the remove callbacks service and use the
same keep alive time for idle threads as the cached executor service
uses.

Note that it would be possible to merge those two. But this could lead
to tasks from the cached executor service blocking the removal of
callbacks, which we don't want.
This commit is contained in:
Florian Schmaus 2015-01-24 12:18:51 +01:00
parent 605c29a6fc
commit e6045c6593
1 changed files with 38 additions and 6 deletions

View File

@ -235,6 +235,40 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new SmackExecutorThreadFactory(connectionCounterValue, "Incoming Processor"));
/**
* This scheduled thread pool executor is used to remove pending callbacks.
*/
private final ScheduledThreadPoolExecutor removeCallbacksService = new ScheduledThreadPoolExecutor(1,
new SmackExecutorThreadFactory(connectionCounterValue, "Remove Callbacks"));
private static int concurrencyLevel = Runtime.getRuntime().availableProcessors() + 1;
/**
* Set the concurrency level used by newly created connections.
* <p>
* The concurrency level determines the maximum pool size of the executor service that is used to e.g. invoke
* callbacks and IQ request handlers.
* </p>
* <p>
* The default value is <code>Runtime.getRuntime().availableProcessors() + 1</code>. Note that the number of
* available processors may change at runtime. So you may need to adjust it to your enviornment, although in most
* cases this should not be necessary.
* </p>
*
* @param concurrencyLevel the concurrency level used by new connections.
*/
public static void setConcurrencyLevel(int concurrencyLevel) {
if (concurrencyLevel < 1) {
throw new IllegalArgumentException("concurrencyLevel must be greater than zero");
}
AbstractXMPPConnection.concurrencyLevel = concurrencyLevel;
}
/**
* The constant long '120'.
*/
private static final long THREAD_KEEP_ALIVE_SECONDS = 60L * 2;
/**
* Creates an executor service just as {@link Executors#newCachedThreadPool()} would do, but with a keep alive time
* of 2 minutes instead of 60 seconds. And a custom thread factory to set meaningful names on the threads and set
@ -243,8 +277,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
private final ExecutorService cachedExecutorService = new ThreadPoolExecutor(
// @formatter:off
0, // corePoolSize
Integer.MAX_VALUE, // maximumPoolSize
60L * 2, // keepAliveTime
concurrencyLevel, // maximumPoolSize
THREAD_KEEP_ALIVE_SECONDS, // keepAliveTime
TimeUnit.SECONDS, // keepAliveTime unit, note that MINUTES is Android API 9
new SynchronousQueue<Runnable>(), // workQueue
new SmackExecutorThreadFactory( // threadFactory
@ -293,7 +327,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
*/
protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
config = configuration;
removeCallbacksService.setKeepAliveTime(30, TimeUnit.SECONDS);
removeCallbacksService.setMaximumPoolSize(concurrencyLevel);
removeCallbacksService.setKeepAliveTime(THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS);
}
protected ConnectionConfiguration getConfiguration() {
@ -1354,9 +1389,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
streamFeatures.put(key, feature);
}
private final ScheduledThreadPoolExecutor removeCallbacksService = new ScheduledThreadPoolExecutor(1,
new SmackExecutorThreadFactory(connectionCounterValue, "Remove Callbacks"));
@Override
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter,
PacketListener callback) throws NotConnectedException {