1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2024-06-13 07:04:49 +02:00
Smack/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java
Florian Schmaus 9d626bf787 core: improve AsyncButOrdered
Instead of marking the handle as not running by setting the handler's
value in the map to false, we now remove simply the key if there is no
handler running. This also means we no longer need to use a weak hash
map for this.

Also reduce the size of the synchronized blocks, mainly by scheduling
the handler outside of the synchronized(threadActiveMap) block.

Make some code better readable and add some more comments. Also do
start a new handler thread if the task threw.
2019-11-08 10:14:21 +01:00

176 lines
6.7 KiB
Java

/**
*
* Copyright 2018-2019 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
/**
* Helper class to perform operations asynchronously while keeping their order with respect to a given key.
* <p>
* A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks
* matters, which eventually call user code in form of listeners. Since the order of the callbacks matters, you need to
* use synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those
* user provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is
* stalled, since synchronous connection listeners are invoked from the main event loop.
* </p>
* <p>
* It is common for those situations that the order of callbacks is not globally important, but only important in
* respect to a particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which
* send you CSNs. If a contact sends you first 'active' and then 'inactive', it is crucial that first the listener is
* called with 'active' and afterwards with 'inactive'. But if there is another contact sending 'composing' followed
* by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which
* the listeners for those two contacts are invoked does not matter.
* </p>
* <p>
* Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} with the remote contact's
* JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that
* runnables of subsequent invocations are always executed after the runnables of previous invocations using the same
* key.
* </p>
*
* @param <K> the type of the key
* @since 4.3
*/
public class AsyncButOrdered<K> {

    /**
     * The task queue most recently created for a given key. This is a weak hash map so we do not have to take care of
     * removing the keys ourselves from the map.
     * <p>
     * Because keys are looked up via {@code equals()}, an entry can be dropped (once the key instance that created it
     * is no longer referenced) while a handler for an equal key is still active. This map is therefore only consulted
     * to pick the queue for a <em>new</em> handler; while a handler is active, tasks are appended to the queue
     * that handler is draining.
     * </p>
     */
    private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();

    /**
     * A marker map if there is an active handler for the given key. Holds the responsible handler if one is active,
     * otherwise the key is non-existent in the map. Also used as the lock guarding appends to a handler's queue and
     * the handler's decision to stop.
     */
    private final Map<K, Handler> threadActiveMap = new HashMap<>();

    /** Executor used to run handlers; {@code null} selects {@code AbstractXMPPConnection.asyncGo}. */
    private final Executor executor;

    /**
     * Creates an instance without a dedicated executor; handlers are handed to
     * {@code AbstractXMPPConnection.asyncGo}.
     */
    public AsyncButOrdered() {
        this(null);
    }

    /**
     * Creates an instance that runs its handlers on the given executor.
     *
     * @param executor the executor to run handlers on, or {@code null} to use
     *        {@code AbstractXMPPConnection.asyncGo}
     */
    public AsyncButOrdered(Executor executor) {
        this.executor = executor;
    }

    /**
     * Hands the handler to the configured executor, or to {@code AbstractXMPPConnection.asyncGo} if none was given.
     *
     * @param handler the handler to run
     */
    private void scheduleHandler(Handler handler) {
        if (executor == null) {
            AbstractXMPPConnection.asyncGo(handler);
        } else {
            executor.execute(handler);
        }
    }

    /**
     * Removes the active marker for the key, but only if it still refers to the given handler.
     *
     * @param key the key the handler was registered for
     * @param handler the handler whose marker should be removed
     */
    private void unmarkHandler(K key, Handler handler) {
        synchronized (threadActiveMap) {
            if (threadActiveMap.get(key) == handler) {
                threadActiveMap.remove(key);
            }
        }
    }

    /**
     * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
     * <p>
     * If scheduling a newly created handler fails (for example because the executor rejects it), the key is unmarked
     * again, the given runnable is withdrawn from the queue and the failure is rethrown, so that later invocations for
     * the same key are able to start a fresh handler. Runnables which other callers appended to the same queue in the
     * meantime are left queued; they are only executed if a later handler is started on that queue.
     * </p>
     *
     * @param key the key deriving the order
     * @param runnable the {@link Runnable} to run
     * @return true if this invocation created and scheduled a new handler
     * @throws NullPointerException if {@code runnable} is {@code null}
     */
    public boolean performAsyncButOrdered(K key, Runnable runnable) {
        // Reject null before touching any state, so that a failed call can never leave a marker behind.
        if (runnable == null) {
            throw new NullPointerException("runnable must not be null");
        }

        // First check if a key queue already exists, create one if not.
        Queue<Runnable> keyQueue;
        synchronized (pendingRunnables) {
            keyQueue = pendingRunnables.get(key);
            if (keyQueue == null) {
                keyQueue = new ConcurrentLinkedQueue<>();
                pendingRunnables.put(key, keyQueue);
            }
        }

        // Then find the handler working for that key, creating one if there is none, and append the task to the queue
        // of exactly that handler. Appending to the active handler's queue (instead of the queue looked up above)
        // matters because the weak map may have handed out a fresh queue that the active handler does not drain.
        Handler newlyCreatedHandler = null;
        synchronized (threadActiveMap) {
            Handler activeHandler = threadActiveMap.get(key);
            if (activeHandler == null) {
                newlyCreatedHandler = new Handler(keyQueue, key);
                // Mark that there is a handler active for the given key. Note that this has to be done before
                // scheduling the handler.
                threadActiveMap.put(key, newlyCreatedHandler);
                activeHandler = newlyCreatedHandler;
            }
            activeHandler.keyQueue.add(runnable);
        }

        if (newlyCreatedHandler == null) {
            return false;
        }

        try {
            scheduleHandler(newlyCreatedHandler);
        } catch (RuntimeException | Error schedulingFailure) {
            synchronized (threadActiveMap) {
                // Only roll back if the handler never got to hand over or finish; otherwise the marker is no longer
                // ours and the runnable has already been consumed.
                if (threadActiveMap.get(key) == newlyCreatedHandler) {
                    threadActiveMap.remove(key);
                    newlyCreatedHandler.keyQueue.remove(runnable);
                }
            }
            throw schedulingFailure;
        }
        return true;
    }

    /**
     * Returns an {@link Executor} whose {@code execute} method forwards every runnable to
     * {@link #performAsyncButOrdered(Object, Runnable)} using the given key.
     *
     * @param key the key deriving the order
     * @return an executor bound to the key
     */
    public Executor asExecutorFor(final K key) {
        return new Executor() {
            @Override
            public void execute(Runnable runnable) {
                performAsyncButOrdered(key, runnable);
            }
        };
    }

    /**
     * Drains the queue of a single key, running its tasks one after another.
     */
    private class Handler implements Runnable {
        private final Queue<Runnable> keyQueue;
        private final K key;

        Handler(Queue<Runnable> keyQueue, K key) {
            this.keyQueue = keyQueue;
            this.key = key;
        }

        @Override
        public void run() {
            while (true) {
                Runnable runnable;
                while ((runnable = keyQueue.poll()) != null) {
                    try {
                        runnable.run();
                    } catch (Throwable t) {
                        // The run() method threw, this handler is going to terminate because of that. We create a
                        // new handler to continue working on the queue while throwing the throwable so that the
                        // executor can handle it.
                        scheduleSuccessor(t);
                        throw t;
                    }
                }

                synchronized (threadActiveMap) {
                    // If the queue is empty, stop this handler, otherwise continue looping. Tasks are appended
                    // while holding the same lock, so none can slip in between this check and the removal.
                    if (keyQueue.isEmpty()) {
                        threadActiveMap.remove(key);
                        return;
                    }
                }
            }
        }

        /**
         * Registers and schedules a new handler on the same queue. If scheduling fails, the marker is removed again
         * and the failure is attached to {@code cause} as a suppressed exception, so that the original throwable is
         * the one that propagates.
         *
         * @param cause the throwable that terminates this handler
         */
        private void scheduleSuccessor(Throwable cause) {
            Handler successor = new Handler(keyQueue, key);
            synchronized (threadActiveMap) {
                threadActiveMap.put(key, successor);
            }
            try {
                scheduleHandler(successor);
            } catch (RuntimeException | Error schedulingFailure) {
                unmarkHandler(key, successor);
                // addSuppressed() rejects self-suppression, hence the identity check.
                if (schedulingFailure != cause) {
                    cause.addSuppressed(schedulingFailure);
                }
            }
        }
    }
}