Introduce asyncGoLimited()

which limits the number of threads created for asynchronous
operations.

Fixes SMACK-864.
This commit is contained in:
Florian Schmaus 2019-04-10 12:18:06 +02:00
parent 0ec7e84cbc
commit 6076a9dfa5
3 changed files with 113 additions and 3 deletions

View File

@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
@ -299,6 +300,17 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>();
/**
* An executor which uses {@link #asyncGoLimited(Runnable)} to limit the number of asynchronously processed runnables
* per connection.
*/
private final Executor limitedExcutor = new Executor() {
@Override
public void execute(Runnable runnable) {
asyncGoLimited(runnable);
}
};
/**
* The used host to establish the connection to
*/
@ -1166,7 +1178,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
executorService = ASYNC_BUT_ORDERED.asExecutorFor(this);
break;
case async:
executorService = CACHED_EXECUTOR_SERVICE;
executorService = limitedExcutor;
break;
}
final IQRequestHandler finalIqRequestHandler = iqRequestHandler;
@ -1216,7 +1228,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
}
for (final StanzaListener listener : listenersToNotify) {
asyncGo(new Runnable() {
asyncGoLimited(new Runnable() {
@Override
public void run() {
try {
@ -1773,6 +1785,75 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')';
}
/**
* A queue of deferred runnables that where not executed immediately because {@link #currentAsyncRunnables} reached
* {@link #maxAsyncRunnables}. Note that we use a {@code LinkedList} in order to avoid space blowups in case the
* list ever becomes very big and shrinks again.
*/
private final Queue<Runnable> deferredAsyncRunnables = new LinkedList<>();
private int deferredAsyncRunnablesCount;
private int deferredAsyncRunnablesCountPrevious;
private int maxAsyncRunnables = SmackConfiguration.getDefaultConcurrencyLevelLimit();
private int currentAsyncRunnables;
protected void asyncGoLimited(final Runnable runnable) {
Runnable wrappedRunnable = new Runnable() {
@Override
public void run() {
runnable.run();
synchronized (deferredAsyncRunnables) {
Runnable defferredRunnable = deferredAsyncRunnables.poll();
if (defferredRunnable == null) {
currentAsyncRunnables--;
} else {
deferredAsyncRunnablesCount--;
asyncGo(defferredRunnable);
}
}
}
};
synchronized (deferredAsyncRunnables) {
if (currentAsyncRunnables < maxAsyncRunnables) {
currentAsyncRunnables++;
asyncGo(wrappedRunnable);
} else {
deferredAsyncRunnablesCount++;
deferredAsyncRunnables.add(wrappedRunnable);
}
final int HIGH_WATERMARK = 100;
final int INFORM_WATERMARK = 20;
final int deferredAsyncRunnablesCount = this.deferredAsyncRunnablesCount;
if (deferredAsyncRunnablesCount >= HIGH_WATERMARK
&& deferredAsyncRunnablesCountPrevious < HIGH_WATERMARK) {
LOGGER.log(Level.WARNING, "High watermark of " + HIGH_WATERMARK + " simultaneous executing runnables reached");
} else if (deferredAsyncRunnablesCount >= INFORM_WATERMARK
&& deferredAsyncRunnablesCountPrevious < INFORM_WATERMARK) {
LOGGER.log(Level.INFO, INFORM_WATERMARK + " simultaneous executing runnables reached");
}
deferredAsyncRunnablesCountPrevious = deferredAsyncRunnablesCount;
}
}
public void setMaxAsyncOperations(int maxAsyncOperations) {
if (maxAsyncOperations < 1) {
throw new IllegalArgumentException("Max async operations must be greater than 0");
}
synchronized (deferredAsyncRunnables) {
maxAsyncRunnables = maxAsyncOperations;
}
}
protected static void asyncGo(Runnable runnable) {
CACHED_EXECUTOR_SERVICE.execute(runnable);
}

View File

@ -55,6 +55,16 @@ public class AsyncButOrdered<K> {
private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>();
private final Executor executor;
public AsyncButOrdered() {
this(null);
}
public AsyncButOrdered(Executor executor) {
this.executor = executor;
}
/**
* Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
*
@ -86,7 +96,11 @@ public class AsyncButOrdered<K> {
if (newHandler) {
Handler handler = new Handler(keyQueue, key);
threadActiveMap.put(key, true);
AbstractXMPPConnection.asyncGo(handler);
if (executor == null) {
AbstractXMPPConnection.asyncGo(handler);
} else {
executor.execute(handler);
}
}
}

View File

@ -365,4 +365,19 @@ public final class SmackConfiguration {
public static void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) {
SmackConfiguration.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Must set mode");
}
private static final int defaultConcurrencyLevelLimit;
static {
int availableProcessors = Runtime.getRuntime().availableProcessors();
if (availableProcessors < 8) {
defaultConcurrencyLevelLimit = 8;
} else {
defaultConcurrencyLevelLimit = (int) (availableProcessors * 1.1);
}
}
public static int getDefaultConcurrencyLevelLimit() {
return defaultConcurrencyLevelLimit;
}
}