Add XMPPConnection.createPacketcollectorAndSend(PacketFilter, Packet)

Using

createPacketCollector(filter);
sendPacket(packet);

was error prone, i.e. the PacketCollector could leak if sendPacket()
would throw an exception and the user forgot to call
PacketCollector.cancel(). For cases where
createPacketCollectorAndSend(IQ) is not sufficient (because we don't
send IQs), createPacketCollectorAndSend(PacketFilter, Packet) is now
used, which does take care that the PacketCollector does not leak if
sendPacket() throws an Exception.
This commit is contained in:
Florian Schmaus 2014-10-13 10:45:00 +02:00
parent 8ce474b0df
commit 98a3c46e9a
8 changed files with 47 additions and 47 deletions

View File

@ -422,26 +422,14 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
// Note that we can not use IQReplyFilter here, since the users full JID is not yet // Note that we can not use IQReplyFilter here, since the users full JID is not yet
// available. It will become available right after the resource has been successfully bound. // available. It will become available right after the resource has been successfully bound.
Bind bindResource = Bind.newSet(resource); Bind bindResource = Bind.newSet(resource);
PacketCollector packetCollector = createPacketCollector(new PacketIDFilter(bindResource)); PacketCollector packetCollector = createPacketCollectorAndSend(new PacketIDFilter(bindResource), bindResource);
try {
sendPacket(bindResource);
} catch (NotConnectedException e) {
packetCollector.cancel();
throw e;
}
Bind response = packetCollector.nextResultOrThrow(); Bind response = packetCollector.nextResultOrThrow();
user = response.getJid(); user = response.getJid();
setServiceName(XmppStringUtils.parseDomain(user)); setServiceName(XmppStringUtils.parseDomain(user));
if (hasFeature(Session.ELEMENT, Session.NAMESPACE) && !getConfiguration().isLegacySessionDisabled()) { if (hasFeature(Session.ELEMENT, Session.NAMESPACE) && !getConfiguration().isLegacySessionDisabled()) {
Session session = new Session(); Session session = new Session();
packetCollector = createPacketCollector(new PacketIDFilter(session)); packetCollector = createPacketCollectorAndSend(new PacketIDFilter(session), session);
try {
sendPacket(session);
} catch (NotConnectedException e) {
packetCollector.cancel();
throw e;
}
packetCollector.nextResultOrThrow(); packetCollector.nextResultOrThrow();
} }
} }
@ -639,12 +627,20 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
public PacketCollector createPacketCollectorAndSend(IQ packet) throws NotConnectedException { public PacketCollector createPacketCollectorAndSend(IQ packet) throws NotConnectedException {
PacketFilter packetFilter = new IQReplyFilter(packet, this); PacketFilter packetFilter = new IQReplyFilter(packet, this);
// Create the packet collector before sending the packet // Create the packet collector before sending the packet
PacketCollector packetCollector = createPacketCollectorAndSend(packetFilter, packet);
return packetCollector;
}
@Override
public PacketCollector createPacketCollectorAndSend(PacketFilter packetFilter, Packet packet)
throws NotConnectedException {
// Create the packet collector before sending the packet
PacketCollector packetCollector = createPacketCollector(packetFilter); PacketCollector packetCollector = createPacketCollector(packetFilter);
try { try {
// Now we can send the packet as the collector has been created // Now we can send the packet as the collector has been created
sendPacket(packet); sendPacket(packet);
} }
catch (NotConnectedException e) { catch (NotConnectedException | RuntimeException e) {
packetCollector.cancel(); packetCollector.cancel();
throw e; throw e;
} }

View File

@ -208,6 +208,26 @@ public interface XMPPConnection {
* a specific result. * a specific result.
* *
* @param packetFilter the packet filter to use. * @param packetFilter the packet filter to use.
* @param packet the packet to send right after the collector got created
* @return a new packet collector.
*/
public PacketCollector createPacketCollectorAndSend(PacketFilter packetFilter, Packet packet)
throws NotConnectedException;
/**
* Creates a new packet collector for this connection. A packet filter
* determines which packets will be accumulated by the collector. A
* PacketCollector is more suitable to use than a {@link PacketListener}
* when you need to wait for a specific result.
* <p>
* <b>Note:</b> If you send a Packet right after using this method, then
* consider using
* {@link #createPacketCollectorAndSend(PacketFilter, Packet)} instead.
* Otherwise make sure cancel the PacketCollector in every case, e.g. even
* if an exception is thrown, or otherwise you may leak the PacketCollector.
* </p>
*
* @param packetFilter the packet filter to use.
* @return a new packet collector. * @return a new packet collector.
*/ */
public PacketCollector createPacketCollector(PacketFilter packetFilter); public PacketCollector createPacketCollector(PacketFilter packetFilter);

View File

@ -80,10 +80,9 @@ public class FaultTolerantNegotiator extends StreamNegotiator {
} }
public InputStream createIncomingStream(StreamInitiation initiation) throws SmackException { public InputStream createIncomingStream(StreamInitiation initiation) throws SmackException {
PacketCollector collector = connection.createPacketCollector( PacketCollector collector = connection.createPacketCollectorAndSend(
getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID())); getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()),
super.createInitiationAccept(initiation, getNamespaces()));
connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces()));
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
CompletionService<InputStream> service CompletionService<InputStream> service

View File

@ -79,8 +79,7 @@ public abstract class StreamNegotiator {
// establish collector to await response // establish collector to await response
PacketCollector collector = connection PacketCollector collector = connection
.createPacketCollector(getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID())); .createPacketCollectorAndSend(getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()), response);
connection.sendPacket(response);
Packet streamMethodInitiation = collector.nextResultOrThrow(); Packet streamMethodInitiation = collector.nextResultOrThrow();

View File

@ -290,8 +290,7 @@ public class AccountManager extends Manager {
} }
private PacketCollector createPacketCollectorAndSend(IQ req) throws NotConnectedException { private PacketCollector createPacketCollectorAndSend(IQ req) throws NotConnectedException {
PacketCollector collector = connection().createPacketCollector(new PacketIDFilter(req.getPacketID())); PacketCollector collector = connection().createPacketCollectorAndSend(new PacketIDFilter(req.getPacketID()), req);
connection().sendPacket(req);
return collector; return collector;
} }
} }

View File

@ -469,9 +469,7 @@ public class MultiUserChat {
+ nickname), new PacketTypeFilter(Presence.class)); + nickname), new PacketTypeFilter(Presence.class));
PacketCollector response = null; PacketCollector response = null;
response = connection.createPacketCollector(responseFilter); response = connection.createPacketCollectorAndSend(responseFilter, joinPresence);
// Send join packet.
connection.sendPacket(joinPresence);
// Wait up to a certain number of seconds for a reply. // Wait up to a certain number of seconds for a reply.
Presence presence = (Presence) response.nextResultOrThrow(timeout); Presence presence = (Presence) response.nextResultOrThrow(timeout);
@ -1080,9 +1078,7 @@ public class MultiUserChat {
new AndFilter( new AndFilter(
FromMatchesFilter.createFull(room + "/" + nickname), FromMatchesFilter.createFull(room + "/" + nickname),
new PacketTypeFilter(Presence.class)); new PacketTypeFilter(Presence.class));
PacketCollector response = connection.createPacketCollector(responseFilter); PacketCollector response = connection.createPacketCollectorAndSend(responseFilter, joinPresence);
// Send join packet.
connection.sendPacket(joinPresence);
// Wait up to a certain number of seconds for a reply. If there is a negative reply, an // Wait up to a certain number of seconds for a reply. If there is a negative reply, an
// exception will be thrown // exception will be thrown
response.nextResultOrThrow(); response.nextResultOrThrow();
@ -1905,9 +1901,7 @@ public class MultiUserChat {
return subject.equals(msg.getSubject()); return subject.equals(msg.getSubject());
} }
}); });
PacketCollector response = connection.createPacketCollector(responseFilter); PacketCollector response = connection.createPacketCollectorAndSend(responseFilter, message);
// Send change subject packet.
connection.sendPacket(message);
// Wait up to a certain number of seconds for a reply. // Wait up to a certain number of seconds for a reply.
response.nextResultOrThrow(); response.nextResultOrThrow();
} }

View File

@ -301,9 +301,8 @@ public class AgentSession {
presence.addExtension(new DefaultPacketExtension(AgentStatus.ELEMENT_NAME, presence.addExtension(new DefaultPacketExtension(AgentStatus.ELEMENT_NAME,
AgentStatus.NAMESPACE)); AgentStatus.NAMESPACE));
PacketCollector collector = this.connection.createPacketCollector(new AndFilter(new PacketTypeFilter(Presence.class), FromMatchesFilter.create(workgroupJID))); PacketCollector collector = this.connection.createPacketCollectorAndSend(new AndFilter(
new PacketTypeFilter(Presence.class), FromMatchesFilter.create(workgroupJID)), presence);
connection.sendPacket(presence);
presence = (Presence)collector.nextResultOrThrow(); presence = (Presence)collector.nextResultOrThrow();
@ -401,11 +400,9 @@ public class AgentSession {
presence.addExtension(agentStatus); presence.addExtension(agentStatus);
presence.addExtension(new MetaData(this.metaData)); presence.addExtension(new MetaData(this.metaData));
PacketCollector collector = this.connection.createPacketCollector(new AndFilter( PacketCollector collector = this.connection.createPacketCollectorAndSend(new AndFilter(
new PacketTypeFilter(Presence.class), new PacketTypeFilter(Presence.class),
FromMatchesFilter.create(workgroupJID))); FromMatchesFilter.create(workgroupJID)), presence);
this.connection.sendPacket(presence);
collector.nextResultOrThrow(); collector.nextResultOrThrow();
} }
@ -447,10 +444,8 @@ public class AgentSession {
} }
presence.addExtension(new MetaData(this.metaData)); presence.addExtension(new MetaData(this.metaData));
PacketCollector collector = this.connection.createPacketCollector(new AndFilter(new PacketTypeFilter(Presence.class), PacketCollector collector = this.connection.createPacketCollectorAndSend(new AndFilter(new PacketTypeFilter(Presence.class),
FromMatchesFilter.create(workgroupJID))); FromMatchesFilter.create(workgroupJID)), presence);
this.connection.sendPacket(presence);
collector.nextResultOrThrow(); collector.nextResultOrThrow();
} }

View File

@ -180,10 +180,8 @@ public class Workgroup {
directedPresence.setTo(workgroupJID); directedPresence.setTo(workgroupJID);
PacketFilter typeFilter = new PacketTypeFilter(Presence.class); PacketFilter typeFilter = new PacketTypeFilter(Presence.class);
PacketFilter fromFilter = FromMatchesFilter.create(workgroupJID); PacketFilter fromFilter = FromMatchesFilter.create(workgroupJID);
PacketCollector collector = connection.createPacketCollector(new AndFilter(fromFilter, PacketCollector collector = connection.createPacketCollectorAndSend(new AndFilter(fromFilter,
typeFilter)); typeFilter), directedPresence);
connection.sendPacket(directedPresence);
Presence response = (Presence)collector.nextResultOrThrow(); Presence response = (Presence)collector.nextResultOrThrow();
return Presence.Type.available == response.getType(); return Presence.Type.available == response.getType();