From ac41fda1e535f2648336f3e8478a9e6c5c006b72 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Wed, 29 Oct 2014 10:06:33 +0100 Subject: [PATCH] Add Stream Management javadoc - rename idStanzaAcknowledged to stanzaIdAcknowledged - introduce smWasEnabledAtLeastOnce and throw StreamManagementNotEnabledExceptions - enable SM is SM resumption is enabled --- .../smack/tcp/XMPPTCPConnection.java | 173 ++++++++++++++++-- 1 file changed, 162 insertions(+), 11 deletions(-) diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index da961fd85..940e88b11 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -214,9 +214,24 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private boolean useSm = useSmDefault; private boolean useSmResumption = useSmResumptionDefault; private long serverHandledStanzasCount = 0; + + /** + * The counter for stanzas handled ("received") by the client. + *

+ * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are + * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since + * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and + * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. + *

+ */ private long clientHandledStanzasCount = 0; private BlockingQueue unacknowledgedStanzas; + /** + * Set to true if Stream Management was at least once enabled for this connection. + */ + private boolean smWasEnabledAtLeastOnce = false; + /** * This listeners are invoked for every stanza that got acknowledged. *

@@ -230,7 +245,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will * only be invoked once and automatically removed after that. */ - private final Map idStanzaAcknowledgedListeners = new ConcurrentHashMap(); + private final Map stanzaIdAcknowledgedListeners = new ConcurrentHashMap(); /** * Predicates that determine if an stream management ack should be requested from the server. @@ -1156,6 +1171,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { smSessionId = null; } smEnabledSyncPoint.reportSuccess(); + smWasEnabledAtLeastOnce = true; LOGGER.fine("Stream Management (XEP-198): succesfully enabled"); break; case Failed.ELEMENT: @@ -1462,19 +1478,47 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } + /** + * Set if Stream Management should be used by default for new connections. + * + * @param useSmDefault true to use Stream Management for new connections. + */ public static void setUseStreamManagementDefault(boolean useSmDefault) { XMPPTCPConnection.useSmDefault = useSmDefault; } - public static void setUseStreamManagementResumptiodDefault(boolean useSmResupmptionDefault) { - XMPPTCPConnection.useSmResumptionDefault = useSmResupmptionDefault; + /** + * Set if Stream Management resumption should be used by default for new connections. + * + * @param useSmResumptionDefault true to use Stream Management resumption for new connections. + */ + public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { + if (useSmResumptionDefault) { + // Also enable SM is resumption is enabled + setUseStreamManagementDefault(useSmResumptionDefault); + } + XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; } + /** + * Set if Stream Management should be used if supported by the server. + * + * @param useSm true to use Stream Management. + */ public void setUseStreamManagement(boolean useSm) { this.useSm = useSm; } + /** + * Set if Stream Management resumption should be used if supported by the server. + * + * @param useSmResumption true to use Stream Management resumption. + */ public void setUseStreamManagementResumption(boolean useSmResumption) { + if (useSmResumption) { + // Also enable SM is resumption is enabled + setUseStreamManagement(useSmResumption); + } this.useSmResumption = useSmResumption; } @@ -1486,24 +1530,51 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { smClientMaxResumptionTime = resumptionTime; } + /** + * Add a predicate for Stream Management acknowledgment requests. + *

+ * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. + * Some pre-defined predicates are found in the org.jivesoftware.smack.tcp.sm.predicates package. + *

+ *

+ * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. + *

+ * + * @param predicate the predicate to add. + * @return if the predicate was not already active. + */ public boolean addRequestAckPredicate(PacketFilter predicate) { synchronized (requestAckPredicates) { return requestAckPredicates.add(predicate); } } + /** + * Remove the given predicate for Stream Management acknowledgment request. + * @param predicate the predicate to remove. + * @return true if the predicate was removed. + */ public boolean removeRequestAckPredicate(PacketFilter predicate) { synchronized (requestAckPredicates) { return requestAckPredicates.remove(predicate); } } + /** + * Remove all predicates for Stream Management acknowledgment requests. + */ public void removeAllRequestAckPredicates() { synchronized (requestAckPredicates) { requestAckPredicates.clear(); } } + /** + * Send an unconditional Stream Management acknowledgement request to the server. + * + * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled. + * @throws NotConnectedException if the connection is not connected. + */ public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { if (!isSmEnabled()) { throw new StreamManagementException.StreamManagementNotEnabledException(); @@ -1515,6 +1586,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { packetWriter.sendStreamElement(AckRequest.INSTANCE); } + /** + * Send a unconditional Stream Management acknowledgment to the server. + *

+ * See XEP-198: Stream Management ยง 4. Acks: + * "Either party MAY send an element at any time (e.g., after it has received a certain number of stanzas, + * or after a certain period of time), even if it has not received an element from the other party." + *

+ * + * @throws StreamManagementNotEnabledException if Stream Management is not enabled. + * @throws NotConnectedException if the connection is not connected. + */ public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { if (!isSmEnabled()) { throw new StreamManagementException.StreamManagementNotEnabledException(); @@ -1526,42 +1608,111 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); } - public void addStanzaAcknowledgedListener(PacketListener listener) { + /** + * Add a Stanza acknowledged listener. + *

+ * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get + * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, PacketListener)} when + * possible. + *

+ * + * @param listener the listener to add. + * @throws StreamManagementNotEnabledException if Stream Management is not enabled. + */ + public void addStanzaAcknowledgedListener(PacketListener listener) throws StreamManagementNotEnabledException { + // Prevent users from adding callbacks that will never get removed + if (!smWasEnabledAtLeastOnce) { + throw new StreamManagementException.StreamManagementNotEnabledException(); + } stanzaAcknowledgedListeners.add(listener); } + /** + * Remove the given Stanza acknowledged listener. + * + * @param listener the listener. + * @return true if the listener was removed. + */ public boolean removeStanzaAcknowledgedListener(PacketListener listener) { return stanzaAcknowledgedListeners.remove(listener); } + /** + * Remove all stanza acknowledged listeners. + */ public void removeAllStanzaAcknowledgedListeners() { stanzaAcknowledgedListeners.clear(); } - public PacketListener addIdStanzaAcknowledgedListener(String id, PacketListener listener) { - return idStanzaAcknowledgedListeners.put(id, listener); + /** + * Add a new Stanza ID acknowledged listener for the given ID. + *

+ * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will + * automatically be removed after the listener was run. + *

+ * + * @param id the stanza ID. + * @param listener the listener to invoke. + * @return the previous listener for this stanza ID or null. + * @throws StreamManagementNotEnabledException if Stream Management is not enabled. + */ + public PacketListener addStanzaIdAcknowledgedListener(String id, PacketListener listener) throws StreamManagementNotEnabledException { + // Prevent users from adding callbacks that will never get removed + if (!smWasEnabledAtLeastOnce) { + throw new StreamManagementException.StreamManagementNotEnabledException(); + } + return stanzaIdAcknowledgedListeners.put(id, listener); } - public PacketListener removeIdStanzaAcknowledgedListener(String id) { - return idStanzaAcknowledgedListeners.remove(id); + /** + * Remove the Stanza ID acknowledged listener for the given ID. + * + * @param id the stanza ID. + * @return true if the listener was found and removed, false otherwise. + */ + public PacketListener removeStanzaIdAcknowledgedListener(String id) { + return stanzaIdAcknowledgedListeners.remove(id); } - public void removeAllIdStanzaAcknowledgedListeners() { - idStanzaAcknowledgedListeners.clear(); + /** + * Removes all Stanza ID acknowledged listeners. + */ + public void removeAllStanzaIdAcknowledgedListeners() { + stanzaIdAcknowledgedListeners.clear(); } + /** + * Returns true if Stream Management is supported by the server. + * + * @return true if Stream Management is supported by the server. + */ public boolean isSmAvailable() { return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); } + /** + * Returns true if Stream Management was successfully negotiated with the server. + * + * @return true if Stream Management was negotiated. + */ public boolean isSmEnabled() { return smEnabledSyncPoint.wasSuccessful(); } + /** + * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. + * + * @return true if disconnected but resumption possible. + */ public boolean isDisconnectedButSmResumptionPossible() { return disconnectedButResumeable && isSmResumptionPossible(); } + /** + * Returns true if the stream is resumable. + * + * @return true if the stream is resumable. + */ public boolean isSmResumptionPossible() { // There is no resumable stream available if (smSessionId == null) @@ -1603,7 +1754,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } String id = ackedStanza.getPacketID(); if (id != null) { - PacketListener listener = idStanzaAcknowledgedListeners.remove(id); + PacketListener listener = stanzaIdAcknowledgedListeners.remove(id); if (listener != null) { listener.processPacket(ackedStanza); }