mirror of
https://github.com/vanitasvitae/Smack.git
synced 2024-11-22 12:02:05 +01:00
Assure stanzaIdAcknowledgedListeners are removed
after at most 12 hours. Also set a keep alive time for the removeCallbacksService to 30 seconds and add AbstractXMPPConnection.schedule(Runnable, long, TimeUnit).
This commit is contained in:
parent
6209d75536
commit
106512d8d4
2 changed files with 16 additions and 3 deletions
|
@ -32,7 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.CopyOnWriteArraySet;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
@ -297,6 +297,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
*/
|
*/
|
||||||
protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
|
protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
|
||||||
config = configuration;
|
config = configuration;
|
||||||
|
removeCallbacksService.setKeepAliveTime(30, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ConnectionConfiguration getConfiguration() {
|
protected ConnectionConfiguration getConfiguration() {
|
||||||
|
@ -1376,7 +1377,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
streamFeatures.put(key, feature);
|
streamFeatures.put(key, feature);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ScheduledExecutorService removeCallbacksService = new ScheduledThreadPoolExecutor(1,
|
private final ScheduledThreadPoolExecutor removeCallbacksService = new ScheduledThreadPoolExecutor(1,
|
||||||
new SmackExecutorThreadFactory(connectionCounterValue, "Remove Callbacks"));
|
new SmackExecutorThreadFactory(connectionCounterValue, "Remove Callbacks"));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1550,4 +1551,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
cachedExecutorService.execute(runnable);
|
cachedExecutorService.execute(runnable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected final ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) {
|
||||||
|
return removeCallbacksService.schedule(runnable, delay, unit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,6 +121,7 @@ import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
@ -1534,11 +1535,19 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
||||||
* @return the previous listener for this stanza ID or null.
|
* @return the previous listener for this stanza ID or null.
|
||||||
* @throws StreamManagementNotEnabledException if Stream Management is not enabled.
|
* @throws StreamManagementNotEnabledException if Stream Management is not enabled.
|
||||||
*/
|
*/
|
||||||
public PacketListener addStanzaIdAcknowledgedListener(String id, PacketListener listener) throws StreamManagementNotEnabledException {
|
public PacketListener addStanzaIdAcknowledgedListener(final String id, PacketListener listener) throws StreamManagementNotEnabledException {
|
||||||
// Prevent users from adding callbacks that will never get removed
|
// Prevent users from adding callbacks that will never get removed
|
||||||
if (!smWasEnabledAtLeastOnce) {
|
if (!smWasEnabledAtLeastOnce) {
|
||||||
throw new StreamManagementException.StreamManagementNotEnabledException();
|
throw new StreamManagementException.StreamManagementNotEnabledException();
|
||||||
}
|
}
|
||||||
|
// Remove the listener after max. 12 hours
|
||||||
|
final int removeAfterSeconds = Math.min(getMaxSmResumptionTime() + 60, 12 * 60 * 60);
|
||||||
|
schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
stanzaIdAcknowledgedListeners.remove(id);
|
||||||
|
}
|
||||||
|
}, removeAfterSeconds, TimeUnit.SECONDS);
|
||||||
return stanzaIdAcknowledgedListeners.put(id, listener);
|
return stanzaIdAcknowledgedListeners.put(id, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue