Fix incoming file transfers

With bb8dcc9874 the concept if IQ request
handlers was introduced in Smack. This doesn't allow packet/stanza
collectors/listeners to filter for incoming IQ requests. Unfortunately
the file transfer code relied on this being able, so it broke with the
change.

There were two places where the file transfer code was listening for
incoming IQ requests:
- InitationListener(s)
- Negotiator(s)

With this change, we let the InitiationListener signal the existence of
an incoming initation request, send by an IQ of type 'set', using the
newly created EventManager utility.

The negotiator waits for those events to arrive and proceedes as it would
have done when the packet collector was used.
This commit is contained in:
Florian Schmaus 2015-02-27 10:02:48 +01:00
parent 21c0be5e2a
commit dde0cfd7f6
7 changed files with 171 additions and 167 deletions

View File

@ -0,0 +1,97 @@
/**
*
* Copyright 2015 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack.util;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* The event manager class is used to perform actions and wait for an event, which is usually caused by the action (or maybe never occurs).
* <p>
* Events are distinguished by an unique event key. They can produce an event result, which can simply be null.
* </p>
* <p>
* The action is able to throw an exception.
* </p>
*
* @param <K> the event key.
* @param <R> the event result.
* @param <E> the exception which could be thrown by the action.
*/
public class EventManger<K, R, E extends Exception> {
private final Map<K,Reference<R>> events = new ConcurrentHashMap<>();
/**
* Perform an action and wait for an event.
* <p>
* The event is signaled with {@link #signalEvent(Object, Object)}.
* </p>
*
* @param eventKey the event key, must not be null.
* @param timeout the timeout to wait for the event in milliseconds.
* @param action the action to perform prior waiting for the event, must not be null.
* @return the event value, may be null.
* @throws InterruptedException if interrupted while waiting for the event.
* @throws E
*/
public R performActionAndWaitForEvent(K eventKey, long timeout, Callback<E> action) throws InterruptedException, E {
final Reference<R> reference = new Reference<>();
events.put(eventKey, reference);
try {
synchronized (reference) {
action.action();
reference.wait(timeout);
}
return reference.eventResult;
}
finally {
events.remove(eventKey);
}
}
/**
* Signal an event and the event result.
* <p>
* This method will return <code>false</code> if the event was not created with
* {@link #performActionAndWaitForEvent(Object, long, Callback)}.
* </p>
*
* @param eventKey the event key, must not be null.
* @param eventResult the event result, may be null.
* @return true if the event was found and signaled, false otherwise.
*/
public boolean signalEvent(K eventKey, R eventResult) {
final Reference<R> reference = events.get(eventKey);
if (reference == null) {
return false;
}
reference.eventResult = eventResult;
synchronized(reference) {
reference.notifyAll();
}
return true;
}
private static class Reference<V> {
volatile V eventResult;
}
public interface Callback<E extends Exception> {
public void action() throws E;
}
}

View File

@ -27,6 +27,7 @@ import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smackx.bytestreams.BytestreamListener;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
import org.jivesoftware.smackx.filetransfer.StreamNegotiator;
/**
@ -86,6 +87,8 @@ class InitiationListener extends AbstractIqRequestHandler {
return;
}
StreamNegotiator.signal(ibbRequest.getFrom() + '\t' + ibbRequest.getSessionID(), ibbRequest);
// ignore request if in ignore list
if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID()))
return;

View File

@ -27,6 +27,7 @@ import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smackx.bytestreams.BytestreamListener;
import org.jivesoftware.smackx.bytestreams.socks5.packet.Bytestream;
import org.jivesoftware.smackx.filetransfer.StreamNegotiator;
/**
* InitiationListener handles all incoming SOCKS5 Bytestream initiation requests. If there are no
@ -77,6 +78,8 @@ final class InitiationListener extends AbstractIqRequestHandler {
private void processRequest(Stanza packet) throws NotConnectedException {
Bytestream byteStreamRequest = (Bytestream) packet;
StreamNegotiator.signal(byteStreamRequest.getFrom() + '\t' + byteStreamRequest.getSessionID(), byteStreamRequest);
// ignore request if in ignore list
if (this.manager.getIgnoredBytestreamRequests().remove(byteStreamRequest.getSessionID())) {
return;

View File

@ -18,26 +18,15 @@ package org.jivesoftware.smackx.filetransfer;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.OrFilter;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
import org.jivesoftware.smackx.bytestreams.socks5.packet.Bytestream;
import org.jivesoftware.smackx.si.packet.StreamInitiation;
@ -51,8 +40,6 @@ public class FaultTolerantNegotiator extends StreamNegotiator {
private final StreamNegotiator primaryNegotiator;
private final StreamNegotiator secondaryNegotiator;
private final XMPPConnection connection;
private StanzaFilter primaryFilter;
private StanzaFilter secondaryFilter;
public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary,
StreamNegotiator secondary) {
@ -61,12 +48,10 @@ public class FaultTolerantNegotiator extends StreamNegotiator {
this.connection = connection;
}
public StanzaFilter getInitiationPacketFilter(String from, String streamID) {
if (primaryFilter == null || secondaryFilter == null) {
primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID);
secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID);
}
return new OrFilter(primaryFilter, secondaryFilter);
@Override
public void newStreamInitiation(String from, String streamID) {
primaryNegotiator.newStreamInitiation(from, streamID);
secondaryNegotiator.newStreamInitiation(from, streamID);
}
InputStream negotiateIncomingStream(Stanza streamInitiation) {
@ -74,73 +59,28 @@ public class FaultTolerantNegotiator extends StreamNegotiator {
"stream method.");
}
final Stanza initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) {
throw new UnsupportedOperationException("Initiation handled by createIncomingStream " +
"method");
}
public InputStream createIncomingStream(final StreamInitiation initiation) throws SmackException, XMPPErrorException {
// This could be either an xep47 ibb 'open' iq or an xep65 streamhost iq
IQ initationSet = initiateIncomingStream(connection, initiation);
public InputStream createIncomingStream(StreamInitiation initiation) throws SmackException {
PacketCollector collector = connection.createPacketCollectorAndSend(
getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()),
super.createInitiationAccept(initiation, getNamespaces()));
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
CompletionService<InputStream> service
= new ExecutorCompletionService<InputStream>(threadPoolExecutor);
List<Future<InputStream>> futures = new ArrayList<Future<InputStream>>();
InputStream stream = null;
SmackException exception = null;
StreamNegotiator streamNegotiator = determineNegotiator(initationSet);
try {
futures.add(service.submit(new NegotiatorService(collector)));
futures.add(service.submit(new NegotiatorService(collector)));
int i = 0;
while (stream == null && i < futures.size()) {
Future<InputStream> future;
try {
i++;
future = service.poll(connection.getPacketReplyTimeout(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
continue;
}
if (future == null) {
continue;
}
try {
stream = future.get();
}
catch (InterruptedException e) {
/* Do Nothing */
}
catch (ExecutionException e) {
exception = new SmackException(e.getCause());
}
}
return streamNegotiator.negotiateIncomingStream(initationSet);
}
finally {
for (Future<InputStream> future : futures) {
future.cancel(true);
}
collector.cancel();
threadPoolExecutor.shutdownNow();
catch (InterruptedException e) {
// TODO remove this try/catch once merged into 4.2's master branch
throw new IllegalStateException(e);
}
if (stream == null) {
if (exception != null) {
throw exception;
}
else {
throw new SmackException("File transfer negotiation failed.");
}
}
return stream;
}
private StreamNegotiator determineNegotiator(Stanza streamInitiation) {
return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator;
if (streamInitiation instanceof Bytestream) {
return primaryNegotiator;
} else if (streamInitiation instanceof Open){
return secondaryNegotiator;
} else {
throw new IllegalStateException("Unknown stream initation type");
}
}
public OutputStream createOutgoingStream(String streamID, String initiator, String target)
@ -167,18 +107,4 @@ public class FaultTolerantNegotiator extends StreamNegotiator {
return namespaces;
}
private class NegotiatorService implements Callable<InputStream> {
private PacketCollector collector;
NegotiatorService(PacketCollector collector) {
this.collector = collector;
}
public InputStream call() throws XMPPErrorException, InterruptedException, NoResponseException, SmackException {
Stanza streamInitiation = collector.nextResultOrThrow();
StreamNegotiator negotiator = determineNegotiator(streamInitiation);
return negotiator.negotiateIncomingStream(streamInitiation);
}
}
}

View File

@ -23,13 +23,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.FlexibleStanzaTypeFilter;
import org.jivesoftware.smack.filter.FromMatchesFilter;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.util.Objects;
import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamManager;
import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamRequest;
import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamSession;
@ -82,15 +76,14 @@ public class IBBTransferNegotiator extends StreamNegotiator {
return negotiateIncomingStream(streamInitiation);
}
public StanzaFilter getInitiationPacketFilter(String from, String streamID) {
@Override
public void newStreamInitiation(String from, String streamID) {
/*
* this method is always called prior to #negotiateIncomingStream() so
* the In-Band Bytestream initiation listener must ignore the next
* In-Band Bytestream request with the given session ID
*/
this.manager.ignoreBytestreamRequestOnce(streamID);
return new AndFilter(FromMatchesFilter.create(from), new IBBOpenSidFilter(streamID));
}
public String[] getNamespaces() {
@ -108,26 +101,6 @@ public class IBBTransferNegotiator extends StreamNegotiator {
return session.getInputStream();
}
/**
* This PacketFilter accepts an incoming In-Band Bytestream open request
* with a specified session ID.
*/
private static class IBBOpenSidFilter extends FlexibleStanzaTypeFilter<Open> {
private final String sessionID;
public IBBOpenSidFilter(String sessionID) {
this.sessionID = Objects.requireNonNull(sessionID, "sessionID must not be null");
}
@Override
protected boolean acceptSpecific(Open bytestream) {
// packet must by of type SET and contains the given session ID
return this.sessionID.equals(bytestream.getSessionID())
&& IQ.Type.set.equals(bytestream.getType());
}
}
/**
* Derive from InBandBytestreamRequest to access protected constructor.
*/

View File

@ -26,13 +26,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.FlexibleStanzaTypeFilter;
import org.jivesoftware.smack.filter.FromMatchesFilter;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.util.Objects;
import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamManager;
import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamRequest;
import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamSession;
@ -85,15 +79,13 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
}
@Override
public StanzaFilter getInitiationPacketFilter(final String from, String streamID) {
public void newStreamInitiation(String from, String streamID) {
/*
* this method is always called prior to #negotiateIncomingStream() so the SOCKS5
* InitiationListener must ignore the next SOCKS5 Bytestream request with the given session
* ID
*/
this.manager.ignoreBytestreamRequestOnce(streamID);
return new AndFilter(FromMatchesFilter.create(from), new BytestreamSIDFilter(streamID));
}
@Override
@ -123,25 +115,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
}
}
/**
* This PacketFilter accepts an incoming SOCKS5 Bytestream request with a specified session ID.
*/
private static class BytestreamSIDFilter extends FlexibleStanzaTypeFilter<Bytestream> {
private final String sessionID;
public BytestreamSIDFilter(String sessionID) {
this.sessionID = Objects.requireNonNull(sessionID, "SessionID cannot be null");
}
@Override
protected boolean acceptSpecific(Bytestream bytestream) {
// packet must by of type SET and contains the given session ID
return this.sessionID.equals(bytestream.getSessionID())
&& IQ.Type.set.equals(bytestream.getType());
}
}
/**
* Derive from Socks5BytestreamRequest to access protected constructor.
*/

View File

@ -16,16 +16,16 @@
*/
package org.jivesoftware.smackx.filetransfer;
import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.util.EventManger;
import org.jivesoftware.smack.util.EventManger.Callback;
import org.jivesoftware.smackx.si.packet.StreamInitiation;
import org.jivesoftware.smackx.xdata.FormField;
import org.jivesoftware.smackx.xdata.packet.DataForm;
@ -43,6 +43,19 @@ import java.io.OutputStream;
*/
public abstract class StreamNegotiator {
/**
* A event manager for stream initiation requests send to us.
* <p>
* Those are typical XEP-45 Open or XEP-65 Bytestream IQ requests. The even key is in the format
* "initiationFrom + '\t' + streamId"
* </p>
*/
// TODO This field currently being static is considered a quick hack. Ideally this should take
// the local connection into account, for example by changing the key to
// "localJid + '\t' + initiationFrom + '\t' + streamId" or making the field non-static (but then
// you need to provide access to the InitiationListeners, which could get tricky)
protected static final EventManger<String, IQ, SmackException.NotConnectedException> initationSetEvents = new EventManger<>();
/**
* Creates the initiation acceptance packet to forward to the stream
* initiator.
@ -51,7 +64,7 @@ public abstract class StreamNegotiator {
* @param namespaces The namespace that relates to the accepted means of transfer.
* @return The response to be forwarded to the initiator.
*/
public StreamInitiation createInitiationAccept(
protected static StreamInitiation createInitiationAccept(
StreamInitiation streamInitiationOffer, String[] namespaces)
{
StreamInitiation response = new StreamInitiation();
@ -72,29 +85,42 @@ public abstract class StreamNegotiator {
return response;
}
Stanza initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) throws NoResponseException, XMPPErrorException, NotConnectedException {
StreamInitiation response = createInitiationAccept(initiation,
protected final IQ initiateIncomingStream(final XMPPConnection connection, StreamInitiation initiation)
throws NoResponseException, XMPPErrorException, NotConnectedException {
final StreamInitiation response = createInitiationAccept(initiation,
getNamespaces());
// establish collector to await response
PacketCollector collector = connection
.createPacketCollectorAndSend(getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()), response);
newStreamInitiation(initiation.getFrom(), initiation.getSessionID());
Stanza streamMethodInitiation = collector.nextResultOrThrow();
final String eventKey = initiation.getFrom().toString() + '\t' + initiation.getSessionID();
IQ streamMethodInitiation;
try {
streamMethodInitiation = initationSetEvents.performActionAndWaitForEvent(eventKey, connection.getPacketReplyTimeout(), new Callback<NotConnectedException>() {
@Override
public void action() throws NotConnectedException {
connection.sendPacket(response);
}
});
}
catch (InterruptedException e) {
// TODO remove this try/catch once merged into 4.2's master branch
throw new IllegalStateException(e);
}
if (streamMethodInitiation == null) {
throw NoResponseException.newWith(connection);
}
XMPPErrorException.ifHasErrorThenThrow(streamMethodInitiation);
return streamMethodInitiation;
}
/**
* Returns the packet filter that will return the initiation packet for the appropriate stream
* initiation.
* Signal that a new stream initiation arrived. The negotiator may needs to prepare for it.
*
* @param from The initiator of the file transfer.
* @param streamID The stream ID related to the transfer.
* @return The <b><i>PacketFilter</b></i> that will return the packet relatable to the stream
* initiation.
*/
public abstract StanzaFilter getInitiationPacketFilter(String from, String streamID);
protected abstract void newStreamInitiation(String from, String streamID);
abstract InputStream negotiateIncomingStream(Stanza streamInitiation) throws XMPPErrorException,
@ -147,4 +173,7 @@ public abstract class StreamNegotiator {
*/
public abstract String[] getNamespaces();
public static void signal(String eventKey, IQ eventValue) {
initationSetEvents.signalEvent(eventKey, eventValue);
}
}