diff --git a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java
index 6539686fb..b03f03c33 100644
--- a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java
+++ b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java
@@ -309,6 +309,9 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
private void sendElement(Element element) {
try {
send(ComposableBody.builder().setPayloadXML(element.toXML().toString()).build());
+ if (element instanceof Packet) {
+ firePacketSendingListeners((Packet) element);
+ }
}
catch (BOSHException e) {
LOGGER.log(Level.SEVERE, "BOSHException in sendPacketInternal", e);
diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java
index 5904ace57..4cf52260e 100644
--- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java
+++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java
@@ -63,6 +63,7 @@ import org.jivesoftware.smack.packet.PlainStreamElement;
import org.jivesoftware.smack.provider.PacketExtensionProvider;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.rosterstore.RosterStore;
+import org.jivesoftware.smack.util.Async;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jxmpp.util.XmppStringUtils;
import org.xmlpull.v1.XmlPullParser;
@@ -506,9 +507,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
// the content of the packet.
firePacketInterceptors(packet);
sendPacketInternal(packet);
- // Process packet writer listeners. Note that we're using the sending thread so it's
- // expected that listeners are fast.
- firePacketSendingListeners(packet);
}
@Override
@@ -703,11 +701,15 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
/**
* Process all packet listeners for sending packets.
+ *
+ * Compared to {@link #firePacketInterceptors(Packet)}, the listeners will be invoked in a new thread.
+ *
*
* @param packet the packet to process.
*/
- private void firePacketSendingListeners(Packet packet) {
- List listenersToNotify = new LinkedList();
+ @SuppressWarnings("javadoc")
+ protected void firePacketSendingListeners(final Packet packet) {
+ final List listenersToNotify = new LinkedList();
synchronized (sendListeners) {
for (ListenerWrapper listenerWrapper : sendListeners.values()) {
if (listenerWrapper.filterMatches(packet)) {
@@ -715,15 +717,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
}
}
}
- for (PacketListener listener : listenersToNotify) {
- try {
- listener.processPacket(packet);
- }
- catch (NotConnectedException e) {
- LOGGER.log(Level.WARNING, "Got not connected exception, aborting");
- break;
- }
+ if (listenersToNotify.isEmpty()) {
+ return;
}
+ // Notify in a new thread, because we can
+ Async.go(new Runnable() {
+ @Override
+ public void run() {
+ for (PacketListener listener : listenersToNotify) {
+ try {
+ listener.processPacket(packet);
+ }
+ catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Sending listener threw exception", e);
+ continue;
+ }
+ }
+ }}
+ , "Smack Sending Listeners Notification (" + getConnectionCounter() + ')');
}
@Override
diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
index 404b7b8f2..07c31b170 100644
--- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
+++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
@@ -1347,30 +1347,38 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
initalOpenStreamSend.reportSuccess();
// Write out packets from the queue.
while (!done()) {
- Element packet = nextStreamElement();
- if (packet != null) {
- // Check if the stream element should be put to the unacknowledgedStanza
- // queue. Note that we can not do the put() in sendPacketInternal() and the
- // packet order is not stable at this point (sendPacketInternal() can be
- // called concurrently).
- if (isSmEnabled() && packet instanceof Packet) {
- // If the unacknowledgedStanza queue is nearly full, request an new ack
- // from the server in order to drain it
- if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
- writer.write(AckRequest.INSTANCE.toXML().toString());
- writer.flush();
- }
- try {
- unacknowledgedStanzas.put((Packet) packet);
- }
- catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- }
- writer.write(packet.toXML().toString());
- if (queue.isEmpty()) {
+ Element element = nextStreamElement();
+ if (element == null) {
+ continue;
+ }
+ Packet packet = null;
+ if (element instanceof Packet) {
+ packet = (Packet) element;
+ }
+ // Check if the stream element should be put to the unacknowledgedStanza
+ // queue. Note that we can not do the put() in sendPacketInternal() and the
+ // packet order is not stable at this point (sendPacketInternal() can be
+ // called concurrently).
+ if (isSmEnabled() && packet != null) {
+ // If the unacknowledgedStanza queue is nearly full, request an new ack
+ // from the server in order to drain it
+ if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
+ writer.write(AckRequest.INSTANCE.toXML().toString());
writer.flush();
}
+ try {
+ unacknowledgedStanzas.put(packet);
+ }
+ catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ writer.write(element.toXML().toString());
+ if (queue.isEmpty()) {
+ writer.flush();
+ }
+ if (packet != null) {
+ firePacketSendingListeners(packet);
}
}
if (!instantShutdown) {