Use cached executor in AbstractXMPPConnection

This commit is contained in:
Florian Schmaus 2014-12-17 18:21:54 +01:00
parent a87227c531
commit d081055312
2 changed files with 39 additions and 12 deletions

View File

@ -29,8 +29,11 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -68,7 +71,6 @@ import org.jivesoftware.smack.packet.PlainStreamElement;
import org.jivesoftware.smack.provider.PacketExtensionProvider;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.rosterstore.RosterStore;
import org.jivesoftware.smack.util.Async;
import org.jivesoftware.smack.util.DNSUtil;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.dns.HostAddress;
@ -214,7 +216,26 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
* PacketListeners are invoked in the same order the stanzas arrived.
*/
private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new SmackExecutorThreadFactory(connectionCounterValue));
new ArrayBlockingQueue<Runnable>(100), new SmackExecutorThreadFactory(connectionCounterValue, "IncomingNotifier"));
/**
* Creates an executor service just as {@link Executors#newCachedThreadPool()} would do, but with a keep alive time
* of 2 minutes instead of 60 seconds. And a custom thread factory to set meaningful names on the threads and set
* them 'daemon'.
*/
private final ExecutorService cachedExecutorService = new ThreadPoolExecutor(
// @formatter:off
0, // corePoolSize
Integer.MAX_VALUE, // maximumPoolSize
60L * 2, // keepAliveTime
TimeUnit.SECONDS, // keepAliveTime unit, note that MINUTES is Android API 9
new SynchronousQueue<Runnable>(), // workQueue
new SmackExecutorThreadFactory( // threadFactory
connectionCounterValue,
"CachedExecutor"
)
// @formatter:on
);
/**
* SmackExecutorThreadFactory is a *static* inner class of XMPPConnection. Note that we must not
@ -222,16 +243,18 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
*/
private static final class SmackExecutorThreadFactory implements ThreadFactory {
private final int connectionCounterValue;
private final String name;
private int count = 0;
private SmackExecutorThreadFactory(int connectionCounterValue) {
private SmackExecutorThreadFactory(int connectionCounterValue, String name) {
this.connectionCounterValue = connectionCounterValue;
this.name = name;
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Smack Executor Service " + count++ + " ("
+ connectionCounterValue + ")");
Thread thread = new Thread(runnable);
thread.setName("Smack Executor - " + name + ' ' + count++ + " (" + connectionCounterValue + ")");
thread.setDaemon(true);
return thread;
}
@ -725,7 +748,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
return;
}
// Notify in a new thread, because we can
Async.go(new Runnable() {
asyncGo(new Runnable() {
@Override
public void run() {
for (PacketListener listener : listenersToNotify) {
@ -737,8 +760,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
continue;
}
}
}}
, "Smack Sending Listeners Notification (" + getConnectionCounter() + ')');
}});
}
@Override
@ -1017,7 +1039,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
// gc'ed. It is possible that the XMPPConnection instance is gc'ed while the
// listenerExecutor ExecutorService call not be gc'ed until it got shut down.
executorService.shutdownNow();
cachedExecutorService.shutdown();
removeCallbacksService.shutdownNow();
} catch (Throwable t) {
LOGGER.log(Level.WARNING, "finalize() threw trhowable", t);
}
finally {
super.finalize();
@ -1129,7 +1154,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
}
private final ScheduledExecutorService removeCallbacksService = new ScheduledThreadPoolExecutor(1,
new SmackExecutorThreadFactory(connectionCounterValue));
new SmackExecutorThreadFactory(connectionCounterValue, "RemoveCallbacks"));
@Override
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter,
@ -1222,4 +1247,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
this.lastStanzaReceived = System.currentTimeMillis();
}
protected final void asyncGo(Runnable runnable) {
cachedExecutorService.execute(runnable);
}
}

View File

@ -72,7 +72,6 @@ import org.jivesoftware.smack.tcp.sm.packet.StreamManagement.StreamManagementFea
import org.jivesoftware.smack.tcp.sm.predicates.Predicate;
import org.jivesoftware.smack.tcp.sm.provider.ParseStreamManagement;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import org.jivesoftware.smack.util.Async;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.TLSUtils;
@ -1694,7 +1693,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// Only spawn a new thread if there is a chance that some listener is invoked
if (atLeastOneStanzaIdAcknowledgedListener || !stanzaAcknowledgedListeners.isEmpty()) {
Async.go(new Runnable() {
asyncGo(new Runnable() {
@Override
public void run() {
try {
@ -1715,7 +1714,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
LOGGER.log(Level.FINER, "Received not connected exception, aborting", e);
}
}
}, "Stanza Acknowledged Listener Executor Thread " + handledCount + " (" + getConnectionCounter() + ')');
});
}
serverHandledStanzasCount = handledCount;