diff --git a/bosh/src/main/java/org/jivesoftware/smack/BOSHConnection.java b/bosh/src/main/java/org/jivesoftware/smack/BOSHConnection.java index 10eaf81cc..5a0b8d3bc 100644 --- a/bosh/src/main/java/org/jivesoftware/smack/BOSHConnection.java +++ b/bosh/src/main/java/org/jivesoftware/smack/BOSHConnection.java @@ -21,14 +21,10 @@ import java.io.IOException; import java.io.PipedReader; import java.io.PipedWriter; import java.io.Writer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.ConnectionListener; -import org.jivesoftware.smack.PacketCollector; import org.jivesoftware.smack.Roster; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.packet.Packet; @@ -84,11 +80,6 @@ public class BOSHConnection extends XMPPConnection { private boolean wasAuthenticated = false; private boolean done = false; - /** - * The Thread environment for sending packet listeners. - */ - private ExecutorService listenerExecutor; - // The readerPipe and consumer thread are used for the debugger. private PipedWriter readerPipe; private Thread readerConsumer; @@ -166,18 +157,6 @@ public class BOSHConnection extends XMPPConnection { } client = BOSHClient.create(cfgBuilder.build()); - // Create an executor to deliver incoming packets to listeners. - // We'll use a single thread with an unbounded queue. - listenerExecutor = Executors - .newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, - "Smack Listener Processor (" - + connectionCounterValue + ")"); - thread.setDaemon(true); - return thread; - } - }); client.addBOSHClientConnListener(new BOSHConnectionListener(this)); client.addBOSHClientResponseListener(new BOSHPacketReader(this)); @@ -517,10 +496,6 @@ public class BOSHConnection extends XMPPConnection { writer = null; } - // Shut down the listener executor. - if (listenerExecutor != null) { - listenerExecutor.shutdown(); - } readerConsumer = null; } @@ -554,27 +529,6 @@ public class BOSHConnection extends XMPPConnection { client.send(body); } - /** - * Processes a packet after it's been fully parsed by looping through the - * installed packet collectors and listeners and letting them examine the - * packet to see if they are a match with the filter. - * - * @param packet the packet to process. - */ - protected void processPacket(Packet packet) { - if (packet == null) { - return; - } - - // Loop through all collectors and notify the appropriate ones. - for (PacketCollector collector : getPacketCollectors()) { - collector.processPacket(packet); - } - - // Deliver the incoming packet to listeners. - listenerExecutor.submit(new ListenerNotification(packet)); - } - /** * Initialize the SmackDebugger which allows to log and debug XML traffic. */ @@ -738,22 +692,4 @@ public class BOSHConnection extends XMPPConnection { } } } - - /** - * This class notifies all listeners that a packet was received. - */ - private class ListenerNotification implements Runnable { - - private Packet packet; - - public ListenerNotification(Packet packet) { - this.packet = packet; - } - - public void run() { - for (ListenerWrapper listenerWrapper : recvListeners.values()) { - listenerWrapper.notifyListener(packet); - } - } - } } diff --git a/core/src/main/java/org/jivesoftware/smack/XMPPConnection.java b/core/src/main/java/org/jivesoftware/smack/XMPPConnection.java index 65aacd852..eb8190a0e 100644 --- a/core/src/main/java/org/jivesoftware/smack/XMPPConnection.java +++ b/core/src/main/java/org/jivesoftware/smack/XMPPConnection.java @@ -27,11 +27,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; import java.util.logging.Logger; import org.jivesoftware.smack.compression.XMPPInputOutputStream; @@ -216,6 +220,19 @@ public abstract class XMPPConnection { private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2); + /** + * Create an executor to deliver incoming packets to listeners. We'll use a single thread with an unbounded queue. + */ + private ExecutorService listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, + "Smack Listener Processor (" + connectionCounterValue + ")"); + thread.setDaemon(true); + return thread; + } + }); + /** * Create a new XMPPConnection to a XMPP server. * @@ -870,6 +887,49 @@ public abstract class XMPPConnection { return packetReplyTimeout; } + /** + * Processes a packet after it's been fully parsed by looping through the installed + * packet collectors and listeners and letting them examine the packet to see if + * they are a match with the filter. + * + * @param packet the packet to process. + */ + protected void processPacket(Packet packet) { + if (packet == null) { + return; + } + + // Loop through all collectors and notify the appropriate ones. + for (PacketCollector collector: getPacketCollectors()) { + collector.processPacket(packet); + } + + // Deliver the incoming packet to listeners. + listenerExecutor.submit(new ListenerNotification(packet)); + } + + /** + * A runnable to notify all listeners of a packet. + */ + private class ListenerNotification implements Runnable { + + private Packet packet; + + public ListenerNotification(Packet packet) { + this.packet = packet; + } + + public void run() { + for (ListenerWrapper listenerWrapper : recvListeners.values()) { + try { + listenerWrapper.notifyListener(packet); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Exception in packet listener", e); + } + } + } + } + /** * A wrapper class to associate a packet filter with a listener. */ diff --git a/tcp/src/main/java/org/jivesoftware/smack/PacketReader.java b/tcp/src/main/java/org/jivesoftware/smack/PacketReader.java index cd245f963..76159ee06 100644 --- a/tcp/src/main/java/org/jivesoftware/smack/PacketReader.java +++ b/tcp/src/main/java/org/jivesoftware/smack/PacketReader.java @@ -17,20 +17,20 @@ package org.jivesoftware.smack; -import org.jivesoftware.smack.XMPPConnection.ListenerWrapper; -import org.jivesoftware.smack.packet.*; +import org.jivesoftware.smack.packet.IQ; +import org.jivesoftware.smack.packet.Packet; +import org.jivesoftware.smack.packet.Presence; +import org.jivesoftware.smack.packet.XMPPError; import org.jivesoftware.smack.parsing.ParsingExceptionCallback; import org.jivesoftware.smack.parsing.UnparsablePacket; import org.jivesoftware.smack.sasl.SASLMechanism.Challenge; import org.jivesoftware.smack.sasl.SASLMechanism.Failure; import org.jivesoftware.smack.sasl.SASLMechanism.Success; import org.jivesoftware.smack.util.PacketParserUtils; - import org.xmlpull.v1.XmlPullParserFactory; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; -import java.util.concurrent.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,7 +47,6 @@ class PacketReader { private static final Logger LOGGER = Logger.getLogger(PacketReader.class.getName()); private Thread readerThread; - private ExecutorService listenerExecutor; private TCPConnection connection; private XmlPullParser parser; @@ -76,18 +75,6 @@ class PacketReader { readerThread.setName("Smack Packet Reader (" + connection.connectionCounterValue + ")"); readerThread.setDaemon(true); - // Create an executor to deliver incoming packets to listeners. We'll use a single - // thread with an unbounded queue. - listenerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { - - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, - "Smack Listener Processor (" + connection.connectionCounterValue + ")"); - thread.setDaemon(true); - return thread; - } - }); - resetParser(); } @@ -140,9 +127,6 @@ class PacketReader { } } done = true; - - // Shut down the listener executor. - listenerExecutor.shutdown(); } /** @@ -185,7 +169,7 @@ class PacketReader { } continue; } - processPacket(packet); + connection.processPacket(packet); } else if (parser.getName().equals("iq")) { IQ iq; @@ -199,7 +183,7 @@ class PacketReader { } continue; } - processPacket(iq); + connection.processPacket(iq); } else if (parser.getName().equals("presence")) { Presence presence; @@ -213,7 +197,7 @@ class PacketReader { } continue; } - processPacket(presence); + connection.processPacket(presence); } // We found an opening stream. Record information about it, then notify // the connectionID lock so that the packet reader startup can finish. @@ -269,18 +253,18 @@ class PacketReader { // SASL authentication has failed. The server may close the connection // depending on the number of retries final Failure failure = PacketParserUtils.parseSASLFailure(parser); - processPacket(failure); + connection.processPacket(failure); connection.getSASLAuthentication().authenticationFailed(failure.getCondition()); } } else if (parser.getName().equals("challenge")) { // The server is challenging the SASL authentication made by the client String challengeData = parser.nextText(); - processPacket(new Challenge(challengeData)); + connection.processPacket(new Challenge(challengeData)); connection.getSASLAuthentication().challengeReceived(challengeData); } else if (parser.getName().equals("success")) { - processPacket(new Success(parser.nextText())); + connection.processPacket(new Success(parser.nextText())); // We now need to bind a resource for the connection // Open a new stream and wait for the response connection.packetWriter.openStream(); @@ -333,27 +317,6 @@ class PacketReader { notify(); } - /** - * Processes a packet after it's been fully parsed by looping through the installed - * packet collectors and listeners and letting them examine the packet to see if - * they are a match with the filter. - * - * @param packet the packet to process. - */ - private void processPacket(Packet packet) { - if (packet == null) { - return; - } - - // Loop through all collectors and notify the appropriate ones. - for (PacketCollector collector: connection.getPacketCollectors()) { - collector.processPacket(packet); - } - - // Deliver the incoming packet to listeners. - listenerExecutor.submit(new ListenerNotification(packet)); - } - private void parseFeatures(XmlPullParser parser) throws Exception { boolean startTLSReceived = false; boolean startTLSRequired = false; @@ -432,7 +395,7 @@ class PacketReader { new XMPPError(XMPPError.Condition.forbidden)); } } - + // Release the lock after TLS has been negotiated or we are not insterested in TLS if (!startTLSReceived || connection.getConfiguration().getSecurityMode() == ConnectionConfiguration.SecurityMode.disabled) @@ -440,26 +403,4 @@ class PacketReader { releaseConnectionIDLock(); } } - - /** - * A runnable to notify all listeners of a packet. - */ - private class ListenerNotification implements Runnable { - - private Packet packet; - - public ListenerNotification(Packet packet) { - this.packet = packet; - } - - public void run() { - for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) { - try { - listenerWrapper.notifyListener(packet); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Exception in packet listener", e); - } - } - } - } }