Allows PacketCollector's to cancel each other

This is useful for cases where a result set is requested, as it's the
case in XEP-13 and XEP-313.

Also adds
XMPPConnection.createPacketCollector(PacketCollector.Configuration).
This commit is contained in:
Florian Schmaus 2015-01-16 17:24:59 +01:00
parent d099e7b16d
commit 2e23a6f150
5 changed files with 117 additions and 46 deletions

View File

@ -715,7 +715,13 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
@Override
public PacketCollector createPacketCollector(PacketFilter packetFilter) {
PacketCollector collector = new PacketCollector(this, packetFilter);
PacketCollector.Configuration configuration = PacketCollector.newConfiguration().setPacketFilter(packetFilter);
return createPacketCollector(configuration);
}
@Override
public PacketCollector createPacketCollector(PacketCollector.Configuration configuration) {
PacketCollector collector = new PacketCollector(this, configuration);
// Add the collector to the list of active collectors.
collectors.add(collector);
return collector;

View File

@ -47,6 +47,12 @@ public class PacketCollector {
private final PacketFilter packetFilter;
private final ArrayBlockingQueue<Packet> resultQueue;
/**
* The packet collector which timeout for the next result will get reset once this collector collects a stanza.
*/
private final PacketCollector collectorToReset;
private final XMPPConnection connection;
private boolean cancelled = false;
@ -56,24 +62,13 @@ public class PacketCollector {
* all packets will match this collector.
*
* @param connection the connection the collector is tied to.
* @param packetFilter determines which packets will be returned by this collector.
* @param configuration the configuration used to construct this collector
*/
protected PacketCollector(XMPPConnection connection, PacketFilter packetFilter) {
this(connection, packetFilter, SmackConfiguration.getPacketCollectorSize());
}
/**
* Creates a new packet collector. If the packet filter is <tt>null</tt>, then
* all packets will match this collector.
*
* @param connection the connection the collector is tied to.
* @param packetFilter determines which packets will be returned by this collector.
* @param maxSize the maximum number of packets that will be stored in the collector.
*/
protected PacketCollector(XMPPConnection connection, PacketFilter packetFilter, int maxSize) {
protected PacketCollector(XMPPConnection connection, Configuration configuration) {
this.connection = connection;
this.packetFilter = packetFilter;
this.resultQueue = new ArrayBlockingQueue<Packet>(maxSize);
this.packetFilter = configuration.packetFilter;
this.resultQueue = new ArrayBlockingQueue<>(configuration.size);
this.collectorToReset = configuration.collectorToReset;
}
/**
@ -156,12 +151,14 @@ public class PacketCollector {
* Returns the next available packet. The method call will block until the connection's default
* timeout has elapsed.
*
* @return the next availabe packet.
* @return the next available packet.
*/
public <P extends Packet> P nextResult() {
return nextResult(connection.getPacketReplyTimeout());
}
private volatile long waitStart;
/**
* Returns the next available packet. The method call will block (not return)
* until a packet is available or the <tt>timeout</tt> has elapsed. If the
@ -175,16 +172,20 @@ public class PacketCollector {
throwIfCancelled();
P res = null;
long remainingWait = timeout;
final long waitStart = System.currentTimeMillis();
while (res == null && remainingWait > 0) {
waitStart = System.currentTimeMillis();
do {
try {
res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
remainingWait = timeout - (System.currentTimeMillis() - waitStart);
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
LOGGER.log(Level.FINE, "nextResult was interrupted", e);
}
}
return res;
if (res != null) {
return res;
}
remainingWait = timeout - (System.currentTimeMillis() - waitStart);
} while (remainingWait > 0);
return null;
}
/**
@ -243,6 +244,9 @@ public class PacketCollector {
// Since we know the queue is full, this poll should never actually block.
resultQueue.poll();
}
if (collectorToReset != null) {
collectorToReset.waitStart = System.currentTimeMillis();
}
}
}
@ -251,4 +255,58 @@ public class PacketCollector {
throw new IllegalStateException("Packet collector already cancelled");
}
}
/**
* Get a new packet collector configuration instance.
*
* @return a new packet collector configuration.
*/
public static Configuration newConfiguration() {
return new Configuration();
}
public static class Configuration {
private PacketFilter packetFilter;
private int size = SmackConfiguration.getPacketCollectorSize();
private PacketCollector collectorToReset;
private Configuration() {
}
/**
* Set the packet filter used by this collector. If <code>null</code>, then all packets will
* get collected by this collector.
*
* @param packetFilter
* @return a reference to this configuration.
*/
public Configuration setPacketFilter(PacketFilter packetFilter) {
this.packetFilter = packetFilter;
return this;
}
/**
* Set the maximum size of this collector, i.e. how many stanzas this collector will collect
* before dropping old ones.
*
* @param size
* @return a reference to this configuration.
*/
public Configuration setSize(int size) {
this.size = size;
return this;
}
/**
* Set the collector which timeout for the next result is reset once this collector collects
* a packet.
*
* @param collector
* @return a reference to this configuration.
*/
public Configuration setCollectorToReset(PacketCollector collector) {
this.collectorToReset = collector;
return this;
}
}
}

View File

@ -239,6 +239,19 @@ public interface XMPPConnection {
*/
public PacketCollector createPacketCollector(PacketFilter packetFilter);
/**
* Create a new packet collector with the given packet collector configuration.
* <p>
* Please make sure to cancel the collector when it is no longer required. See also
* {@link #createPacketCollector(PacketFilter)}.
* </p>
*
* @param configuration the packet collector configuration.
* @return a new packet collector.
* @since 4.1
*/
public PacketCollector createPacketCollector(PacketCollector.Configuration configuration);
/**
* Remove a packet collector of this connection.
*

View File

@ -189,7 +189,7 @@ public class PacketCollectorTest
{
protected TestPacketCollector(XMPPConnection conection, PacketFilter packetFilter, int size)
{
super(conection, packetFilter, size);
super(conection, PacketCollector.newConfiguration().setPacketFilter(packetFilter).setSize(size));
}
}

View File

@ -61,7 +61,7 @@ public class OfflineMessageManager {
private final XMPPConnection connection;
private static final PacketFilter PACKET_FILTER = new AndFilter(new PacketExtensionFilter(
new OfflineMessageInfo()), new PacketTypeFilter(Message.class));
new OfflineMessageInfo()), PacketTypeFilter.MESSAGE);
public OfflineMessageManager(XMPPConnection connection) {
this.connection = connection;
@ -182,36 +182,30 @@ public class OfflineMessageManager {
* @throws NotConnectedException
*/
public List<Message> getMessages() throws NoResponseException, XMPPErrorException, NotConnectedException {
List<Message> messages = new ArrayList<Message>();
OfflineMessageRequest request = new OfflineMessageRequest();
request.setFetch(true);
PacketCollector messageCollector = connection.createPacketCollector(PACKET_FILTER);
PacketCollector resultCollector = connection.createPacketCollectorAndSend(request);
PacketCollector.Configuration messageCollectorConfiguration = PacketCollector.newConfiguration().setPacketFilter(PACKET_FILTER).setCollectorToReset(resultCollector);
PacketCollector messageCollector = connection.createPacketCollector(messageCollectorConfiguration);
List<Message> messages = null;
try {
// Collect the received offline messages
Message message = messageCollector.nextResult();
while (message != null) {
messages.add(message);
// It is important that we query the resultCollector before the messageCollector
Packet result = resultCollector.pollResultOrThrow();
message = messageCollector.pollResult();
if (message == null && result != null) {
// No new messages, but we have a non-error IQ response, we are done
return messages;
} else if (message != null) {
// We have received a message without waiting, great, continue to add this message and proceed with
// the loop
continue;
}
message = messageCollector.nextResult();
}
resultCollector.nextResultOrThrow();
// Be extra safe, cancel the message collector right here so that it does not collector
// other messages that eventually match (although I've no idea how this could happen in
// case of XEP-13).
messageCollector.cancel();
messages = new ArrayList<>(messageCollector.getCollectedCount());
Message message;
while ((message = messageCollector.pollResult()) != null) {
messages.add(message);
}
}
finally {
// Ensure that the message collector is canceled even if nextResultOrThrow threw. It
// doesn't matter if we cancel the message collector twice
messageCollector.cancel();
resultCollector.cancel();
}
return messages;
}