From 1eb4841970008fca69bde71a6f464c7f5f157cfa Mon Sep 17 00:00:00 2001 From: Matt Tucker Date: Wed, 21 Feb 2007 00:57:31 +0000 Subject: [PATCH] New listener implementation for packet reader (SMACK-205). git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7232 b35dd754-fafc-0310-a699-88a17e54d16e --- .../org/jivesoftware/smack/PacketReader.java | 165 ++++++------------ .../org/jivesoftware/smack/PacketWriter.java | 17 +- source/org/jivesoftware/smack/Roster.java | 3 - .../jivesoftware/smack/XMPPConnection.java | 5 + 4 files changed, 60 insertions(+), 130 deletions(-) diff --git a/source/org/jivesoftware/smack/PacketReader.java b/source/org/jivesoftware/smack/PacketReader.java index 99a3f2718..09009d9ff 100644 --- a/source/org/jivesoftware/smack/PacketReader.java +++ b/source/org/jivesoftware/smack/PacketReader.java @@ -44,7 +44,7 @@ import java.util.concurrent.*; class PacketReader { private Thread readerThread; - private Thread listenerThread; + private ExecutorService listenerExecutor; private XMPPConnection connection; private XmlPullParser parser; @@ -58,7 +58,7 @@ class PacketReader { private String connectionID = null; private Semaphore connectionSemaphore; - protected PacketReader(XMPPConnection connection) { + protected PacketReader(final XMPPConnection connection) { this.connection = connection; this.init(); } @@ -79,18 +79,17 @@ class PacketReader { readerThread.setName("Smack Packet Reader (" + connection.connectionCounterValue + ")"); readerThread.setDaemon(true); - listenerThread = new Thread() { - public void run() { - try { - processListeners(this); - } - catch (Exception e) { - e.printStackTrace(); - } + // Create an executor to deliver incoming packets to listeners. We'll use a single + // thread with an unbounded queue. + listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, + "Smack Listener Processor (" + connection.connectionCounterValue + ")"); + thread.setDaemon(true); + return thread; } - }; - listenerThread.setName("Smack Listener Processor (" + connection.connectionCounterValue + ")"); - listenerThread.setDaemon(true); + }); resetParser(); } @@ -121,7 +120,7 @@ class PacketReader { * @param packetFilter the packet filter to use. */ public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) { - ListenerWrapper wrapper = new ListenerWrapper(this, packetListener, packetFilter); + ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); listeners.put(packetListener, wrapper); } @@ -131,10 +130,7 @@ class PacketReader { * @param packetListener the packet listener to remove. */ public void removePacketListener(PacketListener packetListener) { - ListenerWrapper wrapper = listeners.remove(packetListener); - if (wrapper != null) { - wrapper.cancel(); - } + listeners.remove(packetListener); } /** @@ -149,7 +145,6 @@ class PacketReader { connectionSemaphore = new Semaphore(1); readerThread.start(); - listenerThread.start(); // Wait for stream tag before returing. We'll wait a couple of seconds before // giving up and throwing an error. try { @@ -192,10 +187,8 @@ class PacketReader { } done = true; - // Make sure that the listenerThread is awake to shutdown properly - synchronized (listenerThread) { - listenerThread.notifyAll(); - } + // Shut down the listener executor. + listenerExecutor.shutdown(); } /** @@ -230,11 +223,6 @@ class PacketReader { e2.printStackTrace(); } } - - // Make sure that the listenerThread is awake to shutdown properly - synchronized (listenerThread) { - listenerThread.notifyAll(); - } } /** @@ -242,25 +230,15 @@ class PacketReader { */ protected void notifyReconnection() { // Notify connection listeners of the reconnection. - List listenersCopy; - synchronized (connectionListeners) { - // Make a copy since it's possible that a listener will be removed from the list - listenersCopy = new ArrayList(connectionListeners); - for (ConnectionListener listener : listenersCopy) { - try { - listener.reconnectionSuccessful(); - } - catch (Exception e) { - // Cath and print any exception so we can recover - // from a faulty listener - e.printStackTrace(); - } + for (ConnectionListener listener : connectionListeners) { + try { + listener.reconnectionSuccessful(); + } + catch (Exception e) { + // Cath and print any exception so we can recover + // from a faulty listener + e.printStackTrace(); } - } - - // Make sure that the listenerThread is awake to shutdown properly - synchronized (listenerThread) { - listenerThread.notifyAll(); } } @@ -280,31 +258,6 @@ class PacketReader { } } - /** - * Process listeners. - * - * @param thread the thread that is being used by the reader to process incoming packets. - */ - private void processListeners(Thread thread) { - while (!done && thread == listenerThread) { - boolean processedPacket = false; - for (ListenerWrapper wrapper: listeners.values()) { - processedPacket = processedPacket || wrapper.notifyListener(); - } - if (!processedPacket) { - try { - // Wait until more packets are ready to be processed. - synchronized (listenerThread) { - listenerThread.wait(); - } - } - catch (InterruptedException ie) { - // Ignore. - } - } - } - } - /** * Parse top-level packets in order to process them further. * @@ -454,10 +407,8 @@ class PacketReader { collector.processPacket(packet); } - // Notify the listener thread that packets are waiting. - synchronized (listenerThread) { - listenerThread.notifyAll(); - } + // Deliver the incoming packet to listeners. + listenerExecutor.submit(new ListenerNotification(packet)); } private StreamError parseStreamError(XmlPullParser parser) throws IOException, @@ -788,7 +739,7 @@ class PacketReader { else { registration.setInstructions(value); } -} + } // Otherwise, it must be a packet extension. else { registration.addExtension( @@ -809,7 +760,8 @@ class PacketReader { } private Bind parseResourceBinding(XmlPullParser parser) throws IOException, - XmlPullParserException { + XmlPullParserException + { Bind bind = new Bind(); boolean done = false; while (!done) { @@ -832,49 +784,40 @@ class PacketReader { } /** - * A wrapper class to associate a packet collector with a listener. + * A runnable to notify all listeners of a packet. + */ + private class ListenerNotification implements Runnable { + + private Packet packet; + + public ListenerNotification(Packet packet) { + this.packet = packet; + } + + public void run() { + for (ListenerWrapper listenerWrapper : listeners.values()) { + listenerWrapper.notifyListener(packet); + } + } + } + + /** + * A wrapper class to associate a packet filter with a listener. */ private static class ListenerWrapper { private PacketListener packetListener; - private PacketCollector packetCollector; + private PacketFilter packetFilter; - public ListenerWrapper(PacketReader packetReader, PacketListener packetListener, - PacketFilter packetFilter) - { + public ListenerWrapper(PacketListener packetListener, PacketFilter packetFilter) { this.packetListener = packetListener; - this.packetCollector = packetReader.createPacketCollector(packetFilter); + this.packetFilter = packetFilter; } - - public boolean equals(Object object) { - if (object == null) { - return false; - } - if (object instanceof ListenerWrapper) { - return ((ListenerWrapper)object).packetListener.equals(this.packetListener); - } - // If the packet listener is equal to the wrapped packet listener, return true. - else if (object instanceof PacketListener) { - return object.equals(this.packetListener); - } - return false; - } - - public boolean notifyListener() { - Packet packet = packetCollector.pollResult(); - if (packet != null) { + + public void notifyListener(Packet packet) { + if (packetFilter == null || packetFilter.accept(packet)) { packetListener.processPacket(packet); - return true; } - else { - return false; - } - } - - public void cancel() { - packetCollector.cancel(); - packetCollector = null; - packetListener = null; } } } \ No newline at end of file diff --git a/source/org/jivesoftware/smack/PacketWriter.java b/source/org/jivesoftware/smack/PacketWriter.java index 8ff3a19c0..a52ccee1b 100644 --- a/source/org/jivesoftware/smack/PacketWriter.java +++ b/source/org/jivesoftware/smack/PacketWriter.java @@ -354,26 +354,11 @@ class PacketWriter { private PacketListener packetListener; private PacketFilter packetFilter; - public ListenerWrapper(PacketListener packetListener, - PacketFilter packetFilter) - { + public ListenerWrapper(PacketListener packetListener, PacketFilter packetFilter) { this.packetListener = packetListener; this.packetFilter = packetFilter; } - public boolean equals(Object object) { - if (object == null) { - return false; - } - if (object instanceof ListenerWrapper) { - return ((ListenerWrapper)object).packetListener.equals(this.packetListener); - } - else if (object instanceof PacketListener) { - return object.equals(this.packetListener); - } - return false; - } - public void notifyListener(Packet packet) { if (packetFilter == null || packetFilter.accept(packet)) { packetListener.processPacket(packet); diff --git a/source/org/jivesoftware/smack/Roster.java b/source/org/jivesoftware/smack/Roster.java index c7424a9ec..27597a566 100644 --- a/source/org/jivesoftware/smack/Roster.java +++ b/source/org/jivesoftware/smack/Roster.java @@ -292,9 +292,6 @@ public class Roster { else if (response.getType() == IQ.Type.ERROR) { throw new XMPPException(response.getError()); } - else { - - } } /** diff --git a/source/org/jivesoftware/smack/XMPPConnection.java b/source/org/jivesoftware/smack/XMPPConnection.java index 829823153..45d2de282 100644 --- a/source/org/jivesoftware/smack/XMPPConnection.java +++ b/source/org/jivesoftware/smack/XMPPConnection.java @@ -631,6 +631,11 @@ public class XMPPConnection { * @param unavailablePresence the presence packet to send during shutdown. */ public void disconnect(Presence unavailablePresence) { + // If not connected, ignore this request. + if (packetReader == null || packetWriter == null) { + return; + } + shutdown(unavailablePresence); if (roster != null) {