Move processPacket() method to XMPPConnection

This removes code duplication in BOSHConnection and TCPConnection's
PacketReader, by moving the common code to the superclass
XMPPConnection.
This commit is contained in:
Florian Schmaus 2014-03-14 00:25:22 +01:00
parent 89ed4d7492
commit 4b6f09f962
3 changed files with 71 additions and 134 deletions

View File

@ -21,14 +21,10 @@ import java.io.IOException;
import java.io.PipedReader; import java.io.PipedReader;
import java.io.PipedWriter; import java.io.PipedWriter;
import java.io.Writer; import java.io.Writer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.ConnectionListener; import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.Roster; import org.jivesoftware.smack.Roster;
import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.Packet;
@ -84,11 +80,6 @@ public class BOSHConnection extends XMPPConnection {
private boolean wasAuthenticated = false; private boolean wasAuthenticated = false;
private boolean done = false; private boolean done = false;
/**
* The Thread environment for sending packet listeners.
*/
private ExecutorService listenerExecutor;
// The readerPipe and consumer thread are used for the debugger. // The readerPipe and consumer thread are used for the debugger.
private PipedWriter readerPipe; private PipedWriter readerPipe;
private Thread readerConsumer; private Thread readerConsumer;
@ -166,18 +157,6 @@ public class BOSHConnection extends XMPPConnection {
} }
client = BOSHClient.create(cfgBuilder.build()); client = BOSHClient.create(cfgBuilder.build());
// Create an executor to deliver incoming packets to listeners.
// We'll use a single thread with an unbounded queue.
listenerExecutor = Executors
.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable,
"Smack Listener Processor ("
+ connectionCounterValue + ")");
thread.setDaemon(true);
return thread;
}
});
client.addBOSHClientConnListener(new BOSHConnectionListener(this)); client.addBOSHClientConnListener(new BOSHConnectionListener(this));
client.addBOSHClientResponseListener(new BOSHPacketReader(this)); client.addBOSHClientResponseListener(new BOSHPacketReader(this));
@ -517,10 +496,6 @@ public class BOSHConnection extends XMPPConnection {
writer = null; writer = null;
} }
// Shut down the listener executor.
if (listenerExecutor != null) {
listenerExecutor.shutdown();
}
readerConsumer = null; readerConsumer = null;
} }
@ -554,27 +529,6 @@ public class BOSHConnection extends XMPPConnection {
client.send(body); client.send(body);
} }
/**
* Processes a packet after it's been fully parsed by looping through the
* installed packet collectors and listeners and letting them examine the
* packet to see if they are a match with the filter.
*
* @param packet the packet to process.
*/
protected void processPacket(Packet packet) {
if (packet == null) {
return;
}
// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector : getPacketCollectors()) {
collector.processPacket(packet);
}
// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}
/** /**
* Initialize the SmackDebugger which allows to log and debug XML traffic. * Initialize the SmackDebugger which allows to log and debug XML traffic.
*/ */
@ -738,22 +692,4 @@ public class BOSHConnection extends XMPPConnection {
} }
} }
} }
/**
* This class notifies all listeners that a packet was received.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : recvListeners.values()) {
listenerWrapper.notifyListener(packet);
}
}
}
} }

View File

@ -27,11 +27,15 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.jivesoftware.smack.compression.XMPPInputOutputStream; import org.jivesoftware.smack.compression.XMPPInputOutputStream;
@ -216,6 +220,19 @@ public abstract class XMPPConnection {
private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2); private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2);
/**
* Create an executor to deliver incoming packets to listeners. We'll use a single thread with an unbounded queue.
*/
private ExecutorService listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable,
"Smack Listener Processor (" + connectionCounterValue + ")");
thread.setDaemon(true);
return thread;
}
});
/** /**
* Create a new XMPPConnection to a XMPP server. * Create a new XMPPConnection to a XMPP server.
* *
@ -870,6 +887,49 @@ public abstract class XMPPConnection {
return packetReplyTimeout; return packetReplyTimeout;
} }
/**
* Processes a packet after it's been fully parsed by looping through the installed
* packet collectors and listeners and letting them examine the packet to see if
* they are a match with the filter.
*
* @param packet the packet to process.
*/
protected void processPacket(Packet packet) {
if (packet == null) {
return;
}
// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector: getPacketCollectors()) {
collector.processPacket(packet);
}
// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}
/**
* A runnable to notify all listeners of a packet.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : recvListeners.values()) {
try {
listenerWrapper.notifyListener(packet);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
}
}
/** /**
* A wrapper class to associate a packet filter with a listener. * A wrapper class to associate a packet filter with a listener.
*/ */

View File

@ -17,20 +17,20 @@
package org.jivesoftware.smack; package org.jivesoftware.smack;
import org.jivesoftware.smack.XMPPConnection.ListenerWrapper; import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.*; import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.parsing.ParsingExceptionCallback; import org.jivesoftware.smack.parsing.ParsingExceptionCallback;
import org.jivesoftware.smack.parsing.UnparsablePacket; import org.jivesoftware.smack.parsing.UnparsablePacket;
import org.jivesoftware.smack.sasl.SASLMechanism.Challenge; import org.jivesoftware.smack.sasl.SASLMechanism.Challenge;
import org.jivesoftware.smack.sasl.SASLMechanism.Failure; import org.jivesoftware.smack.sasl.SASLMechanism.Failure;
import org.jivesoftware.smack.sasl.SASLMechanism.Success; import org.jivesoftware.smack.sasl.SASLMechanism.Success;
import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.PacketParserUtils;
import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParserFactory;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import java.util.concurrent.*;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -47,7 +47,6 @@ class PacketReader {
private static final Logger LOGGER = Logger.getLogger(PacketReader.class.getName()); private static final Logger LOGGER = Logger.getLogger(PacketReader.class.getName());
private Thread readerThread; private Thread readerThread;
private ExecutorService listenerExecutor;
private TCPConnection connection; private TCPConnection connection;
private XmlPullParser parser; private XmlPullParser parser;
@ -76,18 +75,6 @@ class PacketReader {
readerThread.setName("Smack Packet Reader (" + connection.connectionCounterValue + ")"); readerThread.setName("Smack Packet Reader (" + connection.connectionCounterValue + ")");
readerThread.setDaemon(true); readerThread.setDaemon(true);
// Create an executor to deliver incoming packets to listeners. We'll use a single
// thread with an unbounded queue.
listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable,
"Smack Listener Processor (" + connection.connectionCounterValue + ")");
thread.setDaemon(true);
return thread;
}
});
resetParser(); resetParser();
} }
@ -140,9 +127,6 @@ class PacketReader {
} }
} }
done = true; done = true;
// Shut down the listener executor.
listenerExecutor.shutdown();
} }
/** /**
@ -185,7 +169,7 @@ class PacketReader {
} }
continue; continue;
} }
processPacket(packet); connection.processPacket(packet);
} }
else if (parser.getName().equals("iq")) { else if (parser.getName().equals("iq")) {
IQ iq; IQ iq;
@ -199,7 +183,7 @@ class PacketReader {
} }
continue; continue;
} }
processPacket(iq); connection.processPacket(iq);
} }
else if (parser.getName().equals("presence")) { else if (parser.getName().equals("presence")) {
Presence presence; Presence presence;
@ -213,7 +197,7 @@ class PacketReader {
} }
continue; continue;
} }
processPacket(presence); connection.processPacket(presence);
} }
// We found an opening stream. Record information about it, then notify // We found an opening stream. Record information about it, then notify
// the connectionID lock so that the packet reader startup can finish. // the connectionID lock so that the packet reader startup can finish.
@ -269,18 +253,18 @@ class PacketReader {
// SASL authentication has failed. The server may close the connection // SASL authentication has failed. The server may close the connection
// depending on the number of retries // depending on the number of retries
final Failure failure = PacketParserUtils.parseSASLFailure(parser); final Failure failure = PacketParserUtils.parseSASLFailure(parser);
processPacket(failure); connection.processPacket(failure);
connection.getSASLAuthentication().authenticationFailed(failure.getCondition()); connection.getSASLAuthentication().authenticationFailed(failure.getCondition());
} }
} }
else if (parser.getName().equals("challenge")) { else if (parser.getName().equals("challenge")) {
// The server is challenging the SASL authentication made by the client // The server is challenging the SASL authentication made by the client
String challengeData = parser.nextText(); String challengeData = parser.nextText();
processPacket(new Challenge(challengeData)); connection.processPacket(new Challenge(challengeData));
connection.getSASLAuthentication().challengeReceived(challengeData); connection.getSASLAuthentication().challengeReceived(challengeData);
} }
else if (parser.getName().equals("success")) { else if (parser.getName().equals("success")) {
processPacket(new Success(parser.nextText())); connection.processPacket(new Success(parser.nextText()));
// We now need to bind a resource for the connection // We now need to bind a resource for the connection
// Open a new stream and wait for the response // Open a new stream and wait for the response
connection.packetWriter.openStream(); connection.packetWriter.openStream();
@ -333,27 +317,6 @@ class PacketReader {
notify(); notify();
} }
/**
* Processes a packet after it's been fully parsed by looping through the installed
* packet collectors and listeners and letting them examine the packet to see if
* they are a match with the filter.
*
* @param packet the packet to process.
*/
private void processPacket(Packet packet) {
if (packet == null) {
return;
}
// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector: connection.getPacketCollectors()) {
collector.processPacket(packet);
}
// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}
private void parseFeatures(XmlPullParser parser) throws Exception { private void parseFeatures(XmlPullParser parser) throws Exception {
boolean startTLSReceived = false; boolean startTLSReceived = false;
boolean startTLSRequired = false; boolean startTLSRequired = false;
@ -432,7 +395,7 @@ class PacketReader {
new XMPPError(XMPPError.Condition.forbidden)); new XMPPError(XMPPError.Condition.forbidden));
} }
} }
// Release the lock after TLS has been negotiated or we are not insterested in TLS // Release the lock after TLS has been negotiated or we are not insterested in TLS
if (!startTLSReceived || connection.getConfiguration().getSecurityMode() == if (!startTLSReceived || connection.getConfiguration().getSecurityMode() ==
ConnectionConfiguration.SecurityMode.disabled) ConnectionConfiguration.SecurityMode.disabled)
@ -440,26 +403,4 @@ class PacketReader {
releaseConnectionIDLock(); releaseConnectionIDLock();
} }
} }
/**
* A runnable to notify all listeners of a packet.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
try {
listenerWrapper.notifyListener(packet);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
}
}
} }