From c77948bb916a086408892ec6248fd958a3cab0d3 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Thu, 2 Jun 2022 15:55:24 +0200 Subject: [PATCH] Add non-blocking send --- .../smack/bosh/XMPPBOSHConnection.java | 140 +++++++++++++----- .../smack/AbstractXMPPConnection.java | 56 ++++--- .../smack/SmackConfiguration.java | 9 -- .../jivesoftware/smack/SmackException.java | 6 + .../jivesoftware/smack/XMPPConnection.java | 11 ++ .../ModularXmppClientToServerConnection.java | 30 ++-- .../jivesoftware/smack/DummyConnection.java | 13 +- .../smack/ThreadedDummyConnection.java | 14 +- .../smack/tcp/XMPPTCPConnection.java | 59 ++++++-- 9 files changed, 237 insertions(+), 101 deletions(-) diff --git a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java index 76d8271ec..70ff246b6 100644 --- a/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java +++ b/smack-bosh/src/main/java/org/jivesoftware/smack/bosh/XMPPBOSHConnection.java @@ -29,17 +29,19 @@ import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.GenericConnectionException; import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.StreamErrorException; -import org.jivesoftware.smack.packet.Element; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Presence; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.StanzaError; +import org.jivesoftware.smack.packet.TopLevelStreamElement; +import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; +import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.CloseableUtil; import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.xml.XmlPullParser; @@ -90,6 +92,10 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { @SuppressWarnings("HidingField") private final BOSHConfiguration config; + private final ArrayBlockingQueueWithShutdown outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true); + + private Thread writerThread; + // Some flags which provides some info about the current state. private boolean isFirstInitialization = true; private boolean done = false; @@ -194,11 +200,16 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { } } + assert writerThread == null || !writerThread.isAlive(); + outgoingQueue.start(); + writerThread = Async.go(this::writeElements, this + " Writer"); + // If there is no feedback, throw an remote server timeout error if (!connected && !done) { done = true; String errorMessage = "Timeout reached for the connection to " + getHost() + ":" + getPort() + "."; + instantShutdown(); throw new SmackException.SmackMessageException(errorMessage); } @@ -207,6 +218,7 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { ""); onStreamOpen(parser); } catch (XmlPullParserException | IOException e) { + instantShutdown(); throw new AssertionError("Failed to setup stream environment", e); } } @@ -234,40 +246,92 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { afterSuccessfulLogin(false); } - @Override - public void sendNonza(Nonza element) throws NotConnectedException { - if (done) { - throw new NotConnectedException(); - } - sendElement(element); - } + private volatile boolean writerThreadRunning; - @Override - protected void sendStanzaInternal(Stanza packet) throws NotConnectedException { - sendElement(packet); - } - - private void sendElement(Element element) { + private void writeElements() { + writerThreadRunning = true; try { - send(ComposableBody.builder().setPayloadXML(element.toXML(BOSH_URI).toString()).build()); - if (element instanceof Stanza) { - firePacketSendingListeners((Stanza) element); + while (true) { + TopLevelStreamElement element; + try { + element = outgoingQueue.take(); + } catch (InterruptedException e) { + LOGGER.log(Level.FINE, + "Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception", + e); + return; + } + + String xmlPayload = element.toXML(BOSH_URI).toString(); + ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload); + if (sessionID != null) { + BodyQName qName = BodyQName.create(BOSH_URI, "sid"); + composableBodyBuilder.setAttribute(qName, sessionID); + } + + ComposableBody composableBody = composableBodyBuilder.build(); + + try { + client.send(composableBody); + } catch (BOSHException e) { + LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e); + // TODO: Signal the user that there was an unexpected exception. + return; + } + + if (element instanceof Stanza) { + Stanza stanza = (Stanza) element; + firePacketSendingListeners(stanza); + } } - } - catch (BOSHException e) { - LOGGER.log(Level.SEVERE, "BOSHException in sendStanzaInternal", e); + } catch (Exception exception) { + LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception); + } finally { + writerThreadRunning = false; + notifyWaitingThreads(); + } + } + + @Override + protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { + throwNotConnectedExceptionIfAppropriate(); + try { + outgoingQueue.put(element); + } catch (InterruptedException e) { + throwNotConnectedExceptionIfAppropriate(); + // If the method above did not throw, then the sending thread was interrupted + throw e; + } + } + + @Override + protected void sendNonBlockingInternal(TopLevelStreamElement element) + throws NotConnectedException, OutgoingQueueFullException { + throwNotConnectedExceptionIfAppropriate(); + boolean enqueued = outgoingQueue.offer(element); + if (!enqueued) { + throwNotConnectedExceptionIfAppropriate(); + throw new OutgoingQueueFullException(); } } - /** - * Closes the connection by setting presence to unavailable and closing the - * HTTP client. The shutdown logic will be used during a planned disconnection or when - * dealing with an unexpected disconnection. Unlike {@link #disconnect()} the connection's - * BOSH stanza reader will not be removed; thus connection's state is kept. - * - */ @Override protected void shutdown() { + instantShutdown(); + } + + @Override + public void instantShutdown() { + outgoingQueue.shutdown(); + + try { + boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning); + if (!writerThreadTerminated) { + LOGGER.severe("Writer thread of " + this + " did not terminate timely"); + } + } catch (InterruptedException e) { + LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e); + } if (client != null) { try { @@ -275,20 +339,15 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { } catch (Exception e) { LOGGER.log(Level.WARNING, "shutdown", e); } - client = null; } - instantShutdown(); - } - - @Override - public void instantShutdown() { setWasAuthenticated(); sessionID = null; done = true; authenticated = false; connected = false; isFirstInitialization = false; + client = null; // Close down the readers and writers. CloseableUtil.maybeClose(readerPipe, LOGGER); @@ -410,14 +469,15 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection { // XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it // requires a special XML element ot be send after successful SASL authentication. // See XEP-0206 ยง 5., especially the following is example 5 of XEP-0206. - ComposableBody composeableBody = ComposableBody.builder().setNamespaceDefinition("xmpp", - XMPPBOSHConnection.XMPP_BOSH_NS).setAttribute( - BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", - "xmpp"), "true").setAttribute( - BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString()).build(); + ComposableBody composeableBody = ComposableBody.builder() + .setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS) + .setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true") + .setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString()) + .setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID) + .build(); try { - send(composeableBody); + client.send(composeableBody); } catch (BOSHException e) { // jbosh's exception API does not really match the one of Smack. throw new SmackException.SmackWrappedException(e); diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 244c741da..3cea10d69 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -51,6 +51,7 @@ import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotLoggedInException; +import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; import org.jivesoftware.smack.SmackException.SecurityRequiredException; @@ -460,8 +461,17 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { @Override public abstract boolean isSecureConnection(); - protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException; + // Usually batching is a good idea. So the two + // send(Internal|NonBlockingInternal) methods below could be using + // Collection as parameter type instead. + // TODO: Add "batched send" support. Note that for the non-blocking variant, this probably requires a change in + // return type, so that it is possible to signal which messages could be "send" and which not. + protected abstract void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException; + + protected abstract void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException; + + @SuppressWarnings("deprecation") @Override public boolean trySendStanza(Stanza stanza) throws NotConnectedException { // Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be @@ -476,6 +486,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { return true; } + @SuppressWarnings("deprecation") @Override public boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit) throws NotConnectedException, InterruptedException { @@ -486,7 +497,14 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } @Override - public abstract void sendNonza(Nonza element) throws NotConnectedException, InterruptedException; + public final void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException { + sendInternal(nonza); + } + + @Override + public final void sendNonzaNonBlocking(Nonza nonza) throws NotConnectedException, OutgoingQueueFullException { + sendNonBlockingInternal(nonza); + } @Override public abstract boolean isUsingCompression(); @@ -853,8 +871,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { return stanzaFactory; } - @Override - public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + private Stanza preSendStanza(Stanza stanza) throws NotConnectedException { Objects.requireNonNull(stanza, "Stanza must not be null"); assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ; @@ -873,7 +890,19 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify // the content of the stanza. Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza); - sendStanzaInternal(stanzaAfterInterceptors); + return stanzaAfterInterceptors; + } + + @Override + public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException { + stanza = preSendStanza(stanza); + sendInternal(stanza); + } + + @Override + public final void sendStanzaNonBlocking(Stanza stanza) throws NotConnectedException, OutgoingQueueFullException { + stanza = preSendStanza(stanza); + sendNonBlockingInternal(stanza); } /** @@ -2006,18 +2035,11 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { }, timeout, TimeUnit.MILLISECONDS); addAsyncStanzaListener(stanzaListener, replyFilter); - Runnable sendOperation = () -> { - try { - sendStanza(stanza); - } - catch (NotConnectedException | InterruptedException exception) { - future.setException(exception); - } - }; - if (SmackConfiguration.TRUELY_ASYNC_SENDS) { - Async.go(sendOperation); - } else { - sendOperation.run(); + try { + sendStanzaNonBlocking(stanza); + } + catch (NotConnectedException | OutgoingQueueFullException exception) { + future.setException(exception); } return future; diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java index 907a288b6..f3316c8ed 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackConfiguration.java @@ -387,13 +387,4 @@ public final class SmackConfiguration { } } - /** - * If enabled, causes {@link AbstractXMPPConnection} to create a thread for every asynchronous send operation. This - * is meant to work-around a shortcoming of Smack 4.4, where certain send operations are not asynchronous even if - * they should be. This is an expert setting, do not toggle if you do not understand the consequences or have been - * told to do so. Note that it is expected that this will not be needed in future Smack versions. - * - * @since 4.4.6 - */ - public static boolean TRUELY_ASYNC_SENDS = false; } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java index ea34d243b..91b6bb4c9 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/SmackException.java @@ -206,6 +206,12 @@ public abstract class SmackException extends Exception { } } + public static class OutgoingQueueFullException extends SmackException { + + private static final long serialVersionUID = 1L; + + } + public static class IllegalStateChangeException extends SmackException { /** diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java index aea730058..6b12f57f8 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java @@ -22,6 +22,7 @@ import javax.xml.namespace.QName; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; import org.jivesoftware.smack.filter.IQReplyFilter; import org.jivesoftware.smack.filter.StanzaFilter; @@ -199,6 +200,8 @@ public interface XMPPConnection { * */ void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException; + void sendStanzaNonBlocking(Stanza stanza) throws NotConnectedException, OutgoingQueueFullException; + /** * Try to send the given stanza. Returns {@code true} if the stanza was successfully put into the outgoing stanza * queue, otherwise, if {@code false} is returned, the stanza could not be scheduled for sending (for example @@ -213,7 +216,10 @@ public interface XMPPConnection { * @return {@code true} if the stanza was successfully scheduled to be send, {@code false} otherwise. * @throws NotConnectedException if the connection is not connected. * @since 4.4.0 + * @deprecated use {@link #sendStanzaNonBlocking(Stanza)} instead. */ + // TODO: Remove in Smack 4.7. + @Deprecated boolean trySendStanza(Stanza stanza) throws NotConnectedException; /** @@ -234,7 +240,10 @@ public interface XMPPConnection { * @throws NotConnectedException if the connection is not connected. * @throws InterruptedException if the calling thread was interrupted. * @since 4.4.0 + * @deprecated use {@link #sendStanzaNonBlocking(Stanza)} instead. */ + // TODO: Remove in Smack 4.7. + @Deprecated boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit) throws NotConnectedException, InterruptedException; /** @@ -251,6 +260,8 @@ public interface XMPPConnection { */ void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException; + void sendNonzaNonBlocking(Nonza stanza) throws NotConnectedException, OutgoingQueueFullException; + /** * Adds a connection listener to this connection that will be notified when * the connection closes or fails. diff --git a/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java index d975f045a..8b7ae2570 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/c2s/ModularXmppClientToServerConnection.java @@ -1,6 +1,6 @@ /** * - * Copyright 2018-2021 Florian Schmaus + * Copyright 2018-2022 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; +import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; import org.jivesoftware.smack.SmackFuture; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.FailedNonzaException; @@ -67,7 +68,6 @@ import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Presence; -import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.StreamError; import org.jivesoftware.smack.packet.TopLevelStreamElement; import org.jivesoftware.smack.packet.XmlEnvironment; @@ -438,16 +438,7 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne } @Override - protected void sendStanzaInternal(Stanza stanza) throws NotConnectedException, InterruptedException { - sendTopLevelStreamElement(stanza); - } - - @Override - public void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException { - sendTopLevelStreamElement(nonza); - } - - private void sendTopLevelStreamElement(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { + protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { final XmppClientToServerTransport transport = activeTransport; if (transport == null) { throw new NotConnectedException(); @@ -457,6 +448,21 @@ public final class ModularXmppClientToServerConnection extends AbstractXMPPConne transport.notifyAboutNewOutgoingElements(); } + @Override + protected void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException { + final XmppClientToServerTransport transport = activeTransport; + if (transport == null) { + throw new NotConnectedException(); + } + + boolean enqueued = outgoingElementsQueue.offer(element); + if (!enqueued) { + throw new OutgoingQueueFullException(); + } + + transport.notifyAboutNewOutgoingElements(); + } + @Override protected void shutdown() { shutdown(false); diff --git a/smack-core/src/testFixtures/java/org/jivesoftware/smack/DummyConnection.java b/smack-core/src/testFixtures/java/org/jivesoftware/smack/DummyConnection.java index d0864842b..b85ce62b5 100644 --- a/smack-core/src/testFixtures/java/org/jivesoftware/smack/DummyConnection.java +++ b/smack-core/src/testFixtures/java/org/jivesoftware/smack/DummyConnection.java @@ -1,6 +1,6 @@ /** * - * Copyright 2010 Jive Software. + * Copyright 2010 Jive Software, 2022 Florian Schmaus. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; import org.jivesoftware.smack.packet.ExtensionElement; -import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.TopLevelStreamElement; @@ -127,13 +127,16 @@ public class DummyConnection extends AbstractXMPPConnection { } @Override - public void sendNonza(Nonza element) { + protected void sendInternal(TopLevelStreamElement element) { queue.add(element); } @Override - protected void sendStanzaInternal(Stanza packet) { - queue.add(packet); + protected void sendNonBlockingInternal(TopLevelStreamElement element) throws OutgoingQueueFullException { + boolean enqueued = queue.add(element); + if (!enqueued) { + throw new OutgoingQueueFullException(); + } } /** diff --git a/smack-core/src/testFixtures/java/org/jivesoftware/smack/ThreadedDummyConnection.java b/smack-core/src/testFixtures/java/org/jivesoftware/smack/ThreadedDummyConnection.java index c853d4808..28a74a7fc 100644 --- a/smack-core/src/testFixtures/java/org/jivesoftware/smack/ThreadedDummyConnection.java +++ b/smack-core/src/testFixtures/java/org/jivesoftware/smack/ThreadedDummyConnection.java @@ -26,6 +26,7 @@ import java.util.logging.Logger; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.Stanza; +import org.jivesoftware.smack.packet.TopLevelStreamElement; /** * A threaded dummy connection. @@ -40,10 +41,11 @@ public class ThreadedDummyConnection extends DummyConnection { private volatile boolean timeout = false; @Override - protected void sendStanzaInternal(Stanza packet) { - super.sendStanzaInternal(packet); + protected void sendInternal(TopLevelStreamElement element) { + super.sendInternal(element); - if (packet instanceof IQ && !timeout) { + if (element instanceof IQ && !timeout) { + IQ iq = (IQ) element; timeout = false; // Set reply packet to match one being sent. We haven't started the // other thread yet so this is still safe. @@ -51,11 +53,11 @@ public class ThreadedDummyConnection extends DummyConnection { // If no reply has been set via addIQReply, then we create a simple reply if (replyPacket == null) { - replyPacket = IQ.createResultIQ((IQ) packet); + replyPacket = IQ.createResultIQ(iq); replyQ.add(replyPacket); } - replyPacket.setStanzaId(packet.getStanzaId()); - replyPacket.setTo(packet.getFrom()); + replyPacket.setStanzaId(iq.getStanzaId()); + replyPacket.setTo(iq.getFrom()); if (replyPacket.getType() == null) { replyPacket.setType(IQ.Type.result); } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index 561978bb4..0314db7f8 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -62,6 +62,7 @@ import org.jivesoftware.smack.SmackException.ConnectionException; import org.jivesoftware.smack.SmackException.EndpointConnectionException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.NotLoggedInException; +import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; import org.jivesoftware.smack.SmackException.SecurityNotPossibleException; import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; import org.jivesoftware.smack.SmackFuture; @@ -79,12 +80,12 @@ import org.jivesoftware.smack.internal.SmackTlsContext; import org.jivesoftware.smack.packet.Element; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.Nonza; import org.jivesoftware.smack.packet.Presence; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.StartTls; import org.jivesoftware.smack.packet.StreamError; import org.jivesoftware.smack.packet.StreamOpen; +import org.jivesoftware.smack.packet.TopLevelStreamElement; import org.jivesoftware.smack.proxy.ProxyInfo; import org.jivesoftware.smack.sasl.packet.SaslNonza; import org.jivesoftware.smack.sm.SMUtils; @@ -464,7 +465,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } else { for (Stanza stanza : previouslyUnackedStanzas) { - sendStanzaInternal(stanza); + sendInternal(stanza); } } @@ -570,24 +571,38 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { initState(); } - @Override - public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { - packetWriter.sendStreamElement(element); + private interface SmAckAction { + void run() throws NotConnectedException, E; } - @Override - protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { - packetWriter.sendStreamElement(packet); - if (isSmEnabled()) { + private void requestSmAckIfNecessary(TopLevelStreamElement element, + SmAckAction smAckAction) throws NotConnectedException, E { + if (!isSmEnabled()) + return; + + if (element instanceof Stanza) { + Stanza stanza = (Stanza) element; for (StanzaFilter requestAckPredicate : requestAckPredicates) { - if (requestAckPredicate.accept(packet)) { - requestSmAcknowledgementInternal(); + if (requestAckPredicate.accept(stanza)) { + smAckAction.run(); break; } } } } + @Override + protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { + packetWriter.sendStreamElement(element); + requestSmAckIfNecessary(element, () -> requestSmAcknowledgementInternal()); + } + + @Override + protected void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException { + packetWriter.sendNonBlocking(element); + requestSmAckIfNecessary(element, () -> requestSmAcknowledgementNonBlockingInternal()); + } + private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { RemoteXmppTcpConnectionEndpoints.Result result = RemoteXmppTcpConnectionEndpoints.lookup(config); @@ -1067,7 +1082,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { List stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size()); unacknowledgedStanzas.drainTo(stanzasToResend); for (Stanza stanza : stanzasToResend) { - sendStanzaInternal(stanza); + XMPPTCPConnection.this.sendInternal(stanza); } // If there where stanzas resent, then request a SM ack for them. // Writer's sendStreamElement() won't do it automatically based on @@ -1270,6 +1285,22 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } + /** + * Sends the specified element to the server. + * + * @param element the element to send. + * @throws NotConnectedException if the XMPP connection is not connected. + * @throws OutgoingQueueFullException if there is no space in the outgoing queue. + */ + protected void sendNonBlocking(Element element) throws NotConnectedException, OutgoingQueueFullException { + throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); + boolean enqueued = queue.offer(element); + if (!enqueued) { + throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); + throw new OutgoingQueueFullException(); + } + } + /** * Shuts down the stanza writer. Once this method has been called, no further * packets will be written to the server. @@ -1588,6 +1619,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { packetWriter.sendStreamElement(AckRequest.INSTANCE); } + private void requestSmAcknowledgementNonBlockingInternal() throws NotConnectedException, OutgoingQueueFullException { + packetWriter.sendNonBlocking(AckRequest.INSTANCE); + } + /** * Send a unconditional Stream Management acknowledgment to the server. *