From d0810553124b89e4b9d44cb7cd3b95e58d8cd802 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 17 Dec 2014 18:21:54 +0100 Subject: [PATCH] Use cached executor in AbstractXMPPConnection --- .../smack/AbstractXMPPConnection.java | 46 +++++++++++++++---- .../smack/tcp/XMPPTCPConnection.java | 5 +- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 6b30bfa15..1fe7bafef 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -29,8 +29,11 @@ import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -68,7 +71,6 @@ import org.jivesoftware.smack.packet.PlainStreamElement; import org.jivesoftware.smack.provider.PacketExtensionProvider; import org.jivesoftware.smack.provider.ProviderManager; import org.jivesoftware.smack.rosterstore.RosterStore; -import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.DNSUtil; import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.dns.HostAddress; @@ -214,7 +216,26 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { * PacketListeners are invoked in the same order the stanzas arrived. */ private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, - new ArrayBlockingQueue(100), new SmackExecutorThreadFactory(connectionCounterValue)); + new ArrayBlockingQueue(100), new SmackExecutorThreadFactory(connectionCounterValue, "IncomingNotifier")); + + /** + * Creates an executor service just as {@link Executors#newCachedThreadPool()} would do, but with a keep alive time + * of 2 minutes instead of 60 seconds. And a custom thread factory to set meaningful names on the threads and set + * them 'daemon'. + */ + private final ExecutorService cachedExecutorService = new ThreadPoolExecutor( + // @formatter:off + 0, // corePoolSize + Integer.MAX_VALUE, // maximumPoolSize + 60L * 2, // keepAliveTime + TimeUnit.SECONDS, // keepAliveTime unit, note that MINUTES is Android API 9 + new SynchronousQueue(), // workQueue + new SmackExecutorThreadFactory( // threadFactory + connectionCounterValue, + "CachedExecutor" + ) + // @formatter:on + ); /** * SmackExecutorThreadFactory is a *static* inner class of XMPPConnection. Note that we must not @@ -222,16 +243,18 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { */ private static final class SmackExecutorThreadFactory implements ThreadFactory { private final int connectionCounterValue; + private final String name; private int count = 0; - private SmackExecutorThreadFactory(int connectionCounterValue) { + private SmackExecutorThreadFactory(int connectionCounterValue, String name) { this.connectionCounterValue = connectionCounterValue; + this.name = name; } @Override public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Smack Executor Service " + count++ + " (" - + connectionCounterValue + ")"); + Thread thread = new Thread(runnable); + thread.setName("Smack Executor - " + name + ' ' + count++ + " (" + connectionCounterValue + ")"); thread.setDaemon(true); return thread; } @@ -725,7 +748,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { return; } // Notify in a new thread, because we can - Async.go(new Runnable() { + asyncGo(new Runnable() { @Override public void run() { for (PacketListener listener : listenersToNotify) { @@ -737,8 +760,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { continue; } } - }} - , "Smack Sending Listeners Notification (" + getConnectionCounter() + ')'); + }}); } @Override @@ -1017,7 +1039,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the // listenerExecutor ExecutorService call not be gc'ed until it got shut down. executorService.shutdownNow(); + cachedExecutorService.shutdown(); removeCallbacksService.shutdownNow(); + } catch (Throwable t) { + LOGGER.log(Level.WARNING, "finalize() threw trhowable", t); } finally { super.finalize(); @@ -1129,7 +1154,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } private final ScheduledExecutorService removeCallbacksService = new ScheduledThreadPoolExecutor(1, - new SmackExecutorThreadFactory(connectionCounterValue)); + new SmackExecutorThreadFactory(connectionCounterValue, "RemoveCallbacks")); @Override public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, @@ -1222,4 +1247,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { this.lastStanzaReceived = System.currentTimeMillis(); } + protected final void asyncGo(Runnable runnable) { + cachedExecutorService.execute(runnable); + } } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index f82170060..a51c6997e 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -72,7 +72,6 @@ import org.jivesoftware.smack.tcp.sm.packet.StreamManagement.StreamManagementFea import org.jivesoftware.smack.tcp.sm.predicates.Predicate; import org.jivesoftware.smack.tcp.sm.provider.ParseStreamManagement; import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; -import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.TLSUtils; @@ -1694,7 +1693,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { // Only spawn a new thread if there is a chance that some listener is invoked if (atLeastOneStanzaIdAcknowledgedListener || !stanzaAcknowledgedListeners.isEmpty()) { - Async.go(new Runnable() { + asyncGo(new Runnable() { @Override public void run() { try { @@ -1715,7 +1714,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { LOGGER.log(Level.FINER, "Received not connected exception, aborting", e); } } - }, "Stanza Acknowledged Listener Executor Thread " + handledCount + " (" + getConnectionCounter() + ')'); + }); } serverHandledStanzasCount = handledCount;