Add automatically scheduled pings in PingManager

replaces keep-alive functionality. SMACK-537
This commit is contained in:
Florian Schmaus 2014-02-23 21:36:57 +01:00
parent 54a421e84e
commit 3a4e6c6d39
5 changed files with 273 additions and 40 deletions

View File

@ -30,6 +30,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
@ -222,6 +226,8 @@ public abstract class Connection {
protected XMPPInputOutputStream compressionHandler;
private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2);
/**
* Create a new Connection to a XMPP server.
*
@ -951,4 +957,8 @@ public abstract class Connection {
}
}
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return executorService.schedule(command, delay, unit);
}
}

View File

@ -1,6 +1,6 @@
/**
*
* Copyright 2012-2013 Florian Schmaus
* Copyright 2012-2014 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,40 +18,51 @@ package org.jivesoftware.smackx.ping;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.jivesoftware.smack.Connection;
import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackError;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.IQTypeFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.IQ.Type;
import org.jivesoftware.smackx.disco.ServiceDiscoveryManager;
import org.jivesoftware.smackx.disco.packet.DiscoverInfo;
import org.jivesoftware.smackx.ping.packet.Ping;
import org.jivesoftware.smackx.ping.packet.Pong;
/**
* Implements the XMPP Ping as defined by XEP-0199. The XMPP Ping protocol
* allows one entity to 'ping' any other entity by simply sending a ping to
* the appropriate JID.
* <p>
* Implements the XMPP Ping as defined by XEP-0199. The XMPP Ping protocol allows one entity to
* ping any other entity by simply sending a ping to the appropriate JID. PingManger also
* periodically sends XMPP pings to the server every 30 minutes to avoid NAT timeouts and to test
* the connection status.
*
* @author Florian Schmaus
* @see <a href="http://www.xmpp.org/extensions/xep-0199.html">XEP-0199:XMPP
* Ping</a>
* @see <a href="http://www.xmpp.org/extensions/xep-0199.html">XEP-0199:XMPP Ping</a>
*/
public class PingManager {
private static Map<Connection, PingManager> instances = Collections
public static final String NAMESPACE = "urn:xmpp:ping";
private static final Logger LOGGER = Logger.getLogger(PingManager.class.getName());
private static final Map<Connection, PingManager> INSTANCES = Collections
.synchronizedMap(new WeakHashMap<Connection, PingManager>());
private static final PacketFilter PING_PACKET_FILTER = new AndFilter(
new PacketTypeFilter(Ping.class), new IQTypeFilter(Type.GET));
static {
Connection.addConnectionCreationListener(new ConnectionCreationListener() {
public void connectionCreated(Connection connection) {
@ -60,8 +71,6 @@ public class PingManager {
});
}
private WeakReference<Connection> weakRefConnection;
/**
* Retrieves a {@link PingManager} for the specified {@link Connection}, creating one if it doesn't already
* exist.
@ -71,36 +80,80 @@ public class PingManager {
* @return The new or existing manager.
*/
public synchronized static PingManager getInstanceFor(Connection connection) {
PingManager pingManager = instances.get(connection);
PingManager pingManager = INSTANCES.get(connection);
if (pingManager == null) {
pingManager = new PingManager(connection);
}
return pingManager;
}
private static int defaultPingInterval = 60 * 30;
/**
* Set the default ping interval which will be used for new connections.
*
* @param interval the interval in seconds
*/
public static void setDefaultPingInterval(int interval) {
defaultPingInterval = interval;
}
private final Set<PingFailedListener> pingFailedListeners = Collections
.synchronizedSet(new HashSet<PingFailedListener>());
/**
* The interval in seconds between pings are send to the users server.
*/
private int pingInterval = defaultPingInterval;
/**
* The time in milliseconds the last successful ping was send to the users server.
*/
private volatile long lastSuccessfulAutomaticPing = -1;
private ScheduledFuture<?> nextAutomaticPing;
/**
* The time in milliseconds the last manual ping as successful.
*/
private long lastSuccessfulManualPing = -1;
private WeakReference<Connection> weakRefConnection;
private PingManager(Connection connection) {
weakRefConnection = new WeakReference<Connection>(connection);
ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);
// The ServiceDiscoveryManager was not pre-initialized
if (sdm == null)
sdm = ServiceDiscoveryManager.getInstanceFor(connection);
sdm.addFeature(Ping.NAMESPACE);
PacketFilter pingPacketFilter = new AndFilter(new PacketTypeFilter(Ping.class), new IQTypeFilter(Type.GET));
sdm.addFeature(PingManager.NAMESPACE);
INSTANCES.put(connection, this);
connection.addPacketListener(new PacketListener() {
/**
* Sends a Pong for every Ping
*/
// Send a Pong for every Ping
@Override
public void processPacket(Packet packet) {
Connection connection = weakRefConnection.get();
IQ pong = IQ.createResultIQ((Ping) packet);
Pong pong = new Pong(packet);
connection.sendPacket(pong);
}
}, pingPacketFilter);
}, PING_PACKET_FILTER);
connection.addConnectionListener(new ConnectionListener() {
@Override
public void connectionClosed() {
maybeStopPingServerTask();
}
@Override
public void connectionClosedOnError(Exception arg0) {
maybeStopPingServerTask();
}
@Override
public void reconnectionSuccessful() {
maybeSchedulePingServerTask();
}
@Override
public void reconnectingIn(int seconds) {}
@Override
public void reconnectionFailed(Exception e) {}
});
maybeSchedulePingServerTask();
}
/**
@ -122,12 +175,11 @@ public class PingManager {
connection.createPacketCollectorAndSend(ping).nextResultOrThrow();
}
catch (XMPPException exc) {
return (jid.equals(connection.getServiceName()) && (exc.getSmackError() != SmackError.NO_RESPONSE_FROM_SERVER));
}
return true;
}
/**
* Same as calling {@link #ping(String, long)} with the defaultpacket reply
* timeout.
@ -136,9 +188,10 @@ public class PingManager {
* @return true if a reply was received from the entity, false otherwise.
*/
public boolean ping(String jid) {
return ping(jid, SmackConfiguration.getDefaultPacketReplyTimeout());
Connection connection = weakRefConnection.get();
return ping(jid, connection.getPacketReplyTimeout());
}
/**
* Query the specified entity to see if it supports the Ping protocol (XEP-0199)
*
@ -149,7 +202,7 @@ public class PingManager {
public boolean isPingSupported(String jid) throws XMPPException {
Connection connection = weakRefConnection.get();
DiscoverInfo result = ServiceDiscoveryManager.getInstanceFor(connection).discoverInfo(jid);
return result.containsFeature(Ping.NAMESPACE);
return result.containsFeature(PingManager.NAMESPACE);
}
/**
@ -163,6 +216,132 @@ public class PingManager {
*/
public boolean pingMyServer() {
Connection connection = weakRefConnection.get();
return ping(connection.getServiceName());
boolean res = ping(connection.getServiceName());
if (!res) {
for (PingFailedListener l : pingFailedListeners)
l.pingFailed();
} else {
pongReceived();
}
return res;
}
/**
* Set the interval between the server is automatic pinged. A negative value disables automatic server pings.
*
* @param pingInterval the interval between the ping
*/
public void setPingInterval(int pingInterval) {
this.pingInterval = pingInterval;
maybeSchedulePingServerTask();
}
/**
* Get the current ping interval.
*
* @return the interval between pings in seconds
*/
public int getPingInterval() {
return pingInterval;
}
/**
* Register a new PingFailedListener
*
* @param listener the listener to invoke
*/
public void registerPingFailedListener(PingFailedListener listener) {
pingFailedListeners.add(listener);
}
/**
* Unregister a PingFailedListener
*
* @param listener the listener to remove
*/
public void unregisterPingFailedListener(PingFailedListener listener) {
pingFailedListeners.remove(listener);
}
/**
* Returns the time of the last successful Ping with the
* users server. If there was no successful Ping (e.g. because this
* feature is disabled) -1 will be returned.
*
* @return
*/
public long getLastSuccessfulPing() {
return Math.max(lastSuccessfulAutomaticPing, lastSuccessfulManualPing);
}
/**
* Cancels any existing periodic ping task if there is one and schedules a new ping task if
* pingInterval is greater then zero.
*/
private synchronized void maybeSchedulePingServerTask() {
maybeStopPingServerTask();
if (pingInterval > 0) {
LOGGER.fine("Scheduling ServerPingTask in " + pingInterval + " seconds");
Connection connection = weakRefConnection.get();
nextAutomaticPing = connection.schedule(pingServerRunnable, pingInterval, TimeUnit.SECONDS);
}
}
private void maybeStopPingServerTask() {
if (nextAutomaticPing != null) {
nextAutomaticPing.cancel(true);
nextAutomaticPing = null;
}
}
private void pongReceived() {
lastSuccessfulManualPing = System.currentTimeMillis();
}
private final Runnable pingServerRunnable = new Runnable() {
private static final int DELTA = 1000; // 1 seconds
private static final int TRIES = 3; // 3 tries
public void run() {
LOGGER.fine("ServerPingTask run()");
Connection connection = weakRefConnection.get();
if (connection == null) {
// connection has been collected by GC
// which means we can stop the thread by breaking the loop
return;
}
if (connection.isAuthenticated()) {
boolean res = false;
for (int i = 0; i < TRIES; i++) {
if (i != 0) {
try {
Thread.sleep(DELTA);
} catch (InterruptedException e) {
// We received an interrupt
// This only happens if we should stop pinging
return;
}
}
res = pingMyServer();
// stop when we receive a pong back
if (res) {
lastSuccessfulAutomaticPing = System.currentTimeMillis();
break;
}
}
LOGGER.fine("ServerPingTask res=" + res);
if (!res) {
for (PingFailedListener l : pingFailedListeners) {
l.pingFailed();
}
} else {
// Ping was successful, wind-up the periodic task again
maybeSchedulePingServerTask();
}
} else {
LOGGER.warning("ServerPingTask: Connection was not authenticated");
}
}
};
}

View File

@ -14,14 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.ping.packet;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smackx.ping.PingManager;
public class Ping extends IQ {
public static final String NAMESPACE = "urn:xmpp:ping";
public static final String ELEMENT = "ping";
public Ping() {
@ -34,6 +33,6 @@ public class Ping extends IQ {
@Override
public String getChildElementXML() {
return "<" + ELEMENT + " xmlns=\'" + NAMESPACE + "\' />";
return "<" + ELEMENT + " xmlns=\'" + PingManager.NAMESPACE + "\' />";
}
}

View File

@ -0,0 +1,45 @@
/**
*
* Copyright 2012-2014 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.ping.packet;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet;
public class Pong extends IQ {
/**
* Composes a Pong packet from a received ping packet. This basically swaps
* the 'from' and 'to' attributes. And sets the IQ type to result.
*
* @param ping
*/
public Pong(Packet ping) {
setType(IQ.Type.RESULT);
setFrom(ping.getTo());
setTo(ping.getFrom());
setPacketID(ping.getPacketID());
}
/**
* Returns the child element of the Pong reply, which is non-existent. This
* is why we return 'null' here. See e.g. Example 11 from
* http://xmpp.org/extensions/xep-0199.html#e2e
*/
public String getChildElementXML() {
return null;
}
}

View File

@ -186,7 +186,7 @@ public class PingTest {
public void checkSuccessfulDiscoRequest() throws Exception {
ThreadedDummyConnection con = new ThreadedDummyConnection();
DiscoverInfo info = new DiscoverInfo();
info.addFeature(Ping.NAMESPACE);
info.addFeature(PingManager.NAMESPACE);
//@formatter:off
String reply =
@ -209,7 +209,7 @@ public class PingTest {
public void checkUnuccessfulDiscoRequest() throws Exception {
ThreadedDummyConnection con = new ThreadedDummyConnection();
DiscoverInfo info = new DiscoverInfo();
info.addFeature(Ping.NAMESPACE);
info.addFeature(PingManager.NAMESPACE);
//@formatter:off
String reply =