diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java b/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java index 6fb6244ab..526e3cd2e 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java @@ -1,6 +1,6 @@ /** * - * Copyright 2018 Florian Schmaus + * Copyright 2018-2019 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ */ package org.jivesoftware.smack; +import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.WeakHashMap; @@ -51,9 +52,17 @@ import java.util.concurrent.Executor; */ public class AsyncButOrdered { + /** + * A map with the currently pending runnables for a given key. Note that this is a weak hash map so we do not have + * to take care of removing the keys ourselfs from the map. + */ private final Map> pendingRunnables = new WeakHashMap<>(); - private final Map threadActiveMap = new WeakHashMap<>(); + /** + * A marker map if there is an active thread for the given key. Holds the responsible handler thread if one is + * active, otherwise the key is non-existend in the map. + */ + private final Map threadActiveMap = new HashMap<>(); private final Executor executor; @@ -65,6 +74,14 @@ public class AsyncButOrdered { this.executor = executor; } + private void scheduleHandler(Handler handler) { + if (executor == null) { + AbstractXMPPConnection.asyncGo(handler); + } else { + executor.execute(handler); + } + } + /** * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key. * @@ -73,6 +90,7 @@ public class AsyncButOrdered { * @return true if a new thread was created */ public boolean performAsyncButOrdered(K key, Runnable runnable) { + // First check if a key queue already exists, create one if not. Queue keyQueue; synchronized (pendingRunnables) { keyQueue = pendingRunnables.get(key); @@ -82,29 +100,27 @@ public class AsyncButOrdered { } } + // Then add the task to the queue. keyQueue.add(runnable); - boolean newHandler; + // Finally check if there is already a handler working on that queue, create one if not. + Handler newlyCreatedHandler = null; synchronized (threadActiveMap) { - Boolean threadActive = threadActiveMap.get(key); - if (threadActive == null) { - threadActive = false; - threadActiveMap.put(key, threadActive); - } + if (!threadActiveMap.containsKey(key)) { + newlyCreatedHandler = new Handler(keyQueue, key); - newHandler = !threadActive; - if (newHandler) { - Handler handler = new Handler(keyQueue, key); - threadActiveMap.put(key, true); - if (executor == null) { - AbstractXMPPConnection.asyncGo(handler); - } else { - executor.execute(handler); - } + // Mark that there is thread active for the given key. Note that this has to be done before scheduling + // the handler thread. + threadActiveMap.put(key, newlyCreatedHandler); } } - return newHandler; + if (newlyCreatedHandler != null) { + scheduleHandler(newlyCreatedHandler); + return true; + } + + return false; } public Executor asExecutorFor(final K key) { @@ -134,11 +150,14 @@ public class AsyncButOrdered { try { runnable.run(); } catch (Throwable t) { - // The run() method threw, this handler thread is going to terminate because of that. Ensure we note - // that in the map. + // The run() method threw, this handler thread is going to terminate because of that. We create + // a new handler to continue working on the queue while throwing the throwable so that the + // executor can handle it. + Handler newlyCreatedHandler = new Handler(keyQueue, key); synchronized (threadActiveMap) { - threadActiveMap.put(key, false); + threadActiveMap.put(key, newlyCreatedHandler); } + scheduleHandler(newlyCreatedHandler); throw t; } } @@ -146,7 +165,7 @@ public class AsyncButOrdered { synchronized (threadActiveMap) { // If the queue is empty, stop this handler, otherwise continue looping. if (keyQueue.isEmpty()) { - threadActiveMap.put(key, false); + threadActiveMap.remove(key); break mainloop; } }