mirror of
https://github.com/vanitasvitae/Smack.git
synced 2024-11-22 12:02:05 +01:00
Notify packet sending listeners in new thread
sendListeners are now invoked *after* the packet has been put on the wire. Also sending listener exceptions are not catched and not only NotConnectedExceptions. And a exception does not cause a 'break' but a 'continue' now. Log level is WARNING now.
This commit is contained in:
parent
51d84647f3
commit
9f56842ee4
3 changed files with 57 additions and 35 deletions
|
@ -309,6 +309,9 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
|
||||||
private void sendElement(Element element) {
|
private void sendElement(Element element) {
|
||||||
try {
|
try {
|
||||||
send(ComposableBody.builder().setPayloadXML(element.toXML().toString()).build());
|
send(ComposableBody.builder().setPayloadXML(element.toXML().toString()).build());
|
||||||
|
if (element instanceof Packet) {
|
||||||
|
firePacketSendingListeners((Packet) element);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (BOSHException e) {
|
catch (BOSHException e) {
|
||||||
LOGGER.log(Level.SEVERE, "BOSHException in sendPacketInternal", e);
|
LOGGER.log(Level.SEVERE, "BOSHException in sendPacketInternal", e);
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.jivesoftware.smack.packet.PlainStreamElement;
|
||||||
import org.jivesoftware.smack.provider.PacketExtensionProvider;
|
import org.jivesoftware.smack.provider.PacketExtensionProvider;
|
||||||
import org.jivesoftware.smack.provider.ProviderManager;
|
import org.jivesoftware.smack.provider.ProviderManager;
|
||||||
import org.jivesoftware.smack.rosterstore.RosterStore;
|
import org.jivesoftware.smack.rosterstore.RosterStore;
|
||||||
|
import org.jivesoftware.smack.util.Async;
|
||||||
import org.jivesoftware.smack.util.PacketParserUtils;
|
import org.jivesoftware.smack.util.PacketParserUtils;
|
||||||
import org.jxmpp.util.XmppStringUtils;
|
import org.jxmpp.util.XmppStringUtils;
|
||||||
import org.xmlpull.v1.XmlPullParser;
|
import org.xmlpull.v1.XmlPullParser;
|
||||||
|
@ -506,9 +507,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
// the content of the packet.
|
// the content of the packet.
|
||||||
firePacketInterceptors(packet);
|
firePacketInterceptors(packet);
|
||||||
sendPacketInternal(packet);
|
sendPacketInternal(packet);
|
||||||
// Process packet writer listeners. Note that we're using the sending thread so it's
|
|
||||||
// expected that listeners are fast.
|
|
||||||
firePacketSendingListeners(packet);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -703,11 +701,15 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process all packet listeners for sending packets.
|
* Process all packet listeners for sending packets.
|
||||||
|
* <p>
|
||||||
|
* Compared to {@link #firePacketInterceptors(Packet)}, the listeners will be invoked in a new thread.
|
||||||
|
* </p>
|
||||||
*
|
*
|
||||||
* @param packet the packet to process.
|
* @param packet the packet to process.
|
||||||
*/
|
*/
|
||||||
private void firePacketSendingListeners(Packet packet) {
|
@SuppressWarnings("javadoc")
|
||||||
List<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
|
protected void firePacketSendingListeners(final Packet packet) {
|
||||||
|
final List<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
|
||||||
synchronized (sendListeners) {
|
synchronized (sendListeners) {
|
||||||
for (ListenerWrapper listenerWrapper : sendListeners.values()) {
|
for (ListenerWrapper listenerWrapper : sendListeners.values()) {
|
||||||
if (listenerWrapper.filterMatches(packet)) {
|
if (listenerWrapper.filterMatches(packet)) {
|
||||||
|
@ -715,15 +717,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (PacketListener listener : listenersToNotify) {
|
if (listenersToNotify.isEmpty()) {
|
||||||
try {
|
return;
|
||||||
listener.processPacket(packet);
|
|
||||||
}
|
|
||||||
catch (NotConnectedException e) {
|
|
||||||
LOGGER.log(Level.WARNING, "Got not connected exception, aborting");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// Notify in a new thread, because we can
|
||||||
|
Async.go(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (PacketListener listener : listenersToNotify) {
|
||||||
|
try {
|
||||||
|
listener.processPacket(packet);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOGGER.log(Level.WARNING, "Sending listener threw exception", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}}
|
||||||
|
, "Smack Sending Listeners Notification (" + getConnectionCounter() + ')');
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1347,30 +1347,38 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
||||||
initalOpenStreamSend.reportSuccess();
|
initalOpenStreamSend.reportSuccess();
|
||||||
// Write out packets from the queue.
|
// Write out packets from the queue.
|
||||||
while (!done()) {
|
while (!done()) {
|
||||||
Element packet = nextStreamElement();
|
Element element = nextStreamElement();
|
||||||
if (packet != null) {
|
if (element == null) {
|
||||||
// Check if the stream element should be put to the unacknowledgedStanza
|
continue;
|
||||||
// queue. Note that we can not do the put() in sendPacketInternal() and the
|
}
|
||||||
// packet order is not stable at this point (sendPacketInternal() can be
|
Packet packet = null;
|
||||||
// called concurrently).
|
if (element instanceof Packet) {
|
||||||
if (isSmEnabled() && packet instanceof Packet) {
|
packet = (Packet) element;
|
||||||
// If the unacknowledgedStanza queue is nearly full, request an new ack
|
}
|
||||||
// from the server in order to drain it
|
// Check if the stream element should be put to the unacknowledgedStanza
|
||||||
if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
|
// queue. Note that we can not do the put() in sendPacketInternal() and the
|
||||||
writer.write(AckRequest.INSTANCE.toXML().toString());
|
// packet order is not stable at this point (sendPacketInternal() can be
|
||||||
writer.flush();
|
// called concurrently).
|
||||||
}
|
if (isSmEnabled() && packet != null) {
|
||||||
try {
|
// If the unacknowledgedStanza queue is nearly full, request an new ack
|
||||||
unacknowledgedStanzas.put((Packet) packet);
|
// from the server in order to drain it
|
||||||
}
|
if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
|
||||||
catch (InterruptedException e) {
|
writer.write(AckRequest.INSTANCE.toXML().toString());
|
||||||
throw new IllegalStateException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writer.write(packet.toXML().toString());
|
|
||||||
if (queue.isEmpty()) {
|
|
||||||
writer.flush();
|
writer.flush();
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
unacknowledgedStanzas.put(packet);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
throw new IllegalStateException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writer.write(element.toXML().toString());
|
||||||
|
if (queue.isEmpty()) {
|
||||||
|
writer.flush();
|
||||||
|
}
|
||||||
|
if (packet != null) {
|
||||||
|
firePacketSendingListeners(packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!instantShutdown) {
|
if (!instantShutdown) {
|
||||||
|
|
Loading…
Reference in a new issue