From 9f56842ee4371f45c5d1320eaa87442a395a79cd Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sat, 25 Oct 2014 11:39:16 +0200 Subject: [PATCH] Notify packet sending listeners in new thread sendListeners are now invoked *after* the packet has been put on the wire. Also sending listener exceptions are not catched and not only NotConnectedExceptions. And a exception does not cause a 'break' but a 'continue' now. Log level is WARNING now. --- .../smack/bosh/XMPPBOSHConnection.java | 3 ++ .../smack/AbstractXMPPConnection.java | 37 ++++++++----- .../smack/tcp/XMPPTCPConnection.java | 52 +++++++++++-------- 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java index 6539686fb..b03f03c33 100644 --- a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java +++ b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java @@ -309,6 +309,9 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { private void sendElement(Element element) { try { send(ComposableBody.builder().setPayloadXML(element.toXML().toString()).build()); + if (element instanceof Packet) { + firePacketSendingListeners((Packet) element); + } } catch (BOSHException e) { LOGGER.log(Level.SEVERE, "BOSHException in sendPacketInternal", e); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 5904ace57..4cf52260e 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -63,6 +63,7 @@ import org.jivesoftware.smack.packet.PlainStreamElement; import org.jivesoftware.smack.provider.PacketExtensionProvider; import org.jivesoftware.smack.provider.ProviderManager; import org.jivesoftware.smack.rosterstore.RosterStore; +import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.PacketParserUtils; import org.jxmpp.util.XmppStringUtils; import org.xmlpull.v1.XmlPullParser; @@ -506,9 +507,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // the content of the packet. firePacketInterceptors(packet); sendPacketInternal(packet); - // Process packet writer listeners. Note that we're using the sending thread so it's - // expected that listeners are fast. - firePacketSendingListeners(packet); } @Override @@ -703,11 +701,15 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { /** * Process all packet listeners for sending packets. + *

+ * Compared to {@link #firePacketInterceptors(Packet)}, the listeners will be invoked in a new thread. + *

* * @param packet the packet to process. */ - private void firePacketSendingListeners(Packet packet) { - List listenersToNotify = new LinkedList(); + @SuppressWarnings("javadoc") + protected void firePacketSendingListeners(final Packet packet) { + final List listenersToNotify = new LinkedList(); synchronized (sendListeners) { for (ListenerWrapper listenerWrapper : sendListeners.values()) { if (listenerWrapper.filterMatches(packet)) { @@ -715,15 +717,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } } } - for (PacketListener listener : listenersToNotify) { - try { - listener.processPacket(packet); - } - catch (NotConnectedException e) { - LOGGER.log(Level.WARNING, "Got not connected exception, aborting"); - break; - } + if (listenersToNotify.isEmpty()) { + return; } + // Notify in a new thread, because we can + Async.go(new Runnable() { + @Override + public void run() { + for (PacketListener listener : listenersToNotify) { + try { + listener.processPacket(packet); + } + catch (Exception e) { + LOGGER.log(Level.WARNING, "Sending listener threw exception", e); + continue; + } + } + }} + , "Smack Sending Listeners Notification (" + getConnectionCounter() + ')'); } @Override diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 404b7b8f2..07c31b170 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -1347,30 +1347,38 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { initalOpenStreamSend.reportSuccess(); // Write out packets from the queue. while (!done()) { - Element packet = nextStreamElement(); - if (packet != null) { - // Check if the stream element should be put to the unacknowledgedStanza - // queue. Note that we can not do the put() in sendPacketInternal() and the - // packet order is not stable at this point (sendPacketInternal() can be - // called concurrently). - if (isSmEnabled() && packet instanceof Packet) { - // If the unacknowledgedStanza queue is nearly full, request an new ack - // from the server in order to drain it - if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { - writer.write(AckRequest.INSTANCE.toXML().toString()); - writer.flush(); - } - try { - unacknowledgedStanzas.put((Packet) packet); - } - catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - writer.write(packet.toXML().toString()); - if (queue.isEmpty()) { + Element element = nextStreamElement(); + if (element == null) { + continue; + } + Packet packet = null; + if (element instanceof Packet) { + packet = (Packet) element; + } + // Check if the stream element should be put to the unacknowledgedStanza + // queue. Note that we can not do the put() in sendPacketInternal() and the + // packet order is not stable at this point (sendPacketInternal() can be + // called concurrently). + if (isSmEnabled() && packet != null) { + // If the unacknowledgedStanza queue is nearly full, request an new ack + // from the server in order to drain it + if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { + writer.write(AckRequest.INSTANCE.toXML().toString()); writer.flush(); } + try { + unacknowledgedStanzas.put(packet); + } + catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + writer.write(element.toXML().toString()); + if (queue.isEmpty()) { + writer.flush(); + } + if (packet != null) { + firePacketSendingListeners(packet); } } if (!instantShutdown) {