SMACK-412 Split the ping implementation to a server ping to replace keepalive and a simplified ping manager for manual pings of other entities.

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/branches/smack_3_3_0@13569 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
rcollier 2013-03-19 02:37:36 +00:00
parent a55b54f20b
commit aab1dcdabe
19 changed files with 749 additions and 651 deletions

View File

@ -2,21 +2,6 @@
<!-- Smack configuration file. -->
<smack>
<!-- Classes that will be loaded when Smack starts -->
<startupClasses>
<className>org.jivesoftware.smackx.ServiceDiscoveryManager</className>
<className>org.jivesoftware.smack.PrivacyListManager</className>
<className>org.jivesoftware.smackx.XHTMLManager</className>
<className>org.jivesoftware.smackx.muc.MultiUserChat</className>
<className>org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamManager</className>
<className>org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamManager</className>
<className>org.jivesoftware.smackx.filetransfer.FileTransferManager</className>
<className>org.jivesoftware.smackx.LastActivityManager</className>
<className>org.jivesoftware.smack.ReconnectionManager</className>
<className>org.jivesoftware.smackx.commands.AdHocCommandManager</className>
<className>org.jivesoftware.smack.util.dns.JavaxResolver</className>
</startupClasses>
<!-- Packet reply timeout in milliseconds -->
<packetReplyTimeout>5000</packetReplyTimeout>
@ -31,11 +16,24 @@
<!-- Port of the local Socks5 proxy -->
<packetCollectorSize>10000</packetCollectorSize>
<!-- Default interval (seconds) in which the Ping Manager sends ping request to the server (30 minutes) -->
<defaultPingInterval>1800</defaultPingInterval>
<!-- Automatic enable Entity Caps (XEP-0115) for new connections -->
<autoEnableEntityCaps>false</autoEnableEntityCaps>
<!-- Classes that will be loaded when Smack starts -->
<startupClasses>
<className>org.jivesoftware.smackx.ServiceDiscoveryManager</className>
<className>org.jivesoftware.smack.PrivacyListManager</className>
<className>org.jivesoftware.smack.ping.ServerPingManager</className>
<className>org.jivesoftware.smackx.XHTMLManager</className>
<className>org.jivesoftware.smackx.muc.MultiUserChat</className>
<className>org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamManager</className>
<className>org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamManager</className>
<className>org.jivesoftware.smackx.filetransfer.FileTransferManager</className>
<className>org.jivesoftware.smackx.LastActivityManager</className>
<className>org.jivesoftware.smack.ReconnectionManager</className>
<className>org.jivesoftware.smackx.commands.AdHocCommandManager</className>
</startupClasses>
</smack>

View File

@ -241,7 +241,7 @@
<iqProvider>
<elementName>ping</elementName>
<namespace>urn:xmpp:ping</namespace>
<className>org.jivesoftware.smackx.ping.provider.PingProvider</className>
<className>org.jivesoftware.smack.ping.provider.PingProvider</className>
</iqProvider>
<extensionProvider>

View File

@ -178,12 +178,12 @@ public abstract class Connection {
protected SmackDebugger debugger = null;
/**
* The Reader which is used for the {@see debugger}.
* The Reader which is used for the debugger.
*/
protected Reader reader;
/**
* The Writer which is used for the {@see debugger}.
* The Writer which is used for the debugger.
*/
protected Writer writer;

View File

@ -40,18 +40,11 @@ import java.util.concurrent.BlockingQueue;
class PacketWriter {
private Thread writerThread;
private Thread keepAliveThread;
private Writer writer;
private XMPPConnection connection;
private final BlockingQueue<Packet> queue;
volatile boolean done;
/**
* Timestamp when the last stanza was sent to the server. This information is used
* by the keep alive process to only send heartbeats when the connection has been idle.
*/
private long lastActive = System.currentTimeMillis();
/**
* Creates a new packet writer with the specified connection.
*
@ -117,25 +110,6 @@ class PacketWriter {
writerThread.start();
}
/**
* Starts the keep alive process. A white space (aka heartbeat) is going to be
* sent to the server every 30 seconds (by default) since the last stanza was sent
* to the server.
*/
void startKeepAliveProcess() {
// Schedule a keep-alive task to run if the feature is enabled. will write
// out a space character each time it runs to keep the TCP/IP connection open.
int keepAliveInterval = SmackConfiguration.getKeepAliveInterval();
if (keepAliveInterval > 0) {
KeepAliveTask task = new KeepAliveTask(keepAliveInterval);
keepAliveThread = new Thread(task);
task.setThread(keepAliveThread);
keepAliveThread.setDaemon(true);
keepAliveThread.setName("Smack Keep Alive (" + connection.connectionCounterValue + ")");
keepAliveThread.start();
}
}
void setWriter(Writer writer) {
this.writer = writer;
}
@ -149,9 +123,6 @@ class PacketWriter {
synchronized (queue) {
queue.notifyAll();
}
// Interrupt the keep alive thread if one was created
if (keepAliveThread != null)
keepAliveThread.interrupt();
}
/**
@ -193,10 +164,9 @@ class PacketWriter {
if (packet != null) {
synchronized (writer) {
writer.write(packet.toXML());
writer.flush();
if (queue.isEmpty()) {
writer.flush();
// Keep track of the last time a stanza was sent to the server
lastActive = System.currentTimeMillis();
}
}
}
@ -268,54 +238,4 @@ class PacketWriter {
writer.write(stream.toString());
writer.flush();
}
/**
* A TimerTask that keeps connections to the server alive by sending a space
* character on an interval.
*/
private class KeepAliveTask implements Runnable {
private int delay;
private Thread thread;
public KeepAliveTask(int delay) {
this.delay = delay;
}
protected void setThread(Thread thread) {
this.thread = thread;
}
public void run() {
try {
// Sleep a minimum of 15 seconds plus delay before sending first heartbeat. This will give time to
// properly finish TLS negotiation and then start sending heartbeats.
Thread.sleep(15000 + delay);
}
catch (InterruptedException ie) {
// Do nothing
}
while (!done && keepAliveThread == thread) {
synchronized (writer) {
// Send heartbeat if no packet has been sent to the server for a given time
if (System.currentTimeMillis() - lastActive >= delay) {
try {
writer.write(" ");
writer.flush();
}
catch (Exception e) {
// Do nothing
}
}
}
try {
// Sleep until we should write the next keep-alive.
Thread.sleep(delay);
}
catch (InterruptedException ie) {
// Do nothing
}
}
}
}
}

View File

@ -58,11 +58,6 @@ public final class SmackConfiguration {
private static int localSocks5ProxyPort = 7777;
private static int packetCollectorSize = 5000;
/**
* defaultPingInterval (in seconds)
*/
private static int defaultPingInterval = 1800; // 30 min (30*60)
/**
* This automatically enables EntityCaps for new connections if it is set to true
*/
@ -117,9 +112,6 @@ public final class SmackConfiguration {
else if (parser.getName().equals("packetCollectorSize")) {
packetCollectorSize = parseIntProperty(parser, packetCollectorSize);
}
else if (parser.getName().equals("defaultPingInterval")) {
defaultPingInterval = parseIntProperty(parser, defaultPingInterval);
}
else if (parser.getName().equals("autoEnableEntityCaps")) {
autoEnableEntityCaps = Boolean.parseBoolean(parser.nextText());
}
@ -319,24 +311,6 @@ public final class SmackConfiguration {
SmackConfiguration.localSocks5ProxyPort = localSocks5ProxyPort;
}
/**
* Returns the default ping interval (seconds)
*
* @return
*/
public static int getDefaultPingInterval() {
return defaultPingInterval;
}
/**
* Sets the default ping interval (seconds). Set it to '-1' to disable the periodic ping
*
* @param defaultPingInterval
*/
public static void setDefaultPingInterval(int defaultPingInterval) {
SmackConfiguration.defaultPingInterval = defaultPingInterval;
}
/**
* Check if Entity Caps are enabled as default for every new connection
* @return

View File

@ -650,10 +650,6 @@ public class XMPPConnection extends Connection {
// Make note of the fact that we're now connected.
connected = true;
// Start keep alive process (after TLS was negotiated - if available)
packetWriter.startKeepAliveProcess();
if (isFirstInitialization) {
// Notify listeners that a new connection has been established
for (ConnectionCreationListener listener : getConnectionCreationListeners()) {

View File

@ -14,8 +14,14 @@
* limitations under the License.
*/
package org.jivesoftware.smackx.ping;
package org.jivesoftware.smack.ping;
/**
* Defines the callback used whenever the server ping fails.
*/
public interface PingFailedListener {
/**
* Called when the server ping fails.
*/
void pingFailed();
}

View File

@ -0,0 +1,285 @@
/**
* Copyright 2012-2013 Florian Schmaus
*
* All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.ping;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.jivesoftware.smack.Connection;
import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.IQTypeFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketIDFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.IQ.Type;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.ping.packet.Ping;
import org.jivesoftware.smackx.ServiceDiscoveryManager;
/**
* Using an implementation of <a href="http://www.xmpp.org/extensions/xep-0199.html">XMPP Ping (XEP-0199)</a>. This
* class provides keepalive functionality with the server that will periodically "ping" the server to maintain and/or
* verify that the connection still exists.
* <p>
* The ping is done at the application level and is therefore protocol agnostic. It will thus work for both standard TCP
* connections as well as BOSH or any other transport protocol. It will also work regardless of whether the server
* supports the Ping extension, since an error response to the ping serves the same purpose as a pong.
*
* @author Florian Schmaus
*/
public class ServerPingManager {
public static final long PING_MINIMUM = 10000;
private static Map<Connection, ServerPingManager> instances = Collections
.synchronizedMap(new WeakHashMap<Connection, ServerPingManager>());
private static long defaultPingInterval = SmackConfiguration.getKeepAliveInterval();
private static ScheduledExecutorService periodicPingExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread pingThread = new Thread(runnable, "Smack Server Ping");
pingThread.setDaemon(true);
return pingThread;
}
});
static {
if (defaultPingInterval > 0) {
Connection.addConnectionCreationListener(new ConnectionCreationListener() {
public void connectionCreated(Connection connection) {
new ServerPingManager(connection);
}
});
}
}
private Connection connection;
private long pingInterval = SmackConfiguration.getKeepAliveInterval();
private Set<PingFailedListener> pingFailedListeners = Collections.synchronizedSet(new HashSet<PingFailedListener>());
private volatile ScheduledFuture<?> periodicPingTask;
private volatile long lastSuccessfulContact = -1;
/**
* Retrieves a {@link ServerPingManager} for the specified {@link Connection}, creating one if it doesn't already
* exist.
*
* @param connection
* The connection the manager is attached to.
* @return The new or existing manager.
*/
public synchronized static ServerPingManager getInstanceFor(Connection connection) {
ServerPingManager pingManager = instances.get(connection);
if (pingManager == null) {
pingManager = new ServerPingManager(connection);
}
return pingManager;
}
private ServerPingManager(Connection connection) {
ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);
sdm.addFeature(Ping.NAMESPACE);
this.connection = connection;
init();
}
private void init() {
PacketFilter pingPacketFilter = new AndFilter(new PacketTypeFilter(Ping.class), new IQTypeFilter(Type.GET));
connection.addPacketListener(new PacketListener() {
/**
* Sends a Pong for every Ping
*/
public void processPacket(Packet packet) {
IQ pong = IQ.createResultIQ((Ping) packet);
connection.sendPacket(pong);
}
}, pingPacketFilter);
connection.addConnectionListener(new ConnectionListener() {
@Override
public void connectionClosed() {
stopPingServerTask();
}
@Override
public void connectionClosedOnError(Exception arg0) {
stopPingServerTask();
}
@Override
public void reconnectionSuccessful() {
schedulePingServerTask();
}
@Override
public void reconnectingIn(int seconds) {
}
@Override
public void reconnectionFailed(Exception e) {
}
});
// Listen for all incoming packets and reset the scheduled ping whenever
// one arrives.
connection.addPacketListener(new PacketListener() {
@Override
public void processPacket(Packet packet) {
// reschedule the ping based on this last server contact
lastSuccessfulContact = System.currentTimeMillis();
schedulePingServerTask();
}
}, null);
instances.put(connection, this);
schedulePingServerTask();
}
/**
* Sets the ping interval.
*
* @param pingInterval
* The new ping time interval in milliseconds.
*/
public void setPingInterval(long newPingInterval) {
if (newPingInterval < PING_MINIMUM)
newPingInterval = PING_MINIMUM;
if (pingInterval != newPingInterval) {
pingInterval = newPingInterval;
schedulePingServerTask();
}
}
/**
* Stops pinging the server. This cannot stop a ping that has already started, but will prevent another from being triggered.
* <p>
* To restart, call {@link #setPingInterval(long)}.
*/
public void stopPinging() {
pingInterval = -1;
stopPingServerTask();
}
/**
* Gets the ping interval.
*
* @return The ping interval in milliseconds.
*/
public long getPingInterval() {
return pingInterval;
}
/**
* Add listener for notification when a server ping fails.
*
* <p>
* Please note that this doesn't necessarily mean that the connection is lost, a slow to respond server could also
* cause a failure due to taking too long to respond and thus causing a reply timeout.
*
* @param listener
* The listener to be called
*/
public void addPingFailedListener(PingFailedListener listener) {
pingFailedListeners.add(listener);
}
/**
* Remove the listener.
*
* @param listener
* The listener to be removed.
*/
public void removePingFailedListener(PingFailedListener listener) {
pingFailedListeners.remove(listener);
}
/**
* Returns the time of the last successful contact with the server. (i.e. the last time any message was received).
*
* @return Time of last message or -1 if none has been received since manager was created.
*/
public long getLastSuccessfulContact() {
return lastSuccessfulContact;
}
/**
* Cancels any existing periodic ping task if there is one and schedules a new ping task if pingInterval is greater
* then zero.
*
* This is designed so only one executor is used for scheduling all pings on all connections. This results in only 1 thread used for pinging.
*/
private synchronized void schedulePingServerTask() {
stopPingServerTask();
if (pingInterval > 0) {
periodicPingTask = periodicPingExecutorService.schedule(new Runnable() {
@Override
public void run() {
Ping ping = new Ping();
PacketFilter responseFilter = new PacketIDFilter(ping.getPacketID());
final PacketCollector response = connection.createPacketCollector(responseFilter);
connection.sendPacket(ping);
if (!pingFailedListeners.isEmpty()) {
// Schedule a collector for the ping reply, notify listeners if none is received.
periodicPingExecutorService.schedule(new Runnable() {
@Override
public void run() {
Packet result = response.nextResult(1);
// Stop queuing results
response.cancel();
// The actual result of the reply can be ignored since we only care if we actually got one.
if (result == null) {
for (PingFailedListener listener : pingFailedListeners) {
listener.pingFailed();
}
}
}
}, SmackConfiguration.getPacketReplyTimeout(), TimeUnit.MILLISECONDS);
}
}
}, getPingInterval(), TimeUnit.MILLISECONDS);
}
}
private void stopPingServerTask() {
if (periodicPingTask != null) {
periodicPingTask.cancel(true);
periodicPingTask = null;
}
}
}

View File

@ -14,25 +14,26 @@
* limitations under the License.
*/
package org.jivesoftware.smackx.ping.packet;
package org.jivesoftware.smack.ping.packet;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smackx.ping.PingManager;
public class Ping extends IQ {
public static final String NAMESPACE = "urn:xmpp:ping";
public static final String ELEMENT = "ping";
public Ping() {
}
public Ping(String from, String to) {
public Ping(String to) {
setTo(to);
setFrom(from);
setType(IQ.Type.GET);
setPacketID(getPacketID());
}
@Override
public String getChildElementXML() {
return "<" + PingManager.ELEMENT + " xmlns=\'" + PingManager.NAMESPACE + "\' />";
return "<" + ELEMENT + " xmlns=\'" + NAMESPACE + "\' />";
}
}

View File

@ -14,11 +14,11 @@
* limitations under the License.
*/
package org.jivesoftware.smackx.ping.provider;
package org.jivesoftware.smack.ping.provider;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.ping.packet.Ping;
import org.jivesoftware.smack.provider.IQProvider;
import org.jivesoftware.smackx.ping.packet.Ping;
import org.xmlpull.v1.XmlPullParser;
public class PingProvider implements IQProvider {

View File

@ -47,7 +47,7 @@ final public class SyncPacketSend
response.cancel();
if (result == null) {
throw new XMPPException("No response from server.");
throw new XMPPException("No response from " + packet.getTo());
}
else if (result.getError() != null) {
throw new XMPPException(result.getError());

View File

@ -16,328 +16,80 @@
package org.jivesoftware.smackx.ping;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jivesoftware.smack.Connection;
import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketIDFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.ping.ServerPingManager;
import org.jivesoftware.smack.ping.packet.Ping;
import org.jivesoftware.smack.util.SyncPacketSend;
import org.jivesoftware.smackx.ServiceDiscoveryManager;
import org.jivesoftware.smackx.packet.DiscoverInfo;
import org.jivesoftware.smackx.ping.packet.Ping;
import org.jivesoftware.smackx.ping.packet.Pong;
/**
* Implements the XMPP Ping as defined by XEP-0199. This protocol offers an
* alternative to the traditional 'white space ping' approach of determining the
* availability of an entity. The XMPP Ping protocol allows ping messages to be
* send in a more XML-friendly approach, which can be used over more than one
* hop in the communication path.
* Implements the XMPP Ping as defined by XEP-0199. The XMPP Ping protocol
* allows one entity to 'ping' any other entity by simply sending a ping to
* the appropriate JID.
* <p>
* NOTE: The {@link ServerPingManager} already provides a keepalive functionality
* for regularly pinging the server to keep the underlying transport connection
* alive. This class is specifically intended to do manual pings of other
* entities.
*
* @author Florian Schmaus
* @see <a href="http://www.xmpp.org/extensions/xep-0199.html">XEP-0199:XMPP
* Ping</a>
*/
public class PingManager {
public static final String NAMESPACE = "urn:xmpp:ping";
public static final String ELEMENT = "ping";
private static Map<Connection, PingManager> instances =
Collections.synchronizedMap(new WeakHashMap<Connection, PingManager>());
static {
Connection.addConnectionCreationListener(new ConnectionCreationListener() {
public void connectionCreated(Connection connection) {
new PingManager(connection);
}
});
}
private ScheduledExecutorService periodicPingExecutorService;
private Connection connection;
private int pingInterval = SmackConfiguration.getDefaultPingInterval();
private Set<PingFailedListener> pingFailedListeners = Collections
.synchronizedSet(new HashSet<PingFailedListener>());
private ScheduledFuture<?> periodicPingTask;
protected volatile long lastSuccessfulPingByTask = -1;
// Ping Flood protection
private long pingMinDelta = 100;
private long lastPingStamp = 0; // timestamp of the last received ping
// Timestamp of the last pong received, either from the server or another entity
// Note, no need to synchronize this value, it will only increase over time
private long lastSuccessfulManualPing = -1;
private PingManager(Connection connection) {
public PingManager(Connection connection) {
ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);
sdm.addFeature(NAMESPACE);
sdm.addFeature(Ping.NAMESPACE);
this.connection = connection;
init();
}
private void init() {
periodicPingExecutorService = new ScheduledThreadPoolExecutor(1);
PacketFilter pingPacketFilter = new PacketTypeFilter(Ping.class);
connection.addPacketListener(new PacketListener() {
/**
* Sends a Pong for every Ping
*/
public void processPacket(Packet packet) {
if (pingMinDelta > 0) {
// Ping flood protection enabled
long currentMillies = System.currentTimeMillis();
long delta = currentMillies - lastPingStamp;
lastPingStamp = currentMillies;
if (delta < pingMinDelta) {
return;
}
}
Pong pong = new Pong((Ping)packet);
connection.sendPacket(pong);
}
/**
* Pings the given jid. This method will return false if an error occurs.
* <p>
* Use {@link #isPingSupported(String)} to determine if XMPP Ping is supported
* by the entity.
*
* @param jid The id of the entity the ping is being sent to
* @param pingTimeout The time to wait for a reply
* @return true if a reply was received from the entity, false otherwise.
*/
public boolean ping(String jid, long pingTimeout) {
Ping ping = new Ping(jid);
try {
SyncPacketSend.getReply(connection, ping);
}
, pingPacketFilter);
connection.addConnectionListener(new ConnectionListener() {
@Override
public void connectionClosed() {
maybeStopPingServerTask();
}
@Override
public void connectionClosedOnError(Exception arg0) {
maybeStopPingServerTask();
}
@Override
public void reconnectionSuccessful() {
maybeSchedulePingServerTask();
}
@Override
public void reconnectingIn(int seconds) {
}
@Override
public void reconnectionFailed(Exception e) {
}
});
instances.put(connection, this);
maybeSchedulePingServerTask();
}
public static PingManager getInstanceFor(Connection connection) {
PingManager pingManager = instances.get(connection);
if (pingManager == null) {
pingManager = new PingManager(connection);
catch (XMPPException exc) {
return false;
}
return pingManager;
}
public void setPingIntervall(int pingIntervall) {
this.pingInterval = pingIntervall;
}
public int getPingIntervall() {
return pingInterval;
}
public void registerPingFailedListener(PingFailedListener listener) {
pingFailedListeners.add(listener);
}
public void unregisterPingFailedListener(PingFailedListener listener) {
pingFailedListeners.remove(listener);
}
public void disablePingFloodProtection() {
setPingMinimumInterval(-1);
}
public void setPingMinimumInterval(long ms) {
this.pingMinDelta = ms;
}
public long getPingMinimumInterval() {
return this.pingMinDelta;
return true;
}
/**
* Pings the given jid and returns the IQ response which is either of
* IQ.Type.ERROR or IQ.Type.RESULT. If we are not connected or if there was
* no reply, null is returned.
* Same as calling {@link #ping(String, long)} with the defaultpacket reply
* timeout.
*
* You should use isPingSupported(jid) to determine if XMPP Ping is
* supported by the user.
*
* @param jid
* @param pingTimeout
* @return
* @param jid The id of the entity the ping is being sent to
* @return true if a reply was received from the entity, false otherwise.
*/
public IQ ping(String jid, long pingTimeout) {
// Make sure we actually connected to the server
if (!connection.isAuthenticated())
return null;
Ping ping = new Ping(connection.getUser(), jid);
PacketCollector collector =
connection.createPacketCollector(new PacketIDFilter(ping.getPacketID()));
connection.sendPacket(ping);
IQ result = (IQ) collector.nextResult(pingTimeout);
collector.cancel();
return result;
}
/**
* Pings the given jid and returns the IQ response with the default
* packet reply timeout
*
* @param jid
* @return
*/
public IQ ping(String jid) {
public boolean ping(String jid) {
return ping(jid, SmackConfiguration.getPacketReplyTimeout());
}
/**
* Pings the given Entity.
* Query the specified entity to see if it supports the Ping protocol (XEP-0199)
*
* Note that XEP-199 shows that if we receive a error response
* service-unavailable there is no way to determine if the response was send
* by the entity (e.g. a user JID) or from a server in between. This is
* intended behavior to avoid presence leaks.
*
* Always use isPingSupported(jid) to determine if XMPP Ping is supported
* by the entity.
*
* @param jid
* @return True if a pong was received, otherwise false
* @param jid The id of the entity the query is being sent to
* @return true if it supports ping, false otherwise.
* @throws XMPPException An XMPP related error occurred during the request
*/
public boolean pingEntity(String jid, long pingTimeout) {
IQ result = ping(jid, pingTimeout);
if (result == null || result.getType() == IQ.Type.ERROR) {
return false;
}
pongReceived();
return true;
}
public boolean pingEntity(String jid) {
return pingEntity(jid, SmackConfiguration.getPacketReplyTimeout());
}
/**
* Pings the user's server. Will notify the registered
* pingFailedListeners in case of error.
*
* If we receive as response, we can be sure that it came from the server.
*
* @return true if successful, otherwise false
*/
public boolean pingMyServer(long pingTimeout) {
IQ result = ping(connection.getServiceName(), pingTimeout);
if (result == null) {
for (PingFailedListener l : pingFailedListeners) {
l.pingFailed();
}
return false;
}
// Maybe not really a pong, but an answer is an answer
pongReceived();
return true;
}
/**
* Pings the user's server with the PacketReplyTimeout as defined
* in SmackConfiguration.
*
* @return true if successful, otherwise false
*/
public boolean pingMyServer() {
return pingMyServer(SmackConfiguration.getPacketReplyTimeout());
}
/**
* Returns true if XMPP Ping is supported by a given JID
*
* @param jid
* @return
*/
public boolean isPingSupported(String jid) {
try {
DiscoverInfo result =
ServiceDiscoveryManager.getInstanceFor(connection).discoverInfo(jid);
return result.containsFeature(NAMESPACE);
}
catch (XMPPException e) {
return false;
}
}
/**
* Returns the time of the last successful Ping Pong with the
* users server. If there was no successful Ping (e.g. because this
* feature is disabled) -1 will be returned.
*
* @return
*/
public long getLastSuccessfulPing() {
return Math.max(lastSuccessfulPingByTask, lastSuccessfulManualPing);
}
protected Set<PingFailedListener> getPingFailedListeners() {
return pingFailedListeners;
}
/**
* Cancels any existing periodic ping task if there is one and schedules a new ping task if pingInterval is greater
* then zero.
*
*/
protected synchronized void maybeSchedulePingServerTask() {
maybeStopPingServerTask();
if (pingInterval > 0) {
periodicPingTask = periodicPingExecutorService.schedule(new ServerPingTask(connection), pingInterval,
TimeUnit.SECONDS);
}
}
private void maybeStopPingServerTask() {
if (periodicPingTask != null) {
periodicPingTask.cancel(true);
periodicPingTask = null;
}
}
private void pongReceived() {
lastSuccessfulManualPing = System.currentTimeMillis();
public boolean isPingSupported(String jid) throws XMPPException {
DiscoverInfo result = ServiceDiscoveryManager.getInstanceFor(connection).discoverInfo(jid);
return result.containsFeature(Ping.NAMESPACE);
}
}

View File

@ -1,77 +0,0 @@
/**
* Copyright 2012-2013 Florian Schmaus
*
* All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.ping;
import java.lang.ref.WeakReference;
import java.util.Set;
import org.jivesoftware.smack.Connection;
class ServerPingTask implements Runnable {
// This has to be a weak reference because IIRC all threads are roots
// for objects and we have a new thread here that should hold a strong
// reference to connection so that it can be GCed.
private WeakReference<Connection> weakConnection;
private int delta = 1000; // 1 seconds
private int tries = 3; // 3 tries
protected ServerPingTask(Connection connection) {
this.weakConnection = new WeakReference<Connection>(connection);
}
public void run() {
Connection connection = weakConnection.get();
if (connection == null) {
// connection has been collected by GC
// which means we can stop the thread by breaking the loop
return;
}
if (connection.isAuthenticated()) {
PingManager pingManager = PingManager.getInstanceFor(connection);
boolean res = false;
for (int i = 0; i < tries; i++) {
if (i != 0) {
try {
Thread.sleep(delta);
} catch (InterruptedException e) {
// We received an interrupt
// This only happens if we should stop pinging
return;
}
}
res = pingManager.pingMyServer();
// stop when we receive a pong back
if (res) {
pingManager.lastSuccessfulPingByTask = System.currentTimeMillis();
break;
}
}
if (!res) {
Set<PingFailedListener> pingFailedListeners = pingManager.getPingFailedListeners();
for (PingFailedListener l : pingFailedListeners) {
l.pingFailed();
}
} else {
// Ping was successful, wind-up the periodic task again
pingManager.maybeSchedulePingServerTask();
}
}
}
}

View File

@ -1,45 +0,0 @@
/**
* Copyright 2012 Florian Schmaus
*
* All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.ping.packet;
import org.jivesoftware.smack.packet.IQ;
public class Pong extends IQ {
/**
* Composes a Pong packet from a received ping packet. This basically swaps
* the 'from' and 'to' attributes. And sets the IQ type to result.
*
* @param ping
*/
public Pong(Ping ping) {
setType(IQ.Type.RESULT);
setFrom(ping.getTo());
setTo(ping.getFrom());
setPacketID(ping.getPacketID());
}
/*
* Returns the child element of the Pong reply, which is non-existent. This
* is why we return 'null' here. See e.g. Example 11 from
* http://xmpp.org/extensions/xep-0199.html#e2e
*/
public String getChildElementXML() {
return null;
}
}

View File

@ -57,11 +57,15 @@ public class DummyConnection extends Connection {
private final BlockingQueue<Packet> queue = new LinkedBlockingQueue<Packet>();
public DummyConnection() {
super(new ConnectionConfiguration("example.com"));
this(new ConnectionConfiguration("example.com"));
}
public DummyConnection(ConnectionConfiguration configuration) {
super(configuration);
for (ConnectionCreationListener listener : getConnectionCreationListeners()) {
listener.connectionCreated(this);
}
}
@Override
@ -191,15 +195,27 @@ public class DummyConnection extends Connection {
}
/**
* Returns the first packet that's sent through {@link #sendPacket(Packet)} and
* that has not been returned by earlier calls to this method. This method
* will block for up to two seconds if no packets have been sent yet.
* Returns the first packet that's sent through {@link #sendPacket(Packet)}
* and that has not been returned by earlier calls to this method.
*
* @return a sent packet.
* @throws InterruptedException
*/
public Packet getSentPacket() throws InterruptedException {
return queue.poll(2, TimeUnit.SECONDS);
return queue.poll();
}
/**
* Returns the first packet that's sent through {@link #sendPacket(Packet)}
* and that has not been returned by earlier calls to this method. This
* method will block for up to the specified number of seconds if no packets
* have been sent yet.
*
* @return a sent packet.
* @throws InterruptedException
*/
public Packet getSentPacket(int wait) throws InterruptedException {
return queue.poll(wait, TimeUnit.SECONDS);
}
/**

View File

@ -17,80 +17,85 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.IQ.Type;
public class ThreadedDummyConnection extends DummyConnection
{
private BlockingQueue<IQ> replyQ = new ArrayBlockingQueue<IQ>(1);
private BlockingQueue<Packet> messageQ = new LinkedBlockingQueue<Packet>(5);
@Override
public void sendPacket(Packet packet)
{
super.sendPacket(packet);
if ((packet instanceof IQ) && !replyQ.isEmpty())
{
// Set reply packet to match one being sent. We haven't started the
// other thread yet so this is still safe.
IQ replyPacket = replyQ.peek();
replyPacket.setPacketID(packet.getPacketID());
replyPacket.setFrom(packet.getTo());
replyPacket.setTo(packet.getFrom());
replyPacket.setType(Type.RESULT);
new ProcessQueue(replyQ).start();
}
}
public void addMessage(Message msgToProcess)
{
messageQ.add(msgToProcess);
}
public void addIQReply(IQ reply)
{
replyQ.add(reply);
}
public void processMessages()
{
if (!messageQ.isEmpty())
new ProcessQueue(messageQ).start();
else
System.out.println("No messages to process");
}
class ProcessQueue extends Thread
{
private BlockingQueue<? extends Packet> processQ;
ProcessQueue(BlockingQueue<? extends Packet> queue)
{
processQ = queue;
}
@Override
public void run()
{
try
{
processPacket(processQ.take());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
}
package org.jivesoftware.smack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.IQ.Type;
public class ThreadedDummyConnection extends DummyConnection {
private BlockingQueue<IQ> replyQ = new ArrayBlockingQueue<IQ>(1);
private BlockingQueue<Packet> messageQ = new LinkedBlockingQueue<Packet>(5);
private volatile boolean timeout = false;
@Override
public void sendPacket(Packet packet) {
super.sendPacket(packet);
if (packet instanceof IQ && !timeout) {
timeout = false;
// Set reply packet to match one being sent. We haven't started the
// other thread yet so this is still safe.
IQ replyPacket = replyQ.peek();
// If no reply has been set via addIQReply, then we create a simple reply
if (replyPacket == null) {
replyPacket = IQ.createResultIQ((IQ) packet);
replyQ.add(replyPacket);
}
replyPacket.setPacketID(packet.getPacketID());
replyPacket.setFrom(packet.getTo());
replyPacket.setTo(packet.getFrom());
replyPacket.setType(Type.RESULT);
new ProcessQueue(replyQ).start();
}
}
/**
* Calling this method will cause the next sendPacket call with an IQ packet to timeout.
* This is accomplished by simply stopping the auto creating of the reply packet
* or processing one that was entered via {@link #processPacket(Packet)}.
*/
public void setTimeout() {
timeout = true;
}
public void addMessage(Message msgToProcess) {
messageQ.add(msgToProcess);
}
public void addIQReply(IQ reply) {
replyQ.add(reply);
}
public void processMessages() {
if (!messageQ.isEmpty())
new ProcessQueue(messageQ).start();
else
System.out.println("No messages to process");
}
class ProcessQueue extends Thread {
private BlockingQueue<? extends Packet> processQ;
ProcessQueue(BlockingQueue<? extends Packet> queue) {
processQ = queue;
}
@Override
public void run() {
try {
processPacket(processQ.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}

View File

@ -0,0 +1,162 @@
package org.jivesoftware.smack.ping;
import static org.custommonkey.xmlunit.XMLAssert.assertXMLEqual;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.jivesoftware.smack.Connection;
import org.jivesoftware.smack.DummyConnection;
import org.jivesoftware.smack.PacketInterceptor;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.TestUtils;
import org.jivesoftware.smack.ThreadedDummyConnection;
import org.jivesoftware.smack.filter.IQTypeFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.ping.packet.Ping;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.junit.Test;
public class ServerPingTest {
private static String TO = "juliet@capulet.lit/balcony";
private static String ID = "s2c1";
private static Properties outputProperties = new Properties();
{
outputProperties.put(javax.xml.transform.OutputKeys.OMIT_XML_DECLARATION, "yes");
}
/*
* Stanza copied from spec
*/
@Test
public void validatePingStanzaXML() throws Exception {
// @formatter:off
String control = "<iq to='juliet@capulet.lit/balcony' id='s2c1' type='get'>"
+ "<ping xmlns='urn:xmpp:ping'/>" + "</iq>";
// @formatter:on
Ping ping = new Ping(TO);
ping.setPacketID(ID);
assertXMLEqual(control, ping.toXML());
}
@Test
public void checkProvider() throws Exception {
// @formatter:off
String control = "<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='s2c1' type='get'>"
+ "<ping xmlns='urn:xmpp:ping'/>" + "</iq>";
// @formatter:on
DummyConnection con = new DummyConnection();
IQ pingRequest = PacketParserUtils.parseIQ(TestUtils.getIQParser(control), con);
assertTrue(pingRequest instanceof Ping);
con.processPacket(pingRequest);
Packet pongPacket = con.getSentPacket();
assertTrue(pongPacket instanceof IQ);
IQ pong = (IQ) pongPacket;
assertEquals("juliet@capulet.lit/balcony", pong.getFrom());
assertEquals("capulet.lit", pong.getTo());
assertEquals("s2c1", pong.getPacketID());
assertEquals(IQ.Type.RESULT, pong.getType());
}
@Test
public void serverPingFailSingleConnection() throws Exception {
DummyConnection connection = getConnection();
CountDownLatch latch = new CountDownLatch(2);
addInterceptor(connection, latch);
addPingFailedListener(connection, latch);
// Time based testing kind of sucks, but this should be reliable on a DummyConnection since there
// is no actual server involved. This will provide enough time to ping and wait for the lack of response.
assertTrue(latch.await(getWaitTime(), TimeUnit.MILLISECONDS));
}
@Test
public void serverPingSuccessfulSingleConnection() throws Exception {
ThreadedDummyConnection connection = getThreadedConnection();
final CountDownLatch latch = new CountDownLatch(1);
connection.addPacketListener(new PacketListener() {
@Override
public void processPacket(Packet packet) {
latch.countDown();
}
}, new IQTypeFilter(IQ.Type.RESULT));
// Time based testing kind of sucks, but this should be reliable on a DummyConnection since there
// is no actual server involved. This will provide enough time to ping and wait for the lack of response.
assertTrue(latch.await(getWaitTime(), TimeUnit.MILLISECONDS));
}
@Test
public void serverPingFailMultipleConnection() throws Exception {
CountDownLatch latch = new CountDownLatch(6);
SmackConfiguration.setPacketReplyTimeout(15000);
DummyConnection con1 = getConnection();
addInterceptor(con1, latch);
addPingFailedListener(con1, latch);
DummyConnection con2 = getConnection();
addInterceptor(con2, latch);
addPingFailedListener(con2, latch);
DummyConnection con3 = getConnection();
addInterceptor(con3, latch);
addPingFailedListener(con2, latch);
assertTrue(latch.await(getWaitTime(), TimeUnit.MILLISECONDS));
}
private void addPingFailedListener(DummyConnection con, final CountDownLatch latch) {
ServerPingManager manager = ServerPingManager.getInstanceFor(con);
manager.addPingFailedListener(new PingFailedListener() {
@Override
public void pingFailed() {
latch.countDown();
}
});
}
private DummyConnection getConnection() {
DummyConnection con = new DummyConnection();
ServerPingManager mgr = ServerPingManager.getInstanceFor(con);
mgr.setPingInterval(ServerPingManager.PING_MINIMUM);
return con;
}
private ThreadedDummyConnection getThreadedConnection() {
ThreadedDummyConnection con = new ThreadedDummyConnection();
ServerPingManager mgr = ServerPingManager.getInstanceFor(con);
mgr.setPingInterval(ServerPingManager.PING_MINIMUM);
return con;
}
private void addInterceptor(final Connection con, final CountDownLatch latch) {
con.addPacketInterceptor(new PacketInterceptor() {
@Override
public void interceptPacket(Packet packet) {
con.removePacketInterceptor(this);
latch.countDown();
}
}, new PacketTypeFilter(Ping.class));
}
private long getWaitTime() {
return ServerPingManager.PING_MINIMUM + SmackConfiguration.getPacketReplyTimeout() + 3000;
}
}

View File

@ -15,24 +15,127 @@
*/
package org.jivesoftware.smackx.ping;
import static org.junit.Assert.assertEquals;
import org.jivesoftware.smackx.ping.packet.Ping;
import org.jivesoftware.smackx.ping.packet.Pong;
import org.jivesoftware.smack.DummyConnection;
import org.jivesoftware.smack.TestUtils;
import org.jivesoftware.smack.ThreadedDummyConnection;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.ping.packet.Ping;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smackx.packet.DiscoverInfo;
import org.junit.Test;
import static org.junit.Assert.*;
public class PingPongTest {
@Test
public void createPongfromPingTest() {
Ping ping = new Ping("from@sender.local/resourceFrom", "to@receiver.local/resourceTo");
public void checkSendingPing() throws Exception {
DummyConnection con = new DummyConnection();
PingManager pinger = new PingManager(con);
pinger.ping("test@myserver.com");
// create a pong from a ping
Pong pong = new Pong(ping);
assertEquals(pong.getFrom(), ping.getTo());
assertEquals(pong.getTo(), ping.getFrom());
assertEquals(pong.getPacketID(), ping.getPacketID());
Packet sentPacket = con.getSentPacket();
assertTrue(sentPacket instanceof Ping);
}
@Test
public void checkSuccessfulPing() throws Exception {
ThreadedDummyConnection con = new ThreadedDummyConnection();
PingManager pinger = new PingManager(con);
boolean pingSuccess = pinger.ping("test@myserver.com");
assertTrue(pingSuccess);
}
/**
* DummyConnection will not reply so it will timeout.
* @throws Exception
*/
@Test
public void checkFailedPingOnTimeout() throws Exception {
DummyConnection con = new DummyConnection();
PingManager pinger = new PingManager(con);
boolean pingSuccess = pinger.ping("test@myserver.com");
assertFalse(pingSuccess);
}
/**
* DummyConnection will not reply so it will timeout.
* @throws Exception
*/
@Test
public void checkFailedPingError() throws Exception {
ThreadedDummyConnection con = new ThreadedDummyConnection();
//@formatter:off
String reply =
"<iq type='error' id='qrzSp-16' to='test@myserver.com'>" +
"<ping xmlns='urn:xmpp:ping'/>" +
"<error type='cancel'>" +
"<service-unavailable xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>" +
"</error>" +
"</iq>";
//@formatter:on
IQ serviceUnavailable = PacketParserUtils.parseIQ(TestUtils.getIQParser(reply), con);
con.addIQReply(serviceUnavailable);
PingManager pinger = new PingManager(con);
boolean pingSuccess = pinger.ping("test@myserver.com");
assertFalse(pingSuccess);
}
@Test
public void checkSuccessfulDiscoRequest() throws Exception {
ThreadedDummyConnection con = new ThreadedDummyConnection();
DiscoverInfo info = new DiscoverInfo();
info.addFeature(Ping.NAMESPACE);
//@formatter:off
String reply =
"<iq type='result' id='qrzSp-16' to='test@myserver.com'>" +
"<query xmlns='http://jabber.org/protocol/disco#info'><identity category='client' type='pc' name='Pidgin'/>" +
"<feature var='urn:xmpp:ping'/>" +
"</query></iq>";
//@formatter:on
IQ discoReply = PacketParserUtils.parseIQ(TestUtils.getIQParser(reply), con);
con.addIQReply(discoReply);
PingManager pinger = new PingManager(con);
boolean pingSupported = pinger.isPingSupported("test@myserver.com");
assertTrue(pingSupported);
}
@Test
public void checkUnuccessfulDiscoRequest() throws Exception {
ThreadedDummyConnection con = new ThreadedDummyConnection();
DiscoverInfo info = new DiscoverInfo();
info.addFeature(Ping.NAMESPACE);
//@formatter:off
String reply =
"<iq type='result' id='qrzSp-16' to='test@myserver.com'>" +
"<query xmlns='http://jabber.org/protocol/disco#info'><identity category='client' type='pc' name='Pidgin'/>" +
"<feature var='urn:xmpp:noping'/>" +
"</query></iq>";
//@formatter:on
IQ discoReply = PacketParserUtils.parseIQ(TestUtils.getIQParser(reply), con);
con.addIQReply(discoReply);
PingManager pinger = new PingManager(con);
boolean pingSupported = pinger.isPingSupported("test@myserver.com");
assertFalse(pingSupported);
}
}

View File

@ -83,7 +83,9 @@ public class ConfigureFormTest
Node node = mgr.getNode("princely_musings");
SmackConfiguration.setPacketReplyTimeout(100);
SmackConfiguration.setPacketReplyTimeout(100);
con.setTimeout();
node.getNodeConfiguration();
}
}