1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2024-11-23 04:22:05 +01:00

New listener implementation for packet reader (SMACK-205).

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7232 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
Matt Tucker 2007-02-21 00:57:31 +00:00 committed by matt
parent 9356de64bd
commit 1eb4841970
4 changed files with 60 additions and 130 deletions

View file

@ -44,7 +44,7 @@ import java.util.concurrent.*;
class PacketReader { class PacketReader {
private Thread readerThread; private Thread readerThread;
private Thread listenerThread; private ExecutorService listenerExecutor;
private XMPPConnection connection; private XMPPConnection connection;
private XmlPullParser parser; private XmlPullParser parser;
@ -58,7 +58,7 @@ class PacketReader {
private String connectionID = null; private String connectionID = null;
private Semaphore connectionSemaphore; private Semaphore connectionSemaphore;
protected PacketReader(XMPPConnection connection) { protected PacketReader(final XMPPConnection connection) {
this.connection = connection; this.connection = connection;
this.init(); this.init();
} }
@ -79,18 +79,17 @@ class PacketReader {
readerThread.setName("Smack Packet Reader (" + connection.connectionCounterValue + ")"); readerThread.setName("Smack Packet Reader (" + connection.connectionCounterValue + ")");
readerThread.setDaemon(true); readerThread.setDaemon(true);
listenerThread = new Thread() { // Create an executor to deliver incoming packets to listeners. We'll use a single
public void run() { // thread with an unbounded queue.
try { listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
processListeners(this);
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable,
"Smack Listener Processor (" + connection.connectionCounterValue + ")");
thread.setDaemon(true);
return thread;
} }
catch (Exception e) { });
e.printStackTrace();
}
}
};
listenerThread.setName("Smack Listener Processor (" + connection.connectionCounterValue + ")");
listenerThread.setDaemon(true);
resetParser(); resetParser();
} }
@ -121,7 +120,7 @@ class PacketReader {
* @param packetFilter the packet filter to use. * @param packetFilter the packet filter to use.
*/ */
public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) { public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
ListenerWrapper wrapper = new ListenerWrapper(this, packetListener, packetFilter); ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
listeners.put(packetListener, wrapper); listeners.put(packetListener, wrapper);
} }
@ -131,10 +130,7 @@ class PacketReader {
* @param packetListener the packet listener to remove. * @param packetListener the packet listener to remove.
*/ */
public void removePacketListener(PacketListener packetListener) { public void removePacketListener(PacketListener packetListener) {
ListenerWrapper wrapper = listeners.remove(packetListener); listeners.remove(packetListener);
if (wrapper != null) {
wrapper.cancel();
}
} }
/** /**
@ -149,7 +145,6 @@ class PacketReader {
connectionSemaphore = new Semaphore(1); connectionSemaphore = new Semaphore(1);
readerThread.start(); readerThread.start();
listenerThread.start();
// Wait for stream tag before returing. We'll wait a couple of seconds before // Wait for stream tag before returing. We'll wait a couple of seconds before
// giving up and throwing an error. // giving up and throwing an error.
try { try {
@ -192,10 +187,8 @@ class PacketReader {
} }
done = true; done = true;
// Make sure that the listenerThread is awake to shutdown properly // Shut down the listener executor.
synchronized (listenerThread) { listenerExecutor.shutdown();
listenerThread.notifyAll();
}
} }
/** /**
@ -230,11 +223,6 @@ class PacketReader {
e2.printStackTrace(); e2.printStackTrace();
} }
} }
// Make sure that the listenerThread is awake to shutdown properly
synchronized (listenerThread) {
listenerThread.notifyAll();
}
} }
/** /**
@ -242,11 +230,7 @@ class PacketReader {
*/ */
protected void notifyReconnection() { protected void notifyReconnection() {
// Notify connection listeners of the reconnection. // Notify connection listeners of the reconnection.
List<ConnectionListener> listenersCopy; for (ConnectionListener listener : connectionListeners) {
synchronized (connectionListeners) {
// Make a copy since it's possible that a listener will be removed from the list
listenersCopy = new ArrayList<ConnectionListener>(connectionListeners);
for (ConnectionListener listener : listenersCopy) {
try { try {
listener.reconnectionSuccessful(); listener.reconnectionSuccessful();
} }
@ -258,12 +242,6 @@ class PacketReader {
} }
} }
// Make sure that the listenerThread is awake to shutdown properly
synchronized (listenerThread) {
listenerThread.notifyAll();
}
}
/** /**
* Resets the parser using the latest connection's reader. Reseting the parser is necessary * Resets the parser using the latest connection's reader. Reseting the parser is necessary
* when the plain connection has been secured or when a new opening stream element is going * when the plain connection has been secured or when a new opening stream element is going
@ -280,31 +258,6 @@ class PacketReader {
} }
} }
/**
* Process listeners.
*
* @param thread the thread that is being used by the reader to process incoming packets.
*/
private void processListeners(Thread thread) {
while (!done && thread == listenerThread) {
boolean processedPacket = false;
for (ListenerWrapper wrapper: listeners.values()) {
processedPacket = processedPacket || wrapper.notifyListener();
}
if (!processedPacket) {
try {
// Wait until more packets are ready to be processed.
synchronized (listenerThread) {
listenerThread.wait();
}
}
catch (InterruptedException ie) {
// Ignore.
}
}
}
}
/** /**
* Parse top-level packets in order to process them further. * Parse top-level packets in order to process them further.
* *
@ -454,10 +407,8 @@ class PacketReader {
collector.processPacket(packet); collector.processPacket(packet);
} }
// Notify the listener thread that packets are waiting. // Deliver the incoming packet to listeners.
synchronized (listenerThread) { listenerExecutor.submit(new ListenerNotification(packet));
listenerThread.notifyAll();
}
} }
private StreamError parseStreamError(XmlPullParser parser) throws IOException, private StreamError parseStreamError(XmlPullParser parser) throws IOException,
@ -809,7 +760,8 @@ class PacketReader {
} }
private Bind parseResourceBinding(XmlPullParser parser) throws IOException, private Bind parseResourceBinding(XmlPullParser parser) throws IOException,
XmlPullParserException { XmlPullParserException
{
Bind bind = new Bind(); Bind bind = new Bind();
boolean done = false; boolean done = false;
while (!done) { while (!done) {
@ -832,49 +784,40 @@ class PacketReader {
} }
/** /**
* A wrapper class to associate a packet collector with a listener. * A runnable to notify all listeners of a packet.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : listeners.values()) {
listenerWrapper.notifyListener(packet);
}
}
}
/**
* A wrapper class to associate a packet filter with a listener.
*/ */
private static class ListenerWrapper { private static class ListenerWrapper {
private PacketListener packetListener; private PacketListener packetListener;
private PacketCollector packetCollector; private PacketFilter packetFilter;
public ListenerWrapper(PacketReader packetReader, PacketListener packetListener, public ListenerWrapper(PacketListener packetListener, PacketFilter packetFilter) {
PacketFilter packetFilter)
{
this.packetListener = packetListener; this.packetListener = packetListener;
this.packetCollector = packetReader.createPacketCollector(packetFilter); this.packetFilter = packetFilter;
} }
public boolean equals(Object object) { public void notifyListener(Packet packet) {
if (object == null) { if (packetFilter == null || packetFilter.accept(packet)) {
return false;
}
if (object instanceof ListenerWrapper) {
return ((ListenerWrapper)object).packetListener.equals(this.packetListener);
}
// If the packet listener is equal to the wrapped packet listener, return true.
else if (object instanceof PacketListener) {
return object.equals(this.packetListener);
}
return false;
}
public boolean notifyListener() {
Packet packet = packetCollector.pollResult();
if (packet != null) {
packetListener.processPacket(packet); packetListener.processPacket(packet);
return true; }
}
else {
return false;
}
}
public void cancel() {
packetCollector.cancel();
packetCollector = null;
packetListener = null;
} }
} }
} }

View file

@ -354,26 +354,11 @@ class PacketWriter {
private PacketListener packetListener; private PacketListener packetListener;
private PacketFilter packetFilter; private PacketFilter packetFilter;
public ListenerWrapper(PacketListener packetListener, public ListenerWrapper(PacketListener packetListener, PacketFilter packetFilter) {
PacketFilter packetFilter)
{
this.packetListener = packetListener; this.packetListener = packetListener;
this.packetFilter = packetFilter; this.packetFilter = packetFilter;
} }
public boolean equals(Object object) {
if (object == null) {
return false;
}
if (object instanceof ListenerWrapper) {
return ((ListenerWrapper)object).packetListener.equals(this.packetListener);
}
else if (object instanceof PacketListener) {
return object.equals(this.packetListener);
}
return false;
}
public void notifyListener(Packet packet) { public void notifyListener(Packet packet) {
if (packetFilter == null || packetFilter.accept(packet)) { if (packetFilter == null || packetFilter.accept(packet)) {
packetListener.processPacket(packet); packetListener.processPacket(packet);

View file

@ -292,9 +292,6 @@ public class Roster {
else if (response.getType() == IQ.Type.ERROR) { else if (response.getType() == IQ.Type.ERROR) {
throw new XMPPException(response.getError()); throw new XMPPException(response.getError());
} }
else {
}
} }
/** /**

View file

@ -631,6 +631,11 @@ public class XMPPConnection {
* @param unavailablePresence the presence packet to send during shutdown. * @param unavailablePresence the presence packet to send during shutdown.
*/ */
public void disconnect(Presence unavailablePresence) { public void disconnect(Presence unavailablePresence) {
// If not connected, ignore this request.
if (packetReader == null || packetWriter == null) {
return;
}
shutdown(unavailablePresence); shutdown(unavailablePresence);
if (roster != null) { if (roster != null) {