From aeda0e46600fb1b4470de1c48f8c1d49b9c29a21 Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sat, 16 Aug 2014 00:04:24 +0200 Subject: [PATCH] Add an API to send and wait async for a response SMACK-595 Also remove the unused AbstractXMPPConnection.getPacketListeners() method. --- .../smack/AbstractXMPPConnection.java | 97 +++++++++++++++-- .../jivesoftware/smack/ExceptionCallback.java | 23 ++++ .../jivesoftware/smack/PacketCollector.java | 6 +- .../jivesoftware/smack/XMPPConnection.java | 101 +++++++++++++++++- .../org/jivesoftware/smack/XMPPException.java | 8 ++ .../smackx/carbons/CarbonManager.java | 13 +-- 6 files changed, 221 insertions(+), 27 deletions(-) create mode 100644 smack-core/src/main/java/org/jivesoftware/smack/ExceptionCallback.java diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 9aa5fc156..fbd29f04e 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -28,6 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -697,17 +698,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { } @Override - public void removePacketListener(PacketListener packetListener) { - recvListeners.remove(packetListener); - } - - /** - * Get a map of all packet listeners for received packets of this connection. - * - * @return a map of all packet listeners for received packets. - */ - protected Map getPacketListeners() { - return recvListeners; + public boolean removePacketListener(PacketListener packetListener) { + return recvListeners.remove(packetListener) != null; } @Override @@ -1106,6 +1098,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the // listenerExecutor ExecutorService call not be gc'ed until it got shut down. executorService.shutdownNow(); + removeCallbacksService.shutdownNow(); } finally { super.finalize(); @@ -1121,4 +1114,86 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { public boolean isRosterLoadedAtLogin() { return config.isRosterLoadedAtLogin(); } + + private final ScheduledExecutorService removeCallbacksService = new ScheduledThreadPoolExecutor(1, + new SmackExecutorThreadFactory(connectionCounterValue)); + + @Override + public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, + PacketListener callback) throws NotConnectedException { + sendStanzaWithResponseCallback(stanza, replyFilter, callback, null); + } + + @Override + public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, + PacketListener callback, ExceptionCallback exceptionCallback) + throws NotConnectedException { + sendStanzaWithResponseCallback(stanza, replyFilter, callback, exceptionCallback, + getPacketReplyTimeout()); + } + + @Override + public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, + final PacketListener callback, final ExceptionCallback exceptionCallback, + long timeout) throws NotConnectedException { + if (stanza == null) { + throw new IllegalArgumentException("stanza must not be null"); + } + if (replyFilter == null) { + // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we + // disallow it here in the async API as it makes no sense + throw new IllegalArgumentException("replyFilter must not be null"); + } + if (callback == null) { + throw new IllegalArgumentException("callback must not be null"); + } + final PacketListener packetListener = new PacketListener() { + @Override + public void processPacket(Packet packet) throws NotConnectedException { + try { + XMPPErrorException.ifHasErrorThenThrow(packet); + callback.processPacket(packet); + } + catch (XMPPErrorException e) { + if (exceptionCallback != null) { + exceptionCallback.processException(e); + } + } + finally { + removePacketListener(this); + } + } + }; + removeCallbacksService.schedule(new Runnable() { + @Override + public void run() { + boolean removed = removePacketListener(packetListener); + if (!removed) { + exceptionCallback.processException(new NoResponseException()); + } + } + }, timeout, TimeUnit.MILLISECONDS); + addPacketListener(packetListener, replyFilter); + sendPacket(stanza); + } + + @Override + public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback) + throws NotConnectedException { + sendIqWithResponseCallback(iqRequest, callback, null); + } + + @Override + public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback, + ExceptionCallback exceptionCallback) throws NotConnectedException { + sendIqWithResponseCallback(iqRequest, callback, exceptionCallback, getPacketReplyTimeout()); + } + + @Override + public void sendIqWithResponseCallback(IQ iqRequest, final PacketListener callback, + final ExceptionCallback exceptionCallback, long timeout) + throws NotConnectedException { + PacketFilter replyFilter = new IQReplyFilter(iqRequest, this); + sendStanzaWithResponseCallback(iqRequest, replyFilter, callback, exceptionCallback, timeout); + } } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/ExceptionCallback.java b/smack-core/src/main/java/org/jivesoftware/smack/ExceptionCallback.java new file mode 100644 index 000000000..cc1b98bdb --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/ExceptionCallback.java @@ -0,0 +1,23 @@ +/** + * + * Copyright © 2014 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack; + +public interface ExceptionCallback { + + public void processException(Exception exception); + +} diff --git a/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java b/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java index c95ffda6c..5174a26c8 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java @@ -24,7 +24,6 @@ import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; import org.jivesoftware.smack.filter.PacketFilter; import org.jivesoftware.smack.packet.Packet; -import org.jivesoftware.smack.packet.XMPPError; /** * Provides a mechanism to collect packets into a result queue that pass a @@ -180,10 +179,7 @@ public class PacketCollector { throw new NoResponseException(); } - XMPPError xmppError = result.getError(); - if (xmppError != null) { - throw new XMPPErrorException(xmppError); - } + XMPPErrorException.ifHasErrorThenThrow(result); return result; } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java index 538a676e8..eb21f5e97 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java @@ -16,6 +16,7 @@ */ package org.jivesoftware.smack; + import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.filter.IQReplyFilter; @@ -225,8 +226,9 @@ public interface XMPPConnection { * Removes a packet listener for received packets from this connection. * * @param packetListener the packet listener to remove. + * @return true if the packet listener was removed */ - public void removePacketListener(PacketListener packetListener); + public boolean removePacketListener(PacketListener packetListener); /** * Registers a packet listener with this connection. The listener will be @@ -357,4 +359,101 @@ public interface XMPPConnection { * @return true if the roster will be loaded from the server when logging in. */ public boolean isRosterLoadedAtLogin(); + + /** + * Send a stanza and wait asynchronously for a response by using replyFilter. + *

+ * If there is a response, then callback will be invoked. The callback will be + * invoked at most once and it will be not invoked after the connections default reply timeout + * has been elapsed. + *

+ * + * @param stanza the stanza to send (required) + * @param replyFilter the filter used to determine response stanza (required) + * @param callback the callback invoked if there is a response (required) + * @throws NotConnectedException + */ + public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, + PacketListener callback) throws NotConnectedException; + + /** + * Send a stanza and wait asynchronously for a response by using replyFilter. + *

+ * If there is a response, then callback will be invoked. If there is no response + * after the connections default reply timeout, then exceptionCallback will be invoked + * with a {@link SmackException.NoResponseException}. The callback will be invoked at most once. + *

+ * + * @param stanza the stanza to send (required) + * @param replyFilter the filter used to determine response stanza (required) + * @param callback the callback invoked if there is a response (required) + * @param exceptionCallback the callback invoked if there is an exception (optional) + * @throws NotConnectedException + */ + public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, PacketListener callback, + ExceptionCallback exceptionCallback) throws NotConnectedException; + + /** + * Send a stanza and wait asynchronously for a response by using replyFilter. + *

+ * If there is a response, then callback will be invoked. If there is no response + * after timeout milliseconds, then exceptionCallback will be invoked + * with a {@link SmackException.NoResponseException}. The callback will be invoked at most once. + *

+ * + * @param stanza the stanza to send (required) + * @param replyFilter the filter used to determine response stanza (required) + * @param callback the callback invoked if there is a response (required) + * @param exceptionCallback the callback invoked if there is an exception (optional) + * @param timeout the timeout in milliseconds to wait for a response + * @throws NotConnectedException + */ + public void sendStanzaWithResponseCallback(Packet stanza, PacketFilter replyFilter, + final PacketListener callback, final ExceptionCallback exceptionCallback, + long timeout) throws NotConnectedException; + + /** + * Send a IQ stanza and invoke callback if there is a result of + * {@link org.jivesoftware.smack.packet.IQ.Type#result} with that result IQ. The callback will + * not be invoked after the connections default reply timeout has been elapsed. + * + * @param iqRequest the IQ stanza to send (required) + * @param callback the callback invoked if there is result response (required) + * @throws NotConnectedException + */ + public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback) throws NotConnectedException; + + /** + * Send a IQ stanza and invoke callback if there is a result of + * {@link org.jivesoftware.smack.packet.IQ.Type#result} with that result IQ. If there is an + * error response exceptionCallback will be invoked, if not null, with the received + * error as {@link XMPPException.XMPPErrorException}. If there is no response after the + * connections default reply timeout, then exceptionCallback will be invoked with a + * {@link SmackException.NoResponseException}. + * + * @param iqRequest the IQ stanza to send (required) + * @param callback the callback invoked if there is result response (required) + * @param exceptionCallback the callback invoked if there is an Exception optional + * @throws NotConnectedException + */ + public void sendIqWithResponseCallback(IQ iqRequest, PacketListener callback, + ExceptionCallback exceptionCallback) throws NotConnectedException; + + /** + * Send a IQ stanza and invoke callback if there is a result of + * {@link org.jivesoftware.smack.packet.IQ.Type#result} with that result IQ. If there is an + * error response exceptionCallback will be invoked, if not null, with the received + * error as {@link XMPPException.XMPPErrorException}. If there is no response after + * timeout, then exceptionCallback will be invoked with a + * {@link SmackException.NoResponseException}. + * + * @param iqRequest the IQ stanza to send (required) + * @param callback the callback invoked if there is result response (required) + * @param exceptionCallback the callback invoked if there is an Exception optional + * @param timeout the timeout in milliseconds to wait for a response + * @throws NotConnectedException + */ + public void sendIqWithResponseCallback(IQ iqRequest, final PacketListener callback, + final ExceptionCallback exceptionCallback, long timeout) + throws NotConnectedException; } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XMPPException.java b/smack-core/src/main/java/org/jivesoftware/smack/XMPPException.java index 694a0cb00..a27819f02 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XMPPException.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XMPPException.java @@ -17,6 +17,7 @@ package org.jivesoftware.smack; +import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.StreamError; import org.jivesoftware.smack.packet.XMPPError; @@ -133,6 +134,13 @@ public abstract class XMPPException extends Exception { public String toString() { return getMessage(); } + + public static void ifHasErrorThenThrow(Packet packet) throws XMPPErrorException { + XMPPError xmppError = packet.getError(); + if (xmppError != null) { + throw new XMPPErrorException(xmppError); + } + } } public static class StreamErrorException extends XMPPException { diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java index b56a64b36..19860e14f 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/carbons/CarbonManager.java @@ -29,7 +29,6 @@ import org.jivesoftware.smack.PacketListener; import org.jivesoftware.smack.XMPPConnectionRegistry; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.IQReplyFilter; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.Packet; @@ -121,17 +120,11 @@ public class CarbonManager extends Manager { public void sendCarbonsEnabled(final boolean new_state) throws NotConnectedException { IQ setIQ = carbonsEnabledIQ(new_state); - connection().addPacketListener(new PacketListener() { + connection().sendIqWithResponseCallback(setIQ, new PacketListener() { public void processPacket(Packet packet) { - IQ result = (IQ)packet; - if (result.getType() == IQ.Type.result) { - enabled_state = new_state; - } - connection().removePacketListener(this); + enabled_state = new_state; } - }, new IQReplyFilter(setIQ, connection())); - - connection().sendPacket(setIQ); + }); } /**