Add an API to send and wait async for a response

SMACK-595

Also remove the unused AbstractXMPPConnection.getPacketListeners()
method.
This commit is contained in:
Florian Schmaus 2014-08-16 00:04:24 +02:00
parent 18a958e442
commit aeda0e4660
6 changed files with 221 additions and 27 deletions

View File

@ -28,6 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@ -697,17 +698,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
}
@Override
public void removePacketListener(PacketListener packetListener) {
recvListeners.remove(packetListener);
}
/**
* Get a map of all packet listeners for received packets of this connection.
*
* @return a map of all packet listeners for received packets.
*/
protected Map<PacketListener, ListenerWrapper> getPacketListeners() {
return recvListeners;
public boolean removePacketListener(PacketListener packetListener) {
return recvListeners.remove(packetListener) != null;
}
@Override
@ -1106,6 +1098,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
// gc'ed. It is possible that the XMPPConnection instance is gc'ed while the
// listenerExecutor ExecutorService call not be gc'ed until it got shut down.
executorService.shutdownNow();
removeCallbacksService.shutdownNow();
}
finally {
super.finalize();
@ -1121,4 +1114,86 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
public boolean isRosterLoadedAtLogin() {
return config.isRosterLoadedAtLogin();
}
private final ScheduledExecutorService removeCallbacksService = new ScheduledThreadPoolExecutor(1,
new SmackExecutorThreadFactory(connectionCounterValue));
@Override
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter,
PacketListener callback) throws NotConnectedException {
sendStanzaWithResponseCallback(stanza, replyFilter, callback, null);
}
@Override
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter,
PacketListener callback, ExceptionCallback exceptionCallback)
throws NotConnectedException {
sendStanzaWithResponseCallback(stanza, replyFilter, callback, exceptionCallback,
getPacketReplyTimeout());
}
@Override
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter,
final PacketListener callback, final ExceptionCallback exceptionCallback,
long timeout) throws NotConnectedException {
if (stanza == null) {
throw new IllegalArgumentException("stanza must not be null");
}
if (replyFilter == null) {
// While Smack allows to add PacketListeners with a PacketFilter value of 'null', we
// disallow it here in the async API as it makes no sense
throw new IllegalArgumentException("replyFilter must not be null");
}
if (callback == null) {
throw new IllegalArgumentException("callback must not be null");
}
final PacketListener packetListener = new PacketListener() {
@Override
public void processPacket(Packet packet) throws NotConnectedException {
try {
XMPPErrorException.ifHasErrorThenThrow(packet);
callback.processPacket(packet);
}
catch (XMPPErrorException e) {
if (exceptionCallback != null) {
exceptionCallback.processException(e);
}
}
finally {
removePacketListener(this);
}
}
};
removeCallbacksService.schedule(new Runnable() {
@Override
public void run() {
boolean removed = removePacketListener(packetListener);
if (!removed) {
exceptionCallback.processException(new NoResponseException());
}
}
}, timeout, TimeUnit.MILLISECONDS);
addPacketListener(packetListener, replyFilter);
sendPacket(stanza);
}
@Override
public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback)
throws NotConnectedException {
sendIqWithResponseCallback(iqRequest, callback, null);
}
@Override
public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback,
ExceptionCallback exceptionCallback) throws NotConnectedException {
sendIqWithResponseCallback(iqRequest, callback, exceptionCallback, getPacketReplyTimeout());
}
@Override
public void sendIqWithResponseCallback(IQ iqRequest, final PacketListener callback,
final ExceptionCallback exceptionCallback, long timeout)
throws NotConnectedException {
PacketFilter replyFilter = new IQReplyFilter(iqRequest, this);
sendStanzaWithResponseCallback(iqRequest, replyFilter, callback, exceptionCallback, timeout);
}
}

View File

@ -0,0 +1,23 @@
/**
*
* Copyright © 2014 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
public interface ExceptionCallback {
public void processException(Exception exception);
}

View File

@ -24,7 +24,6 @@ import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.XMPPError;
/**
* Provides a mechanism to collect packets into a result queue that pass a
@ -180,10 +179,7 @@ public class PacketCollector {
throw new NoResponseException();
}
XMPPError xmppError = result.getError();
if (xmppError != null) {
throw new XMPPErrorException(xmppError);
}
XMPPErrorException.ifHasErrorThenThrow(result);
return result;
}

View File

@ -16,6 +16,7 @@
*/
package org.jivesoftware.smack;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.filter.IQReplyFilter;
@ -225,8 +226,9 @@ public interface XMPPConnection {
* Removes a packet listener for received packets from this connection.
*
* @param packetListener the packet listener to remove.
* @return true if the packet listener was removed
*/
public void removePacketListener(PacketListener packetListener);
public boolean removePacketListener(PacketListener packetListener);
/**
* Registers a packet listener with this connection. The listener will be
@ -357,4 +359,101 @@ public interface XMPPConnection {
* @return true if the roster will be loaded from the server when logging in.
*/
public boolean isRosterLoadedAtLogin();
/**
* Send a stanza and wait asynchronously for a response by using <code>replyFilter</code>.
* <p>
* If there is a response, then <code>callback</code> will be invoked. The callback will be
* invoked at most once and it will be not invoked after the connections default reply timeout
* has been elapsed.
* </p>
*
* @param stanza the stanza to send (required)
* @param replyFilter the filter used to determine response stanza (required)
* @param callback the callback invoked if there is a response (required)
* @throws NotConnectedException
*/
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter,
PacketListener callback) throws NotConnectedException;
/**
* Send a stanza and wait asynchronously for a response by using <code>replyFilter</code>.
* <p>
* If there is a response, then <code>callback</code> will be invoked. If there is no response
* after the connections default reply timeout, then <code>exceptionCallback</code> will be invoked
* with a {@link SmackException.NoResponseException}. The callback will be invoked at most once.
* </p>
*
* @param stanza the stanza to send (required)
* @param replyFilter the filter used to determine response stanza (required)
* @param callback the callback invoked if there is a response (required)
* @param exceptionCallback the callback invoked if there is an exception (optional)
* @throws NotConnectedException
*/
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, PacketListener callback,
ExceptionCallback exceptionCallback) throws NotConnectedException;
/**
* Send a stanza and wait asynchronously for a response by using <code>replyFilter</code>.
* <p>
* If there is a response, then <code>callback</code> will be invoked. If there is no response
* after <code>timeout</code> milliseconds, then <code>exceptionCallback</code> will be invoked
* with a {@link SmackException.NoResponseException}. The callback will be invoked at most once.
* </p>
*
* @param stanza the stanza to send (required)
* @param replyFilter the filter used to determine response stanza (required)
* @param callback the callback invoked if there is a response (required)
* @param exceptionCallback the callback invoked if there is an exception (optional)
* @param timeout the timeout in milliseconds to wait for a response
* @throws NotConnectedException
*/
public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter,
final PacketListener callback, final ExceptionCallback exceptionCallback,
long timeout) throws NotConnectedException;
/**
* Send a IQ stanza and invoke <code>callback</code> if there is a result of
* {@link org.jivesoftware.smack.packet.IQ.Type#result} with that result IQ. The callback will
* not be invoked after the connections default reply timeout has been elapsed.
*
* @param iqRequest the IQ stanza to send (required)
* @param callback the callback invoked if there is result response (required)
* @throws NotConnectedException
*/
public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback) throws NotConnectedException;
/**
* Send a IQ stanza and invoke <code>callback</code> if there is a result of
* {@link org.jivesoftware.smack.packet.IQ.Type#result} with that result IQ. If there is an
* error response <code>exceptionCallback</code> will be invoked, if not null, with the received
* error as {@link XMPPException.XMPPErrorException}. If there is no response after the
* connections default reply timeout, then <code>exceptionCallback</code> will be invoked with a
* {@link SmackException.NoResponseException}.
*
* @param iqRequest the IQ stanza to send (required)
* @param callback the callback invoked if there is result response (required)
* @param exceptionCallback the callback invoked if there is an Exception optional
* @throws NotConnectedException
*/
public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback,
ExceptionCallback exceptionCallback) throws NotConnectedException;
/**
* Send a IQ stanza and invoke <code>callback</code> if there is a result of
* {@link org.jivesoftware.smack.packet.IQ.Type#result} with that result IQ. If there is an
* error response <code>exceptionCallback</code> will be invoked, if not null, with the received
* error as {@link XMPPException.XMPPErrorException}. If there is no response after
* <code>timeout</code>, then <code>exceptionCallback</code> will be invoked with a
* {@link SmackException.NoResponseException}.
*
* @param iqRequest the IQ stanza to send (required)
* @param callback the callback invoked if there is result response (required)
* @param exceptionCallback the callback invoked if there is an Exception optional
* @param timeout the timeout in milliseconds to wait for a response
* @throws NotConnectedException
*/
public void sendIqWithResponseCallback(IQ iqRequest, final PacketListener callback,
final ExceptionCallback exceptionCallback, long timeout)
throws NotConnectedException;
}

View File

@ -17,6 +17,7 @@
package org.jivesoftware.smack;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.StreamError;
import org.jivesoftware.smack.packet.XMPPError;
@ -133,6 +134,13 @@ public abstract class XMPPException extends Exception {
public String toString() {
return getMessage();
}
public static void ifHasErrorThenThrow(Packet packet) throws XMPPErrorException {
XMPPError xmppError = packet.getError();
if (xmppError != null) {
throw new XMPPErrorException(xmppError);
}
}
}
public static class StreamErrorException extends XMPPException {

View File

@ -29,7 +29,6 @@ import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.XMPPConnectionRegistry;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.IQReplyFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
@ -121,17 +120,11 @@ public class CarbonManager extends Manager {
public void sendCarbonsEnabled(final boolean new_state) throws NotConnectedException {
IQ setIQ = carbonsEnabledIQ(new_state);
connection().addPacketListener(new PacketListener() {
connection().sendIqWithResponseCallback(setIQ, new PacketListener() {
public void processPacket(Packet packet) {
IQ result = (IQ)packet;
if (result.getType() == IQ.Type.result) {
enabled_state = new_state;
}
connection().removePacketListener(this);
enabled_state = new_state;
}
}, new IQReplyFilter(setIQ, connection()));
connection().sendPacket(setIQ);
});
}
/**