mirror of
https://github.com/vanitasvitae/Smack.git
synced 2024-11-23 04:22:05 +01:00
Add XMPPConnection.addAsyncPacketListener(PacketListener, PacketFilter)
and use this method in packet listeners that previously used Async.go().
This commit is contained in:
parent
1d2f9749b6
commit
f029b576a5
4 changed files with 75 additions and 25 deletions
|
@ -131,6 +131,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
private final Map<PacketListener, ListenerWrapper> recvListeners =
|
private final Map<PacketListener, ListenerWrapper> recvListeners =
|
||||||
new HashMap<PacketListener, ListenerWrapper>();
|
new HashMap<PacketListener, ListenerWrapper>();
|
||||||
|
|
||||||
|
private final Map<PacketListener, ListenerWrapper> asyncRecvListeners = new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List of PacketListeners that will be notified when a new packet was sent.
|
* List of PacketListeners that will be notified when a new packet was sent.
|
||||||
*/
|
*/
|
||||||
|
@ -741,6 +743,24 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addAsyncPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
|
||||||
|
if (packetListener == null) {
|
||||||
|
throw new NullPointerException("Packet listener is null.");
|
||||||
|
}
|
||||||
|
ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
|
||||||
|
synchronized (asyncRecvListeners) {
|
||||||
|
asyncRecvListeners.put(packetListener, wrapper);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean removeAsyncPacketListener(PacketListener packetListener) {
|
||||||
|
synchronized (asyncRecvListeners) {
|
||||||
|
return asyncRecvListeners.remove(packetListener) != null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addPacketSendingListener(PacketListener packetListener, PacketFilter packetFilter) {
|
public void addPacketSendingListener(PacketListener packetListener, PacketFilter packetFilter) {
|
||||||
if (packetListener == null) {
|
if (packetListener == null) {
|
||||||
|
@ -937,14 +957,39 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
||||||
*
|
*
|
||||||
* @param packet the packet to notify the PacketCollectors and receive listeners about.
|
* @param packet the packet to notify the PacketCollectors and receive listeners about.
|
||||||
*/
|
*/
|
||||||
protected void invokePacketCollectorsAndNotifyRecvListeners(Packet packet) {
|
protected void invokePacketCollectorsAndNotifyRecvListeners(final Packet packet) {
|
||||||
|
// First handle the async recv listeners. Note that this code is very similar to what follows a few lines below,
|
||||||
|
// the only difference is that asyncRecvListeners is used here and that the packet listeners are started in
|
||||||
|
// their own thread.
|
||||||
|
Collection<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
|
||||||
|
synchronized (asyncRecvListeners) {
|
||||||
|
for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) {
|
||||||
|
if (listenerWrapper.filterMatches(packet)) {
|
||||||
|
listenersToNotify.add(listenerWrapper.getListener());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final PacketListener listener : listenersToNotify) {
|
||||||
|
asyncGo(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
listener.processPacket(packet);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.log(Level.SEVERE, "Exception in async packet listener", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Loop through all collectors and notify the appropriate ones.
|
// Loop through all collectors and notify the appropriate ones.
|
||||||
for (PacketCollector collector: collectors) {
|
for (PacketCollector collector: collectors) {
|
||||||
collector.processPacket(packet);
|
collector.processPacket(packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify the receive listeners interested in the packet
|
// Notify the receive listeners interested in the packet
|
||||||
List<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
|
listenersToNotify = new LinkedList<PacketListener>();
|
||||||
synchronized (recvListeners) {
|
synchronized (recvListeners) {
|
||||||
for (ListenerWrapper listenerWrapper : recvListeners.values()) {
|
for (ListenerWrapper listenerWrapper : recvListeners.values()) {
|
||||||
if (listenerWrapper.filterMatches(packet)) {
|
if (listenerWrapper.filterMatches(packet)) {
|
||||||
|
|
|
@ -266,6 +266,29 @@ public interface XMPPConnection {
|
||||||
*/
|
*/
|
||||||
public boolean removePacketListener(PacketListener packetListener);
|
public boolean removePacketListener(PacketListener packetListener);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers an <b>asynchronous</b> packet listener with this connection. A packet listener will be invoked only
|
||||||
|
* when an incoming packet is received. A packet filter determines which packets will be delivered to the listener.
|
||||||
|
* If the same packet listener is added again with a different filter, only the new filter will be used.
|
||||||
|
* <p>
|
||||||
|
* Unlike {@link #addPacketListener(PacketListener, PacketFilter)} packet listeners added with this method will be
|
||||||
|
* invoked asynchronously in their own thread. Use this method if the order of the packet listeners must not depend
|
||||||
|
* on the order how the stanzas where received.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param packetListener the packet listener to notify of new received packets.
|
||||||
|
* @param packetFilter the packet filter to use.
|
||||||
|
*/
|
||||||
|
public void addAsyncPacketListener(PacketListener packetListener, PacketFilter packetFilter);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes an <b>asynchronous</b> packet listener for received packets from this connection.
|
||||||
|
*
|
||||||
|
* @param packetListener the packet listener to remove.
|
||||||
|
* @return true if the packet listener was removed
|
||||||
|
*/
|
||||||
|
public boolean removeAsyncPacketListener(PacketListener packetListener);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers a packet listener with this connection. The listener will be
|
* Registers a packet listener with this connection. The listener will be
|
||||||
* notified of every packet that this connection sends. A packet filter determines
|
* notified of every packet that this connection sends. A packet filter determines
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.jivesoftware.smack.filter.PacketTypeFilter;
|
||||||
import org.jivesoftware.smack.packet.IQ;
|
import org.jivesoftware.smack.packet.IQ;
|
||||||
import org.jivesoftware.smack.packet.Packet;
|
import org.jivesoftware.smack.packet.Packet;
|
||||||
import org.jivesoftware.smack.packet.XMPPError;
|
import org.jivesoftware.smack.packet.XMPPError;
|
||||||
import org.jivesoftware.smack.util.Async;
|
|
||||||
import org.jivesoftware.smackx.si.packet.StreamInitiation;
|
import org.jivesoftware.smackx.si.packet.StreamInitiation;
|
||||||
import org.jxmpp.util.XmppStringUtils;
|
import org.jxmpp.util.XmppStringUtils;
|
||||||
|
|
||||||
|
@ -74,22 +73,13 @@ public class FileTransferManager extends Manager {
|
||||||
super(connection);
|
super(connection);
|
||||||
this.fileTransferNegotiator = FileTransferNegotiator
|
this.fileTransferNegotiator = FileTransferNegotiator
|
||||||
.getInstanceFor(connection);
|
.getInstanceFor(connection);
|
||||||
connection.addPacketListener(new PacketListener() {
|
connection.addAsyncPacketListener(new PacketListener() {
|
||||||
public void processPacket(Packet packet) {
|
public void processPacket(Packet packet) {
|
||||||
StreamInitiation si = (StreamInitiation) packet;
|
StreamInitiation si = (StreamInitiation) packet;
|
||||||
final FileTransferRequest request = new FileTransferRequest(FileTransferManager.this, si);
|
final FileTransferRequest request = new FileTransferRequest(FileTransferManager.this, si);
|
||||||
for (final FileTransferListener listener : listeners) {
|
for (final FileTransferListener listener : listeners) {
|
||||||
// Those listeners need to be called asynchronously, in
|
|
||||||
// order to not block further processing of incoming
|
|
||||||
// stanzas. They also may send further requests, whose
|
|
||||||
// responses are not processed if we do not call them
|
|
||||||
// asynchronously.
|
|
||||||
Async.go(new Runnable() {
|
|
||||||
public void run() {
|
|
||||||
listener.fileTransferRequest(request);
|
listener.fileTransferRequest(request);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}, new AndFilter(new PacketTypeFilter(StreamInitiation.class), IQTypeFilter.SET));
|
}, new AndFilter(new PacketTypeFilter(StreamInitiation.class), IQTypeFilter.SET));
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.jivesoftware.smack.filter.NotFilter;
|
||||||
import org.jivesoftware.smack.filter.PacketTypeFilter;
|
import org.jivesoftware.smack.filter.PacketTypeFilter;
|
||||||
import org.jivesoftware.smack.packet.Message;
|
import org.jivesoftware.smack.packet.Message;
|
||||||
import org.jivesoftware.smack.packet.Packet;
|
import org.jivesoftware.smack.packet.Packet;
|
||||||
import org.jivesoftware.smack.util.Async;
|
|
||||||
import org.jivesoftware.smackx.disco.AbstractNodeInformationProvider;
|
import org.jivesoftware.smackx.disco.AbstractNodeInformationProvider;
|
||||||
import org.jivesoftware.smackx.disco.ServiceDiscoveryManager;
|
import org.jivesoftware.smackx.disco.ServiceDiscoveryManager;
|
||||||
import org.jivesoftware.smackx.disco.packet.DiscoverInfo;
|
import org.jivesoftware.smackx.disco.packet.DiscoverInfo;
|
||||||
|
@ -114,13 +113,11 @@ public class MultiUserChatManager extends Manager {
|
||||||
*/
|
*/
|
||||||
private final Map<String, WeakReference<MultiUserChat>> multiUserChats = new HashMap<String, WeakReference<MultiUserChat>>();
|
private final Map<String, WeakReference<MultiUserChat>> multiUserChats = new HashMap<String, WeakReference<MultiUserChat>>();
|
||||||
|
|
||||||
private final PacketListener invitationPacketListener;
|
|
||||||
|
|
||||||
private MultiUserChatManager(XMPPConnection connection) {
|
private MultiUserChatManager(XMPPConnection connection) {
|
||||||
super(connection);
|
super(connection);
|
||||||
// Listens for all messages that include a MUCUser extension and fire the invitation
|
// Listens for all messages that include a MUCUser extension and fire the invitation
|
||||||
// listeners if the message includes an invitation.
|
// listeners if the message includes an invitation.
|
||||||
invitationPacketListener = new PacketListener() {
|
PacketListener invitationPacketListener = new PacketListener() {
|
||||||
public void processPacket(Packet packet) {
|
public void processPacket(Packet packet) {
|
||||||
final Message message = (Message) packet;
|
final Message message = (Message) packet;
|
||||||
// Get the MUCUser extension
|
// Get the MUCUser extension
|
||||||
|
@ -130,18 +127,13 @@ public class MultiUserChatManager extends Manager {
|
||||||
// Fire event for invitation listeners
|
// Fire event for invitation listeners
|
||||||
final MultiUserChat muc = getMultiUserChat(packet.getFrom());
|
final MultiUserChat muc = getMultiUserChat(packet.getFrom());
|
||||||
for (final InvitationListener listener : invitationsListeners) {
|
for (final InvitationListener listener : invitationsListeners) {
|
||||||
Async.go(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
listener.invitationReceived(connection(), muc, mucUser.getInvite().getFrom(),
|
listener.invitationReceived(connection(), muc, mucUser.getInvite().getFrom(),
|
||||||
mucUser.getInvite().getReason(), mucUser.getPassword(), message);
|
mucUser.getInvite().getReason(), mucUser.getPassword(), message);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
connection.addPacketListener(invitationPacketListener, INVITATION_FILTER);
|
connection.addAsyncPacketListener(invitationPacketListener, INVITATION_FILTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue