1
0
Derivar 0

core: improve AsyncButOrdered

Instead of marking the handle as not running by setting the handler's
value in the map to false, we now remove simply the key if there is no
handler running. This also means we no longer need to use a weak hash
map for this.

Also reduce the size of the synchronized blocks, mainly by scheduling
the handler outside of the synchronized(threadActiveMap) block.

Make some code better readable and add some more comments. Also do
start a new handler thread if the task threw.
Este cometimento está contido em:
Florian Schmaus 2019-11-08 10:14:21 +01:00
ascendente a7a298c5d8
cometimento 9d626bf787
1 ficheiros modificados com 41 adições e 22 eliminações

Ver ficheiro

@ -1,6 +1,6 @@
/**
*
* Copyright 2018 Florian Schmaus
* Copyright 2018-2019 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,7 @@
*/
package org.jivesoftware.smack;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.WeakHashMap;
@ -51,9 +52,17 @@ import java.util.concurrent.Executor;
*/
public class AsyncButOrdered<K> {
/**
* A map with the currently pending runnables for a given key. Note that this is a weak hash map so we do not have
* to take care of removing the keys ourselfs from the map.
*/
private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();
private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>();
/**
* A marker map if there is an active thread for the given key. Holds the responsible handler thread if one is
* active, otherwise the key is non-existend in the map.
*/
private final Map<K, Handler> threadActiveMap = new HashMap<>();
private final Executor executor;
@ -65,6 +74,14 @@ public class AsyncButOrdered<K> {
this.executor = executor;
}
private void scheduleHandler(Handler handler) {
if (executor == null) {
AbstractXMPPConnection.asyncGo(handler);
} else {
executor.execute(handler);
}
}
/**
* Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
*
@ -73,6 +90,7 @@ public class AsyncButOrdered<K> {
* @return true if a new thread was created
*/
public boolean performAsyncButOrdered(K key, Runnable runnable) {
// First check if a key queue already exists, create one if not.
Queue<Runnable> keyQueue;
synchronized (pendingRunnables) {
keyQueue = pendingRunnables.get(key);
@ -82,29 +100,27 @@ public class AsyncButOrdered<K> {
}
}
// Then add the task to the queue.
keyQueue.add(runnable);
boolean newHandler;
// Finally check if there is already a handler working on that queue, create one if not.
Handler newlyCreatedHandler = null;
synchronized (threadActiveMap) {
Boolean threadActive = threadActiveMap.get(key);
if (threadActive == null) {
threadActive = false;
threadActiveMap.put(key, threadActive);
}
if (!threadActiveMap.containsKey(key)) {
newlyCreatedHandler = new Handler(keyQueue, key);
newHandler = !threadActive;
if (newHandler) {
Handler handler = new Handler(keyQueue, key);
threadActiveMap.put(key, true);
if (executor == null) {
AbstractXMPPConnection.asyncGo(handler);
} else {
executor.execute(handler);
}
// Mark that there is thread active for the given key. Note that this has to be done before scheduling
// the handler thread.
threadActiveMap.put(key, newlyCreatedHandler);
}
}
return newHandler;
if (newlyCreatedHandler != null) {
scheduleHandler(newlyCreatedHandler);
return true;
}
return false;
}
public Executor asExecutorFor(final K key) {
@ -134,11 +150,14 @@ public class AsyncButOrdered<K> {
try {
runnable.run();
} catch (Throwable t) {
// The run() method threw, this handler thread is going to terminate because of that. Ensure we note
// that in the map.
// The run() method threw, this handler thread is going to terminate because of that. We create
// a new handler to continue working on the queue while throwing the throwable so that the
// executor can handle it.
Handler newlyCreatedHandler = new Handler(keyQueue, key);
synchronized (threadActiveMap) {
threadActiveMap.put(key, false);
threadActiveMap.put(key, newlyCreatedHandler);
}
scheduleHandler(newlyCreatedHandler);
throw t;
}
}
@ -146,7 +165,7 @@ public class AsyncButOrdered<K> {
synchronized (threadActiveMap) {
// If the queue is empty, stop this handler, otherwise continue looping.
if (keyQueue.isEmpty()) {
threadActiveMap.put(key, false);
threadActiveMap.remove(key);
break mainloop;
}
}