diff --git a/config/checkstyle.xml b/config/checkstyle.xml index 041e31ffb..d925f4909 100644 --- a/config/checkstyle.xml +++ b/config/checkstyle.xml @@ -6,6 +6,11 @@ + + + + + @@ -61,6 +66,14 @@ + + + + + diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 69026be48..e82ad15aa 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -41,6 +41,7 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; @@ -343,6 +344,17 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { protected final AsyncButOrdered inOrderListeners = new AsyncButOrdered<>(); + /** + * An executor which uses {@link #asyncGoLimited(Runnable)} to limit the number of asynchronously processed runnables + * per connection. + */ + private final Executor limitedExcutor = new Executor() { + @Override + public void execute(Runnable runnable) { + asyncGoLimited(runnable); + } + }; + /** * The used host to establish the connection to */ @@ -1336,7 +1348,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { executorService = ASYNC_BUT_ORDERED.asExecutorFor(this); break; case async: - executorService = CACHED_EXECUTOR_SERVICE; + executorService = limitedExcutor; break; } final IQRequestHandler finalIqRequestHandler = iqRequestHandler; @@ -1379,7 +1391,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { final Collection listenersToNotify = new LinkedList<>(); extractMatchingListeners(packet, asyncRecvListeners, listenersToNotify); for (final StanzaListener listener : listenersToNotify) { - asyncGo(new Runnable() { + asyncGoLimited(new Runnable() { @Override public void run() { try { @@ -1875,6 +1887,75 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')'; } + /** + * A queue of deferred runnables that where not executed immediately because {@link #currentAsyncRunnables} reached + * {@link #maxAsyncRunnables}. Note that we use a {@code LinkedList} in order to avoid space blowups in case the + * list ever becomes very big and shrinks again. + */ + private final Queue deferredAsyncRunnables = new LinkedList<>(); + + private int deferredAsyncRunnablesCount; + + private int deferredAsyncRunnablesCountPrevious; + + private int maxAsyncRunnables = SmackConfiguration.getDefaultConcurrencyLevelLimit(); + + private int currentAsyncRunnables; + + protected void asyncGoLimited(final Runnable runnable) { + Runnable wrappedRunnable = new Runnable() { + @Override + public void run() { + runnable.run(); + + synchronized (deferredAsyncRunnables) { + Runnable defferredRunnable = deferredAsyncRunnables.poll(); + if (defferredRunnable == null) { + currentAsyncRunnables--; + } else { + deferredAsyncRunnablesCount--; + asyncGo(defferredRunnable); + } + } + } + }; + + synchronized (deferredAsyncRunnables) { + if (currentAsyncRunnables < maxAsyncRunnables) { + currentAsyncRunnables++; + asyncGo(wrappedRunnable); + } else { + deferredAsyncRunnablesCount++; + deferredAsyncRunnables.add(wrappedRunnable); + } + + final int HIGH_WATERMARK = 100; + final int INFORM_WATERMARK = 20; + + final int deferredAsyncRunnablesCount = this.deferredAsyncRunnablesCount; + + if (deferredAsyncRunnablesCount >= HIGH_WATERMARK + && deferredAsyncRunnablesCountPrevious < HIGH_WATERMARK) { + LOGGER.log(Level.WARNING, "High watermark of " + HIGH_WATERMARK + " simultaneous executing runnables reached"); + } else if (deferredAsyncRunnablesCount >= INFORM_WATERMARK + && deferredAsyncRunnablesCountPrevious < INFORM_WATERMARK) { + LOGGER.log(Level.INFO, INFORM_WATERMARK + " simultaneous executing runnables reached"); + } + + deferredAsyncRunnablesCountPrevious = deferredAsyncRunnablesCount; + } + } + + public void setMaxAsyncOperations(int maxAsyncOperations) { + if (maxAsyncOperations < 1) { + throw new IllegalArgumentException("Max async operations must be greater than 0"); + } + + synchronized (deferredAsyncRunnables) { + maxAsyncRunnables = maxAsyncOperations; + } + } + protected static void asyncGo(Runnable runnable) { CACHED_EXECUTOR_SERVICE.execute(runnable); } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java b/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java index 00fb94549..6fb6244ab 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AsyncButOrdered.java @@ -55,6 +55,16 @@ public class AsyncButOrdered { private final Map threadActiveMap = new WeakHashMap<>(); + private final Executor executor; + + public AsyncButOrdered() { + this(null); + } + + public AsyncButOrdered(Executor executor) { + this.executor = executor; + } + /** * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key. * @@ -86,7 +96,11 @@ public class AsyncButOrdered { if (newHandler) { Handler handler = new Handler(keyQueue, key); threadActiveMap.put(key, true); - AbstractXMPPConnection.asyncGo(handler); + if (executor == null) { + AbstractXMPPConnection.asyncGo(handler); + } else { + executor.execute(handler); + } } } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java index 01f2995a4..10695df47 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java @@ -365,4 +365,19 @@ public final class SmackConfiguration { public static void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) { SmackConfiguration.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Must set mode"); } + + private static final int defaultConcurrencyLevelLimit; + + static { + int availableProcessors = Runtime.getRuntime().availableProcessors(); + if (availableProcessors < 8) { + defaultConcurrencyLevelLimit = 8; + } else { + defaultConcurrencyLevelLimit = (int) (availableProcessors * 1.1); + } + } + + public static int getDefaultConcurrencyLevelLimit() { + return defaultConcurrencyLevelLimit; + } } diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/jingle_filetransfer/JingleFileTransferManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/jingle_filetransfer/JingleFileTransferManager.java index 7b96cdbd4..a50ea0519 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/jingle_filetransfer/JingleFileTransferManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/jingle_filetransfer/JingleFileTransferManager.java @@ -32,7 +32,7 @@ public final class JingleFileTransferManager extends Manager { super(connection); } - public static JingleFileTransferManager getInstanceFor(XMPPConnection connection) { + public static synchronized JingleFileTransferManager getInstanceFor(XMPPConnection connection) { JingleFileTransferManager manager = INSTANCES.get(connection); if (manager == null) { manager = new JingleFileTransferManager(connection); diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java index 3e3e52a22..dd7db3426 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/mam/MamManager.java @@ -178,7 +178,9 @@ public final class MamManager extends Manager { * @param connection the XMPP connection to get the archive for. * @return the instance of MamManager. */ + // CHECKSTYLE:OFF:RegexpSingleline public static MamManager getInstanceFor(XMPPConnection connection) { + // CHECKSTYLE:ON:RegexpSingleline return getInstanceFor(connection, (Jid) null); } diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/reference/ReferenceManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/reference/ReferenceManager.java index d3bdfce12..b588aa5e3 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/reference/ReferenceManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/reference/ReferenceManager.java @@ -51,7 +51,7 @@ public final class ReferenceManager extends Manager { * @param connection xmpp connection * @return reference manager instance */ - public static ReferenceManager getInstanceFor(XMPPConnection connection) { + public static synchronized ReferenceManager getInstanceFor(XMPPConnection connection) { ReferenceManager manager = INSTANCES.get(connection); if (manager == null) { manager = new ReferenceManager(connection); diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java index 99ec03f5d..a9fd4fcfa 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java @@ -78,7 +78,7 @@ public final class StableUniqueStanzaIdManager extends Manager { * @param connection xmpp-connection * @return manager instance for the connection */ - public static StableUniqueStanzaIdManager getInstanceFor(XMPPConnection connection) { + public static synchronized StableUniqueStanzaIdManager getInstanceFor(XMPPConnection connection) { StableUniqueStanzaIdManager manager = INSTANCES.get(connection); if (manager == null) { manager = new StableUniqueStanzaIdManager(connection); diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/spoiler/SpoilerManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/spoiler/SpoilerManager.java index f3bb80c85..fa5c8680b 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/spoiler/SpoilerManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/spoiler/SpoilerManager.java @@ -61,7 +61,7 @@ public final class SpoilerManager extends Manager { * @param connection xmpp connection * @return SpoilerManager */ - public static SpoilerManager getInstanceFor(XMPPConnection connection) { + public static synchronized SpoilerManager getInstanceFor(XMPPConnection connection) { SpoilerManager manager = INSTANCES.get(connection); if (manager == null) { manager = new SpoilerManager(connection); diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/ServiceDiscoveryManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/ServiceDiscoveryManager.java index 674e5ca9a..38b965577 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/ServiceDiscoveryManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/ServiceDiscoveryManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2003-2007 Jive Software, 2018 Florian Schmaus. + * Copyright 2003-2007 Jive Software, 2018-2019 Florian Schmaus. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -227,7 +227,7 @@ public final class ServiceDiscoveryManager extends Manager { /** * Returns the type of client that will be returned when asked for the client identity in a * disco request. The valid types are defined by the category client. Follow this link to learn - * the possible types: Jabber::Registrar. + * the possible types: XMPP Registry for Service Discovery Identities * * @return the type of client that will be returned when asked for the client identity in a * disco request. @@ -271,8 +271,8 @@ public final class ServiceDiscoveryManager extends Manager { */ public Set getIdentities() { Set res = new HashSet<>(identities); - // Add the default identity that must exist - res.add(defaultIdentity); + // Add the main identity that must exist + res.add(identity); return Collections.unmodifiableSet(res); } diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/packet/DiscoverInfo.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/packet/DiscoverInfo.java index 1f02a0edb..c31a45aad 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/packet/DiscoverInfo.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/disco/packet/DiscoverInfo.java @@ -263,7 +263,7 @@ public class DiscoverInfo extends IQ implements TypedCloneable { * Represents the identity of a given XMPP entity. An entity may have many identities but all * the identities SHOULD have the same name.

* - * Refer to Jabber::Registrar + * Refer to XMPP Registry for Service Discovery Identities * in order to get the official registry of values for the category and type * attributes. * @@ -327,7 +327,7 @@ public class DiscoverInfo extends IQ implements TypedCloneable { /** * Returns the entity's category. To get the official registry of values for the - * 'category' attribute refer to Jabber::Registrar + * 'category' attribute refer to XMPP Registry for Service Discovery Identities. * * @return the entity's category. */ @@ -346,7 +346,7 @@ public class DiscoverInfo extends IQ implements TypedCloneable { /** * Returns the entity's type. To get the official registry of values for the - * 'type' attribute refer to Jabber::Registrar + * 'type' attribute refer to XMPP Registry for Service Discovery Identities. * * @return the entity's type. */ diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/JingleTransportMethodManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/JingleTransportMethodManager.java index 848dc1671..9a9941a07 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/JingleTransportMethodManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/JingleTransportMethodManager.java @@ -48,7 +48,7 @@ public final class JingleTransportMethodManager extends Manager { super(connection); } - public static JingleTransportMethodManager getInstanceFor(XMPPConnection connection) { + public static synchronized JingleTransportMethodManager getInstanceFor(XMPPConnection connection) { JingleTransportMethodManager manager = INSTANCES.get(connection); if (manager == null) { manager = new JingleTransportMethodManager(connection); diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/transports/jingle_ibb/JingleIBBTransportManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/transports/jingle_ibb/JingleIBBTransportManager.java index 500a7b0a8..63dada1ae 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/transports/jingle_ibb/JingleIBBTransportManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/jingle/transports/jingle_ibb/JingleIBBTransportManager.java @@ -38,7 +38,7 @@ public final class JingleIBBTransportManager extends JingleTransportManager