diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index 6ca816092..4f415c763 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -715,7 +715,13 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { @Override public PacketCollector createPacketCollector(PacketFilter packetFilter) { - PacketCollector collector = new PacketCollector(this, packetFilter); + PacketCollector.Configuration configuration = PacketCollector.newConfiguration().setPacketFilter(packetFilter); + return createPacketCollector(configuration); + } + + @Override + public PacketCollector createPacketCollector(PacketCollector.Configuration configuration) { + PacketCollector collector = new PacketCollector(this, configuration); // Add the collector to the list of active collectors. collectors.add(collector); return collector; diff --git a/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java b/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java index 0f2e09fef..f3b4f0169 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/PacketCollector.java @@ -47,6 +47,12 @@ public class PacketCollector { private final PacketFilter packetFilter; private final ArrayBlockingQueue resultQueue; + + /** + * The packet collector which timeout for the next result will get reset once this collector collects a stanza. + */ + private final PacketCollector collectorToReset; + private final XMPPConnection connection; private boolean cancelled = false; @@ -56,24 +62,13 @@ public class PacketCollector { * all packets will match this collector. * * @param connection the connection the collector is tied to. - * @param packetFilter determines which packets will be returned by this collector. + * @param configuration the configuration used to construct this collector */ - protected PacketCollector(XMPPConnection connection, PacketFilter packetFilter) { - this(connection, packetFilter, SmackConfiguration.getPacketCollectorSize()); - } - - /** - * Creates a new packet collector. If the packet filter is null, then - * all packets will match this collector. - * - * @param connection the connection the collector is tied to. - * @param packetFilter determines which packets will be returned by this collector. - * @param maxSize the maximum number of packets that will be stored in the collector. - */ - protected PacketCollector(XMPPConnection connection, PacketFilter packetFilter, int maxSize) { + protected PacketCollector(XMPPConnection connection, Configuration configuration) { this.connection = connection; - this.packetFilter = packetFilter; - this.resultQueue = new ArrayBlockingQueue(maxSize); + this.packetFilter = configuration.packetFilter; + this.resultQueue = new ArrayBlockingQueue<>(configuration.size); + this.collectorToReset = configuration.collectorToReset; } /** @@ -156,12 +151,14 @@ public class PacketCollector { * Returns the next available packet. The method call will block until the connection's default * timeout has elapsed. * - * @return the next availabe packet. + * @return the next available packet. */ public

P nextResult() { return nextResult(connection.getPacketReplyTimeout()); } + private volatile long waitStart; + /** * Returns the next available packet. The method call will block (not return) * until a packet is available or the timeout has elapsed. If the @@ -175,16 +172,20 @@ public class PacketCollector { throwIfCancelled(); P res = null; long remainingWait = timeout; - final long waitStart = System.currentTimeMillis(); - while (res == null && remainingWait > 0) { + waitStart = System.currentTimeMillis(); + do { try { res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS); - remainingWait = timeout - (System.currentTimeMillis() - waitStart); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { LOGGER.log(Level.FINE, "nextResult was interrupted", e); } - } - return res; + if (res != null) { + return res; + } + remainingWait = timeout - (System.currentTimeMillis() - waitStart); + } while (remainingWait > 0); + return null; } /** @@ -243,6 +244,9 @@ public class PacketCollector { // Since we know the queue is full, this poll should never actually block. resultQueue.poll(); } + if (collectorToReset != null) { + collectorToReset.waitStart = System.currentTimeMillis(); + } } } @@ -251,4 +255,58 @@ public class PacketCollector { throw new IllegalStateException("Packet collector already cancelled"); } } + + /** + * Get a new packet collector configuration instance. + * + * @return a new packet collector configuration. + */ + public static Configuration newConfiguration() { + return new Configuration(); + } + + public static class Configuration { + private PacketFilter packetFilter; + private int size = SmackConfiguration.getPacketCollectorSize(); + private PacketCollector collectorToReset; + + private Configuration() { + } + + /** + * Set the packet filter used by this collector. If null, then all packets will + * get collected by this collector. + * + * @param packetFilter + * @return a reference to this configuration. + */ + public Configuration setPacketFilter(PacketFilter packetFilter) { + this.packetFilter = packetFilter; + return this; + } + + /** + * Set the maximum size of this collector, i.e. how many stanzas this collector will collect + * before dropping old ones. + * + * @param size + * @return a reference to this configuration. + */ + public Configuration setSize(int size) { + this.size = size; + return this; + } + + /** + * Set the collector which timeout for the next result is reset once this collector collects + * a packet. + * + * @param collector + * @return a reference to this configuration. + */ + public Configuration setCollectorToReset(PacketCollector collector) { + this.collectorToReset = collector; + return this; + } + } } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java index fb4f25a17..5b12adb8a 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/XMPPConnection.java @@ -239,6 +239,19 @@ public interface XMPPConnection { */ public PacketCollector createPacketCollector(PacketFilter packetFilter); + /** + * Create a new packet collector with the given packet collector configuration. + *

+ * Please make sure to cancel the collector when it is no longer required. See also + * {@link #createPacketCollector(PacketFilter)}. + *

+ * + * @param configuration the packet collector configuration. + * @return a new packet collector. + * @since 4.1 + */ + public PacketCollector createPacketCollector(PacketCollector.Configuration configuration); + /** * Remove a packet collector of this connection. * diff --git a/smack-core/src/test/java/org/jivesoftware/smack/PacketCollectorTest.java b/smack-core/src/test/java/org/jivesoftware/smack/PacketCollectorTest.java index cb6a5067f..81b897816 100644 --- a/smack-core/src/test/java/org/jivesoftware/smack/PacketCollectorTest.java +++ b/smack-core/src/test/java/org/jivesoftware/smack/PacketCollectorTest.java @@ -189,7 +189,7 @@ public class PacketCollectorTest { protected TestPacketCollector(XMPPConnection conection, PacketFilter packetFilter, int size) { - super(conection, packetFilter, size); + super(conection, PacketCollector.newConfiguration().setPacketFilter(packetFilter).setSize(size)); } } diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/offline/OfflineMessageManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/offline/OfflineMessageManager.java index 7cd707364..2fedb9fcd 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/offline/OfflineMessageManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/offline/OfflineMessageManager.java @@ -61,7 +61,7 @@ public class OfflineMessageManager { private final XMPPConnection connection; private static final PacketFilter PACKET_FILTER = new AndFilter(new PacketExtensionFilter( - new OfflineMessageInfo()), new PacketTypeFilter(Message.class)); + new OfflineMessageInfo()), PacketTypeFilter.MESSAGE); public OfflineMessageManager(XMPPConnection connection) { this.connection = connection; @@ -182,36 +182,30 @@ public class OfflineMessageManager { * @throws NotConnectedException */ public List getMessages() throws NoResponseException, XMPPErrorException, NotConnectedException { - List messages = new ArrayList(); OfflineMessageRequest request = new OfflineMessageRequest(); request.setFetch(true); - PacketCollector messageCollector = connection.createPacketCollector(PACKET_FILTER); PacketCollector resultCollector = connection.createPacketCollectorAndSend(request); + PacketCollector.Configuration messageCollectorConfiguration = PacketCollector.newConfiguration().setPacketFilter(PACKET_FILTER).setCollectorToReset(resultCollector); + PacketCollector messageCollector = connection.createPacketCollector(messageCollectorConfiguration); + List messages = null; try { - // Collect the received offline messages - Message message = messageCollector.nextResult(); - while (message != null) { - messages.add(message); - // It is important that we query the resultCollector before the messageCollector - Packet result = resultCollector.pollResultOrThrow(); - message = messageCollector.pollResult(); - if (message == null && result != null) { - // No new messages, but we have a non-error IQ response, we are done - return messages; - } else if (message != null) { - // We have received a message without waiting, great, continue to add this message and proceed with - // the loop - continue; - } - message = messageCollector.nextResult(); - } resultCollector.nextResultOrThrow(); + // Be extra safe, cancel the message collector right here so that it does not collector + // other messages that eventually match (although I've no idea how this could happen in + // case of XEP-13). + messageCollector.cancel(); + messages = new ArrayList<>(messageCollector.getCollectedCount()); + Message message; + while ((message = messageCollector.pollResult()) != null) { + messages.add(message); + } } finally { + // Ensure that the message collector is canceled even if nextResultOrThrow threw. It + // doesn't matter if we cancel the message collector twice messageCollector.cancel(); - resultCollector.cancel(); } return messages; }