Add Stream Management javadoc

- rename idStanzaAcknowledged to stanzaIdAcknowledged
- introduce smWasEnabledAtLeastOnce and throw
  StreamManagementNotEnabledExceptions
- enable SM is SM resumption is enabled
This commit is contained in:
Florian Schmaus 2014-10-29 10:06:33 +01:00
parent 92bc3452da
commit ac41fda1e5
1 changed files with 162 additions and 11 deletions

View File

@ -214,9 +214,24 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private boolean useSm = useSmDefault; private boolean useSm = useSmDefault;
private boolean useSmResumption = useSmResumptionDefault; private boolean useSmResumption = useSmResumptionDefault;
private long serverHandledStanzasCount = 0; private long serverHandledStanzasCount = 0;
/**
* The counter for stanzas handled ("received") by the client.
* <p>
* Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
* not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
* {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
* therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
* </p>
*/
private long clientHandledStanzasCount = 0; private long clientHandledStanzasCount = 0;
private BlockingQueue<Packet> unacknowledgedStanzas; private BlockingQueue<Packet> unacknowledgedStanzas;
/**
* Set to true if Stream Management was at least once enabled for this connection.
*/
private boolean smWasEnabledAtLeastOnce = false;
/** /**
* This listeners are invoked for every stanza that got acknowledged. * This listeners are invoked for every stanza that got acknowledged.
* <p> * <p>
@ -230,7 +245,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will
* only be invoked once and automatically removed after that. * only be invoked once and automatically removed after that.
*/ */
private final Map<String, PacketListener> idStanzaAcknowledgedListeners = new ConcurrentHashMap<String, PacketListener>(); private final Map<String, PacketListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, PacketListener>();
/** /**
* Predicates that determine if an stream management ack should be requested from the server. * Predicates that determine if an stream management ack should be requested from the server.
@ -1156,6 +1171,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
smSessionId = null; smSessionId = null;
} }
smEnabledSyncPoint.reportSuccess(); smEnabledSyncPoint.reportSuccess();
smWasEnabledAtLeastOnce = true;
LOGGER.fine("Stream Management (XEP-198): succesfully enabled"); LOGGER.fine("Stream Management (XEP-198): succesfully enabled");
break; break;
case Failed.ELEMENT: case Failed.ELEMENT:
@ -1462,19 +1478,47 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
} }
/**
* Set if Stream Management should be used by default for new connections.
*
* @param useSmDefault true to use Stream Management for new connections.
*/
public static void setUseStreamManagementDefault(boolean useSmDefault) { public static void setUseStreamManagementDefault(boolean useSmDefault) {
XMPPTCPConnection.useSmDefault = useSmDefault; XMPPTCPConnection.useSmDefault = useSmDefault;
} }
public static void setUseStreamManagementResumptiodDefault(boolean useSmResupmptionDefault) { /**
XMPPTCPConnection.useSmResumptionDefault = useSmResupmptionDefault; * Set if Stream Management resumption should be used by default for new connections.
*
* @param useSmResumptionDefault true to use Stream Management resumption for new connections.
*/
public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
if (useSmResumptionDefault) {
// Also enable SM is resumption is enabled
setUseStreamManagementDefault(useSmResumptionDefault);
}
XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
} }
/**
* Set if Stream Management should be used if supported by the server.
*
* @param useSm true to use Stream Management.
*/
public void setUseStreamManagement(boolean useSm) { public void setUseStreamManagement(boolean useSm) {
this.useSm = useSm; this.useSm = useSm;
} }
/**
* Set if Stream Management resumption should be used if supported by the server.
*
* @param useSmResumption true to use Stream Management resumption.
*/
public void setUseStreamManagementResumption(boolean useSmResumption) { public void setUseStreamManagementResumption(boolean useSmResumption) {
if (useSmResumption) {
// Also enable SM is resumption is enabled
setUseStreamManagement(useSmResumption);
}
this.useSmResumption = useSmResumption; this.useSmResumption = useSmResumption;
} }
@ -1486,24 +1530,51 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
smClientMaxResumptionTime = resumptionTime; smClientMaxResumptionTime = resumptionTime;
} }
/**
* Add a predicate for Stream Management acknowledgment requests.
* <p>
* Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
* Some pre-defined predicates are found in the <code>org.jivesoftware.smack.tcp.sm.predicates</code> package.
* </p>
* <p>
* If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
* </p>
*
* @param predicate the predicate to add.
* @return if the predicate was not already active.
*/
public boolean addRequestAckPredicate(PacketFilter predicate) { public boolean addRequestAckPredicate(PacketFilter predicate) {
synchronized (requestAckPredicates) { synchronized (requestAckPredicates) {
return requestAckPredicates.add(predicate); return requestAckPredicates.add(predicate);
} }
} }
/**
* Remove the given predicate for Stream Management acknowledgment request.
* @param predicate the predicate to remove.
* @return true if the predicate was removed.
*/
public boolean removeRequestAckPredicate(PacketFilter predicate) { public boolean removeRequestAckPredicate(PacketFilter predicate) {
synchronized (requestAckPredicates) { synchronized (requestAckPredicates) {
return requestAckPredicates.remove(predicate); return requestAckPredicates.remove(predicate);
} }
} }
/**
* Remove all predicates for Stream Management acknowledgment requests.
*/
public void removeAllRequestAckPredicates() { public void removeAllRequestAckPredicates() {
synchronized (requestAckPredicates) { synchronized (requestAckPredicates) {
requestAckPredicates.clear(); requestAckPredicates.clear();
} }
} }
/**
* Send an unconditional Stream Management acknowledgement request to the server.
*
* @throws StreamManagementNotEnabledException if Stream Mangement is not enabled.
* @throws NotConnectedException if the connection is not connected.
*/
public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
if (!isSmEnabled()) { if (!isSmEnabled()) {
throw new StreamManagementException.StreamManagementNotEnabledException(); throw new StreamManagementException.StreamManagementNotEnabledException();
@ -1515,6 +1586,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
packetWriter.sendStreamElement(AckRequest.INSTANCE); packetWriter.sendStreamElement(AckRequest.INSTANCE);
} }
/**
* Send a unconditional Stream Management acknowledgment to the server.
* <p>
* See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management § 4. Acks</a>:
* "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas,
* or after a certain period of time), even if it has not received an <r/> element from the other party."
* </p>
*
* @throws StreamManagementNotEnabledException if Stream Management is not enabled.
* @throws NotConnectedException if the connection is not connected.
*/
public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException { public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
if (!isSmEnabled()) { if (!isSmEnabled()) {
throw new StreamManagementException.StreamManagementNotEnabledException(); throw new StreamManagementException.StreamManagementNotEnabledException();
@ -1526,42 +1608,111 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
} }
public void addStanzaAcknowledgedListener(PacketListener listener) { /**
* Add a Stanza acknowledged listener.
* <p>
* Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
* automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, PacketListener)} when
* possible.
* </p>
*
* @param listener the listener to add.
* @throws StreamManagementNotEnabledException if Stream Management is not enabled.
*/
public void addStanzaAcknowledgedListener(PacketListener listener) throws StreamManagementNotEnabledException {
// Prevent users from adding callbacks that will never get removed
if (!smWasEnabledAtLeastOnce) {
throw new StreamManagementException.StreamManagementNotEnabledException();
}
stanzaAcknowledgedListeners.add(listener); stanzaAcknowledgedListeners.add(listener);
} }
/**
* Remove the given Stanza acknowledged listener.
*
* @param listener the listener.
* @return true if the listener was removed.
*/
public boolean removeStanzaAcknowledgedListener(PacketListener listener) { public boolean removeStanzaAcknowledgedListener(PacketListener listener) {
return stanzaAcknowledgedListeners.remove(listener); return stanzaAcknowledgedListeners.remove(listener);
} }
/**
* Remove all stanza acknowledged listeners.
*/
public void removeAllStanzaAcknowledgedListeners() { public void removeAllStanzaAcknowledgedListeners() {
stanzaAcknowledgedListeners.clear(); stanzaAcknowledgedListeners.clear();
} }
public PacketListener addIdStanzaAcknowledgedListener(String id, PacketListener listener) { /**
return idStanzaAcknowledgedListeners.put(id, listener); * Add a new Stanza ID acknowledged listener for the given ID.
* <p>
* The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
* automatically be removed after the listener was run.
* </p>
*
* @param id the stanza ID.
* @param listener the listener to invoke.
* @return the previous listener for this stanza ID or null.
* @throws StreamManagementNotEnabledException if Stream Management is not enabled.
*/
public PacketListener addStanzaIdAcknowledgedListener(String id, PacketListener listener) throws StreamManagementNotEnabledException {
// Prevent users from adding callbacks that will never get removed
if (!smWasEnabledAtLeastOnce) {
throw new StreamManagementException.StreamManagementNotEnabledException();
}
return stanzaIdAcknowledgedListeners.put(id, listener);
} }
public PacketListener removeIdStanzaAcknowledgedListener(String id) { /**
return idStanzaAcknowledgedListeners.remove(id); * Remove the Stanza ID acknowledged listener for the given ID.
*
* @param id the stanza ID.
* @return true if the listener was found and removed, false otherwise.
*/
public PacketListener removeStanzaIdAcknowledgedListener(String id) {
return stanzaIdAcknowledgedListeners.remove(id);
} }
public void removeAllIdStanzaAcknowledgedListeners() { /**
idStanzaAcknowledgedListeners.clear(); * Removes all Stanza ID acknowledged listeners.
*/
public void removeAllStanzaIdAcknowledgedListeners() {
stanzaIdAcknowledgedListeners.clear();
} }
/**
* Returns true if Stream Management is supported by the server.
*
* @return true if Stream Management is supported by the server.
*/
public boolean isSmAvailable() { public boolean isSmAvailable() {
return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
} }
/**
* Returns true if Stream Management was successfully negotiated with the server.
*
* @return true if Stream Management was negotiated.
*/
public boolean isSmEnabled() { public boolean isSmEnabled() {
return smEnabledSyncPoint.wasSuccessful(); return smEnabledSyncPoint.wasSuccessful();
} }
/**
* Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
*
* @return true if disconnected but resumption possible.
*/
public boolean isDisconnectedButSmResumptionPossible() { public boolean isDisconnectedButSmResumptionPossible() {
return disconnectedButResumeable && isSmResumptionPossible(); return disconnectedButResumeable && isSmResumptionPossible();
} }
/**
* Returns true if the stream is resumable.
*
* @return true if the stream is resumable.
*/
public boolean isSmResumptionPossible() { public boolean isSmResumptionPossible() {
// There is no resumable stream available // There is no resumable stream available
if (smSessionId == null) if (smSessionId == null)
@ -1603,7 +1754,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} }
String id = ackedStanza.getPacketID(); String id = ackedStanza.getPacketID();
if (id != null) { if (id != null) {
PacketListener listener = idStanzaAcknowledgedListeners.remove(id); PacketListener listener = stanzaIdAcknowledgedListeners.remove(id);
if (listener != null) { if (listener != null) {
listener.processPacket(ackedStanza); listener.processPacket(ackedStanza);
} }