From 5c6f257027d2f1cfe516ffdaec313e47d1a1d9e0 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 27 Feb 2013 22:49:04 +0000 Subject: [PATCH] SMACK-388 Use ScheduledExecutorService. Set ping received when pinging another entity. Refactored PacketListener and ConnectionListeners as anonymous inner-classes git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@13531 b35dd754-fafc-0310-a699-88a17e54d16e --- build/resources/META-INF/smack-config.xml | 4 +- .../smack/SmackConfiguration.java | 17 +- .../jivesoftware/smackx/ping/PingManager.java | 191 +++++++++--------- .../smackx/ping/ServerPingTask.java | 117 ++++------- 4 files changed, 153 insertions(+), 176 deletions(-) diff --git a/build/resources/META-INF/smack-config.xml b/build/resources/META-INF/smack-config.xml index 59a16fc78..c4e99936a 100644 --- a/build/resources/META-INF/smack-config.xml +++ b/build/resources/META-INF/smack-config.xml @@ -31,7 +31,7 @@ 10000 - - 1800000 + + 1800 diff --git a/source/org/jivesoftware/smack/SmackConfiguration.java b/source/org/jivesoftware/smack/SmackConfiguration.java index cf68c75b2..83f8d22f3 100644 --- a/source/org/jivesoftware/smack/SmackConfiguration.java +++ b/source/org/jivesoftware/smack/SmackConfiguration.java @@ -58,7 +58,10 @@ public final class SmackConfiguration { private static int localSocks5ProxyPort = 7777; private static int packetCollectorSize = 5000; - private static int defaultPingInterval = 1800000; // 30 min (30*60*1000) + /** + * defaultPingInterval (in seconds) + */ + private static int defaultPingInterval = 1800; // 30 min (30*60) private SmackConfiguration() { } @@ -109,7 +112,7 @@ public final class SmackConfiguration { else if (parser.getName().equals("packetCollectorSize")) { packetCollectorSize = parseIntProperty(parser, packetCollectorSize); } - else if (parser.getName().equals("defaultPingInterval")) { + else if (parser.getName().equals("defaultPingInterval")) { defaultPingInterval = parseIntProperty(parser, defaultPingInterval); } } @@ -308,10 +311,20 @@ public final class SmackConfiguration { SmackConfiguration.localSocks5ProxyPort = localSocks5ProxyPort; } + /** + * Returns the default ping interval (seconds) + * + * @return + */ public static int getDefaultPingInterval() { return defaultPingInterval; } + /** + * Sets the default ping interval (seconds). Set it to '-1' to disable the periodic ping + * + * @param defaultPingInterval + */ public static void setDefaultPingInterval(int defaultPingInterval) { SmackConfiguration.defaultPingInterval = defaultPingInterval; } diff --git a/source/org/jivesoftware/smackx/ping/PingManager.java b/source/org/jivesoftware/smackx/ping/PingManager.java index 49497e6cc..3ee79a3f5 100644 --- a/source/org/jivesoftware/smackx/ping/PingManager.java +++ b/source/org/jivesoftware/smackx/ping/PingManager.java @@ -21,10 +21,14 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; -import org.jivesoftware.smack.AbstractConnectionListener; import org.jivesoftware.smack.Connection; import org.jivesoftware.smack.ConnectionCreationListener; +import org.jivesoftware.smack.ConnectionListener; import org.jivesoftware.smack.PacketCollector; import org.jivesoftware.smack.PacketListener; import org.jivesoftware.smack.SmackConfiguration; @@ -51,13 +55,14 @@ import org.jivesoftware.smackx.ping.packet.Pong; * Ping */ public class PingManager { - + public static final String NAMESPACE = "urn:xmpp:ping"; public static final String ELEMENT = "ping"; - + + private static Map instances = Collections.synchronizedMap(new WeakHashMap()); - + static { Connection.addConnectionCreationListener(new ConnectionCreationListener() { public void connectionCreated(Connection connection) { @@ -65,49 +70,96 @@ public class PingManager { } }); } - + + private ScheduledExecutorService periodicPingExecutorService; private Connection connection; - private Thread serverPingThread; - private ServerPingTask serverPingTask; private int pingInterval = SmackConfiguration.getDefaultPingInterval(); private Set pingFailedListeners = Collections .synchronizedSet(new HashSet()); - + private ScheduledFuture periodicPingTask; + protected volatile long lastSuccessfulPingByTask = -1; + + // Ping Flood protection private long pingMinDelta = 100; private long lastPingStamp = 0; // timestamp of the last received ping - - // Last server pong timestamp if a ping request manually - private long lastServerPingStamp = -1; - + + // Timestamp of the last pong received, either from the server or another entity + // Note, no need to synchronize this value, it will only increase over time + private long lastSuccessfulManualPing = -1; + private PingManager(Connection connection) { ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection); sdm.addFeature(NAMESPACE); this.connection = connection; - PacketFilter pingPacketFilter = new PacketTypeFilter(Ping.class); - connection.addPacketListener(new PingPacketListener(), pingPacketFilter); - connection.addConnectionListener(new PingConnectionListener()); - instances.put(connection, this); - maybeStartPingServerTask(); + init(); } - + + private void init() { + periodicPingExecutorService = new ScheduledThreadPoolExecutor(1); + PacketFilter pingPacketFilter = new PacketTypeFilter(Ping.class); + connection.addPacketListener(new PacketListener() { + /** + * Sends a Pong for every Ping + */ + public void processPacket(Packet packet) { + if (pingMinDelta > 0) { + // Ping flood protection enabled + long currentMillies = System.currentTimeMillis(); + long delta = currentMillies - lastPingStamp; + lastPingStamp = currentMillies; + if (delta < pingMinDelta) { + return; + } + } + Pong pong = new Pong((Ping)packet); + connection.sendPacket(pong); + } + } + , pingPacketFilter); + connection.addConnectionListener(new ConnectionListener() { + + @Override + public void connectionClosed() { + maybeStopPingServerTask(); + } + + @Override + public void connectionClosedOnError(Exception arg0) { + maybeStopPingServerTask(); + } + + @Override + public void reconnectionSuccessful() { + maybeSchedulePingServerTask(); + } + + @Override + public void reconnectingIn(int seconds) { + } + + @Override + public void reconnectionFailed(Exception e) { + } + }); + instances.put(connection, this); + maybeSchedulePingServerTask(); + } + public static PingManager getInstanceFor(Connection connection) { PingManager pingManager = instances.get(connection); - + if (pingManager == null) { pingManager = new PingManager(connection); } - + return pingManager; } - + public void setPingIntervall(int pingIntervall) { this.pingInterval = pingIntervall; - if (serverPingTask != null) { - serverPingTask.setPingInterval(pingIntervall); - } } - + public int getPingIntervall() { return pingInterval; } @@ -189,11 +241,11 @@ public class PingManager { */ public boolean pingEntity(String jid, long pingTimeout) { IQ result = ping(jid, pingTimeout); - - if (result == null - || result.getType() == IQ.Type.ERROR) { + + if (result == null || result.getType() == IQ.Type.ERROR) { return false; - } + } + pongReceived(); return true; } @@ -218,7 +270,8 @@ public class PingManager { } return false; } - lastServerPingStamp = System.currentTimeMillis(); + // Maybe not really a pong, but an answer is an answer + pongReceived(); return true; } @@ -257,80 +310,34 @@ public class PingManager { * @return */ public long getLastSuccessfulPing() { - long taskLastSuccessfulPing = -1; - if (serverPingTask != null) { - taskLastSuccessfulPing = serverPingTask.getLastSucessfulPing(); - } - return Math.max(taskLastSuccessfulPing, lastServerPingStamp); + return Math.max(lastSuccessfulPingByTask, lastSuccessfulManualPing); } protected Set getPingFailedListeners() { return pingFailedListeners; } - - private class PingConnectionListener extends AbstractConnectionListener { - @Override - public void connectionClosed() { - maybeStopPingServerTask(); - } - - @Override - public void connectionClosedOnError(Exception arg0) { - maybeStopPingServerTask(); - } - - @Override - public void reconnectionSuccessful() { - maybeStartPingServerTask(); - } - - } - - private void maybeStartPingServerTask() { - if (serverPingTask != null) { - serverPingTask.setDone(); - serverPingThread.interrupt(); - serverPingTask = null; - serverPingThread = null; - } - + /** + * Cancels any existing periodic ping task if there is one and schedules a new ping task if pingInterval is greater + * then zero. + * + */ + protected synchronized void maybeSchedulePingServerTask() { + maybeStopPingServerTask(); if (pingInterval > 0) { - serverPingTask = new ServerPingTask(connection, pingInterval); - serverPingThread = new Thread(serverPingTask); - serverPingThread.setDaemon(true); - serverPingThread.setName("Smack Ping Server Task (" + connection.getServiceName() + ")"); - serverPingThread.start(); + periodicPingTask = periodicPingExecutorService.schedule(new ServerPingTask(connection), pingInterval, + TimeUnit.SECONDS); } } - + private void maybeStopPingServerTask() { - if (serverPingThread != null) { - serverPingTask.setDone(); - serverPingThread.interrupt(); + if (periodicPingTask != null) { + periodicPingTask.cancel(true); + periodicPingTask = null; } } - private class PingPacketListener implements PacketListener { - - public PingPacketListener() { - } - - /** - * Sends a Pong for every Ping - */ - public void processPacket(Packet packet) { - if (pingMinDelta > 0) { - // Ping flood protection enabled - long currentMillies = System.currentTimeMillis(); - long delta = currentMillies - lastPingStamp; - lastPingStamp = currentMillies; - if (delta < pingMinDelta) { - return; - } - } - Pong pong = new Pong((Ping)packet); - connection.sendPacket(pong); - } + private void pongReceived() { + lastSuccessfulManualPing = System.currentTimeMillis(); } } diff --git a/source/org/jivesoftware/smackx/ping/ServerPingTask.java b/source/org/jivesoftware/smackx/ping/ServerPingTask.java index e350cd7d3..0901b8f15 100644 --- a/source/org/jivesoftware/smackx/ping/ServerPingTask.java +++ b/source/org/jivesoftware/smackx/ping/ServerPingTask.java @@ -1,5 +1,5 @@ /** - * Copyright 2012 Florian Schmaus + * Copyright 2012-2013 Florian Schmaus * * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,99 +22,56 @@ import java.util.Set; import org.jivesoftware.smack.Connection; class ServerPingTask implements Runnable { - + // This has to be a weak reference because IIRC all threads are roots // for objects and we have a new thread here that should hold a strong // reference to connection so that it can be GCed. private WeakReference weakConnection; - private int pingInterval; - private volatile long lastSuccessfulPing = -1; - + private int delta = 1000; // 1 seconds private int tries = 3; // 3 tries - - protected ServerPingTask(Connection connection, int pingIntervall) { + + protected ServerPingTask(Connection connection) { this.weakConnection = new WeakReference(connection); - this.pingInterval = pingIntervall; } - - protected void setDone() { - this.pingInterval = -1; - } - - protected void setPingInterval(int pingIntervall) { - this.pingInterval = pingIntervall; - } - - protected int getIntInterval() { - return pingInterval; - } - - protected long getLastSucessfulPing() { - return lastSuccessfulPing; - } - + public void run() { - sleep(60000); - - outerLoop: - while(pingInterval > 0) { - Connection connection = weakConnection.get(); - if (connection == null) { - // connection has been collected by GC - // which means we can stop the thread by breaking the loop - break; - } - if (connection.isAuthenticated()) { - PingManager pingManager = PingManager.getInstanceFor(connection); - boolean res = false; - - for(int i = 0; i < tries; i++) { - if (i != 0) { - try { - Thread.sleep(delta); - } catch (InterruptedException e) { - // We received an interrupt - // This only happens if we should stop pinging - break outerLoop; - } - } - res = pingManager.pingMyServer(); - // stop when we receive a pong back - if (res) { - lastSuccessfulPing = System.currentTimeMillis(); - break; + Connection connection = weakConnection.get(); + if (connection == null) { + // connection has been collected by GC + // which means we can stop the thread by breaking the loop + return; + } + if (connection.isAuthenticated()) { + PingManager pingManager = PingManager.getInstanceFor(connection); + boolean res = false; + + for (int i = 0; i < tries; i++) { + if (i != 0) { + try { + Thread.sleep(delta); + } catch (InterruptedException e) { + // We received an interrupt + // This only happens if we should stop pinging + return; } } - if (!res) { - Set pingFailedListeners = pingManager.getPingFailedListeners(); - for (PingFailedListener l : pingFailedListeners) { - l.pingFailed(); - } + res = pingManager.pingMyServer(); + // stop when we receive a pong back + if (res) { + pingManager.lastSuccessfulPingByTask = System.currentTimeMillis(); + break; } } - sleep(); - } - } - - /* - * If pingInterval > 0 sleeps a minimum of pingInterval - */ - private void sleep(int extraSleepTime) { - int totalSleep = pingInterval + extraSleepTime; - if (totalSleep > 0) { - try { - Thread.sleep(totalSleep); - } catch (InterruptedException e) { - /* Ignore */ + if (!res) { + Set pingFailedListeners = pingManager.getPingFailedListeners(); + for (PingFailedListener l : pingFailedListeners) { + l.pingFailed(); + } + } else { + // Ping was successful, wind-up the periodic task again + pingManager.maybeSchedulePingServerTask(); } } } - - /** - * Sleeps the amount of pingInterval - */ - private void sleep() { - sleep(0); - } }