From 3a4e6c6d3905a6955c142c5a6278435fef6d9c7f Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sun, 23 Feb 2014 21:36:57 +0100 Subject: [PATCH] Add automatically scheduled pings in PingManager replaces keep-alive functionality. SMACK-537 --- .../org/jivesoftware/smack/Connection.java | 10 + .../jivesoftware/smackx/ping/PingManager.java | 249 +++++++++++++++--- .../jivesoftware/smackx/ping/packet/Ping.java | 5 +- .../jivesoftware/smackx/ping/packet/Pong.java | 45 ++++ .../jivesoftware/smackx/ping/PingTest.java | 4 +- 5 files changed, 273 insertions(+), 40 deletions(-) create mode 100644 extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Pong.java diff --git a/core/src/main/java/org/jivesoftware/smack/Connection.java b/core/src/main/java/org/jivesoftware/smack/Connection.java index f8b12e663..944729984 100644 --- a/core/src/main/java/org/jivesoftware/smack/Connection.java +++ b/core/src/main/java/org/jivesoftware/smack/Connection.java @@ -30,6 +30,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; @@ -222,6 +226,8 @@ public abstract class Connection { protected XMPPInputOutputStream compressionHandler; + private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2); + /** * Create a new Connection to a XMPP server. * @@ -951,4 +957,8 @@ public abstract class Connection { } } } + + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return executorService.schedule(command, delay, unit); + } } diff --git a/extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java b/extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java index 5d71f04af..7351c666a 100644 --- a/extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java +++ b/extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2012-2013 Florian Schmaus + * Copyright 2012-2014 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,40 +18,51 @@ package org.jivesoftware.smackx.ping; import java.lang.ref.WeakReference; import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; import org.jivesoftware.smack.Connection; import org.jivesoftware.smack.ConnectionCreationListener; +import org.jivesoftware.smack.ConnectionListener; import org.jivesoftware.smack.PacketListener; -import org.jivesoftware.smack.SmackConfiguration; import org.jivesoftware.smack.SmackError; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.filter.AndFilter; import org.jivesoftware.smack.filter.IQTypeFilter; import org.jivesoftware.smack.filter.PacketFilter; import org.jivesoftware.smack.filter.PacketTypeFilter; -import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.IQ.Type; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; import org.jivesoftware.smackx.disco.packet.DiscoverInfo; import org.jivesoftware.smackx.ping.packet.Ping; +import org.jivesoftware.smackx.ping.packet.Pong; /** - * Implements the XMPP Ping as defined by XEP-0199. The XMPP Ping protocol - * allows one entity to 'ping' any other entity by simply sending a ping to - * the appropriate JID. - *

+ * Implements the XMPP Ping as defined by XEP-0199. The XMPP Ping protocol allows one entity to + * ping any other entity by simply sending a ping to the appropriate JID. PingManger also + * periodically sends XMPP pings to the server every 30 minutes to avoid NAT timeouts and to test + * the connection status. * * @author Florian Schmaus - * @see XEP-0199:XMPP - * Ping + * @see XEP-0199:XMPP Ping */ public class PingManager { - private static Map instances = Collections + public static final String NAMESPACE = "urn:xmpp:ping"; + + private static final Logger LOGGER = Logger.getLogger(PingManager.class.getName()); + + private static final Map INSTANCES = Collections .synchronizedMap(new WeakHashMap()); - + + private static final PacketFilter PING_PACKET_FILTER = new AndFilter( + new PacketTypeFilter(Ping.class), new IQTypeFilter(Type.GET)); + static { Connection.addConnectionCreationListener(new ConnectionCreationListener() { public void connectionCreated(Connection connection) { @@ -60,8 +71,6 @@ public class PingManager { }); } - private WeakReference weakRefConnection; - /** * Retrieves a {@link PingManager} for the specified {@link Connection}, creating one if it doesn't already * exist. @@ -71,36 +80,80 @@ public class PingManager { * @return The new or existing manager. */ public synchronized static PingManager getInstanceFor(Connection connection) { - PingManager pingManager = instances.get(connection); - + PingManager pingManager = INSTANCES.get(connection); if (pingManager == null) { pingManager = new PingManager(connection); } return pingManager; } + private static int defaultPingInterval = 60 * 30; + + /** + * Set the default ping interval which will be used for new connections. + * + * @param interval the interval in seconds + */ + public static void setDefaultPingInterval(int interval) { + defaultPingInterval = interval; + } + + private final Set pingFailedListeners = Collections + .synchronizedSet(new HashSet()); + + /** + * The interval in seconds between pings are send to the users server. + */ + private int pingInterval = defaultPingInterval; + + /** + * The time in milliseconds the last successful ping was send to the users server. + */ + private volatile long lastSuccessfulAutomaticPing = -1; + + private ScheduledFuture nextAutomaticPing; + + /** + * The time in milliseconds the last manual ping as successful. + */ + private long lastSuccessfulManualPing = -1; + + private WeakReference weakRefConnection; + private PingManager(Connection connection) { weakRefConnection = new WeakReference(connection); ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection); - - // The ServiceDiscoveryManager was not pre-initialized - if (sdm == null) - sdm = ServiceDiscoveryManager.getInstanceFor(connection); - - sdm.addFeature(Ping.NAMESPACE); - - PacketFilter pingPacketFilter = new AndFilter(new PacketTypeFilter(Ping.class), new IQTypeFilter(Type.GET)); - + sdm.addFeature(PingManager.NAMESPACE); + INSTANCES.put(connection, this); + connection.addPacketListener(new PacketListener() { - /** - * Sends a Pong for every Ping - */ + // Send a Pong for every Ping + @Override public void processPacket(Packet packet) { Connection connection = weakRefConnection.get(); - IQ pong = IQ.createResultIQ((Ping) packet); + Pong pong = new Pong(packet); connection.sendPacket(pong); } - }, pingPacketFilter); + }, PING_PACKET_FILTER); + connection.addConnectionListener(new ConnectionListener() { + @Override + public void connectionClosed() { + maybeStopPingServerTask(); + } + @Override + public void connectionClosedOnError(Exception arg0) { + maybeStopPingServerTask(); + } + @Override + public void reconnectionSuccessful() { + maybeSchedulePingServerTask(); + } + @Override + public void reconnectingIn(int seconds) {} + @Override + public void reconnectionFailed(Exception e) {} + }); + maybeSchedulePingServerTask(); } /** @@ -122,12 +175,11 @@ public class PingManager { connection.createPacketCollectorAndSend(ping).nextResultOrThrow(); } catch (XMPPException exc) { - return (jid.equals(connection.getServiceName()) && (exc.getSmackError() != SmackError.NO_RESPONSE_FROM_SERVER)); } return true; } - + /** * Same as calling {@link #ping(String, long)} with the defaultpacket reply * timeout. @@ -136,9 +188,10 @@ public class PingManager { * @return true if a reply was received from the entity, false otherwise. */ public boolean ping(String jid) { - return ping(jid, SmackConfiguration.getDefaultPacketReplyTimeout()); + Connection connection = weakRefConnection.get(); + return ping(jid, connection.getPacketReplyTimeout()); } - + /** * Query the specified entity to see if it supports the Ping protocol (XEP-0199) * @@ -149,7 +202,7 @@ public class PingManager { public boolean isPingSupported(String jid) throws XMPPException { Connection connection = weakRefConnection.get(); DiscoverInfo result = ServiceDiscoveryManager.getInstanceFor(connection).discoverInfo(jid); - return result.containsFeature(Ping.NAMESPACE); + return result.containsFeature(PingManager.NAMESPACE); } /** @@ -163,6 +216,132 @@ public class PingManager { */ public boolean pingMyServer() { Connection connection = weakRefConnection.get(); - return ping(connection.getServiceName()); + boolean res = ping(connection.getServiceName()); + if (!res) { + for (PingFailedListener l : pingFailedListeners) + l.pingFailed(); + } else { + pongReceived(); + } + return res; } + + /** + * Set the interval between the server is automatic pinged. A negative value disables automatic server pings. + * + * @param pingInterval the interval between the ping + */ + public void setPingInterval(int pingInterval) { + this.pingInterval = pingInterval; + maybeSchedulePingServerTask(); + } + + /** + * Get the current ping interval. + * + * @return the interval between pings in seconds + */ + public int getPingInterval() { + return pingInterval; + } + + /** + * Register a new PingFailedListener + * + * @param listener the listener to invoke + */ + public void registerPingFailedListener(PingFailedListener listener) { + pingFailedListeners.add(listener); + } + + /** + * Unregister a PingFailedListener + * + * @param listener the listener to remove + */ + public void unregisterPingFailedListener(PingFailedListener listener) { + pingFailedListeners.remove(listener); + } + + /** + * Returns the time of the last successful Ping with the + * users server. If there was no successful Ping (e.g. because this + * feature is disabled) -1 will be returned. + * + * @return + */ + public long getLastSuccessfulPing() { + return Math.max(lastSuccessfulAutomaticPing, lastSuccessfulManualPing); + } + + /** + * Cancels any existing periodic ping task if there is one and schedules a new ping task if + * pingInterval is greater then zero. + */ + private synchronized void maybeSchedulePingServerTask() { + maybeStopPingServerTask(); + if (pingInterval > 0) { + LOGGER.fine("Scheduling ServerPingTask in " + pingInterval + " seconds"); + Connection connection = weakRefConnection.get(); + nextAutomaticPing = connection.schedule(pingServerRunnable, pingInterval, TimeUnit.SECONDS); + } + } + + private void maybeStopPingServerTask() { + if (nextAutomaticPing != null) { + nextAutomaticPing.cancel(true); + nextAutomaticPing = null; + } + } + + private void pongReceived() { + lastSuccessfulManualPing = System.currentTimeMillis(); + } + + private final Runnable pingServerRunnable = new Runnable() { + private static final int DELTA = 1000; // 1 seconds + private static final int TRIES = 3; // 3 tries + + public void run() { + LOGGER.fine("ServerPingTask run()"); + Connection connection = weakRefConnection.get(); + if (connection == null) { + // connection has been collected by GC + // which means we can stop the thread by breaking the loop + return; + } + if (connection.isAuthenticated()) { + boolean res = false; + + for (int i = 0; i < TRIES; i++) { + if (i != 0) { + try { + Thread.sleep(DELTA); + } catch (InterruptedException e) { + // We received an interrupt + // This only happens if we should stop pinging + return; + } + } + res = pingMyServer(); + // stop when we receive a pong back + if (res) { + lastSuccessfulAutomaticPing = System.currentTimeMillis(); + break; + } + } + LOGGER.fine("ServerPingTask res=" + res); + if (!res) { + for (PingFailedListener l : pingFailedListeners) { + l.pingFailed(); + } + } else { + // Ping was successful, wind-up the periodic task again + maybeSchedulePingServerTask(); + } + } else { + LOGGER.warning("ServerPingTask: Connection was not authenticated"); + } + } + }; } diff --git a/extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Ping.java b/extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Ping.java index 68fd7ee79..8ec5ac744 100644 --- a/extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Ping.java +++ b/extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Ping.java @@ -14,14 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.jivesoftware.smackx.ping.packet; import org.jivesoftware.smack.packet.IQ; +import org.jivesoftware.smackx.ping.PingManager; public class Ping extends IQ { - public static final String NAMESPACE = "urn:xmpp:ping"; public static final String ELEMENT = "ping"; public Ping() { @@ -34,6 +33,6 @@ public class Ping extends IQ { @Override public String getChildElementXML() { - return "<" + ELEMENT + " xmlns=\'" + NAMESPACE + "\' />"; + return "<" + ELEMENT + " xmlns=\'" + PingManager.NAMESPACE + "\' />"; } } diff --git a/extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Pong.java b/extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Pong.java new file mode 100644 index 000000000..af63f979d --- /dev/null +++ b/extensions/src/main/java/org/jivesoftware/smackx/ping/packet/Pong.java @@ -0,0 +1,45 @@ +/** + * + * Copyright 2012-2014 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smackx.ping.packet; + +import org.jivesoftware.smack.packet.IQ; +import org.jivesoftware.smack.packet.Packet; + +public class Pong extends IQ { + + /** + * Composes a Pong packet from a received ping packet. This basically swaps + * the 'from' and 'to' attributes. And sets the IQ type to result. + * + * @param ping + */ + public Pong(Packet ping) { + setType(IQ.Type.RESULT); + setFrom(ping.getTo()); + setTo(ping.getFrom()); + setPacketID(ping.getPacketID()); + } + + /** + * Returns the child element of the Pong reply, which is non-existent. This + * is why we return 'null' here. See e.g. Example 11 from + * http://xmpp.org/extensions/xep-0199.html#e2e + */ + public String getChildElementXML() { + return null; + } +} diff --git a/extensions/src/test/java/org/jivesoftware/smackx/ping/PingTest.java b/extensions/src/test/java/org/jivesoftware/smackx/ping/PingTest.java index 63d611973..c3b36524b 100644 --- a/extensions/src/test/java/org/jivesoftware/smackx/ping/PingTest.java +++ b/extensions/src/test/java/org/jivesoftware/smackx/ping/PingTest.java @@ -186,7 +186,7 @@ public class PingTest { public void checkSuccessfulDiscoRequest() throws Exception { ThreadedDummyConnection con = new ThreadedDummyConnection(); DiscoverInfo info = new DiscoverInfo(); - info.addFeature(Ping.NAMESPACE); + info.addFeature(PingManager.NAMESPACE); //@formatter:off String reply = @@ -209,7 +209,7 @@ public class PingTest { public void checkUnuccessfulDiscoRequest() throws Exception { ThreadedDummyConnection con = new ThreadedDummyConnection(); DiscoverInfo info = new DiscoverInfo(); - info.addFeature(Ping.NAMESPACE); + info.addFeature(PingManager.NAMESPACE); //@formatter:off String reply =