From dde0cfd7f6f6b9e8c9fade51fa2d9a80ed242caa Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Fri, 27 Feb 2015 10:02:48 +0100 Subject: [PATCH] Fix incoming file transfers With bb8dcc9874641ec1f833ba6f33ccab8a86f91fa8 the concept if IQ request handlers was introduced in Smack. This doesn't allow packet/stanza collectors/listeners to filter for incoming IQ requests. Unfortunately the file transfer code relied on this being able, so it broke with the change. There were two places where the file transfer code was listening for incoming IQ requests: - InitationListener(s) - Negotiator(s) With this change, we let the InitiationListener signal the existence of an incoming initation request, send by an IQ of type 'set', using the newly created EventManager utility. The negotiator waits for those events to arrive and proceedes as it would have done when the packet collector was used. --- .../jivesoftware/smack/util/EventManger.java | 97 ++++++++++++++ .../bytestreams/ibb/InitiationListener.java | 3 + .../socks5/InitiationListener.java | 3 + .../filetransfer/FaultTolerantNegotiator.java | 118 ++++-------------- .../filetransfer/IBBTransferNegotiator.java | 31 +---- .../Socks5TransferNegotiator.java | 29 +---- .../smackx/filetransfer/StreamNegotiator.java | 57 ++++++--- 7 files changed, 171 insertions(+), 167 deletions(-) create mode 100644 smack-core/src/main/java/org/jivesoftware/smack/util/EventManger.java diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/EventManger.java b/smack-core/src/main/java/org/jivesoftware/smack/util/EventManger.java new file mode 100644 index 000000000..69ca3d6eb --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/EventManger.java @@ -0,0 +1,97 @@ +/** + * + * Copyright 2015 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack.util; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The event manager class is used to perform actions and wait for an event, which is usually caused by the action (or maybe never occurs). + *

+ * Events are distinguished by an unique event key. They can produce an event result, which can simply be null. + *

+ *

+ * The action is able to throw an exception. + *

+ * + * @param the event key. + * @param the event result. + * @param the exception which could be thrown by the action. + */ +public class EventManger { + + private final Map> events = new ConcurrentHashMap<>(); + + /** + * Perform an action and wait for an event. + *

+ * The event is signaled with {@link #signalEvent(Object, Object)}. + *

+ * + * @param eventKey the event key, must not be null. + * @param timeout the timeout to wait for the event in milliseconds. + * @param action the action to perform prior waiting for the event, must not be null. + * @return the event value, may be null. + * @throws InterruptedException if interrupted while waiting for the event. + * @throws E + */ + public R performActionAndWaitForEvent(K eventKey, long timeout, Callback action) throws InterruptedException, E { + final Reference reference = new Reference<>(); + events.put(eventKey, reference); + try { + synchronized (reference) { + action.action(); + reference.wait(timeout); + } + return reference.eventResult; + } + finally { + events.remove(eventKey); + } + } + + /** + * Signal an event and the event result. + *

+ * This method will return false if the event was not created with + * {@link #performActionAndWaitForEvent(Object, long, Callback)}. + *

+ * + * @param eventKey the event key, must not be null. + * @param eventResult the event result, may be null. + * @return true if the event was found and signaled, false otherwise. + */ + public boolean signalEvent(K eventKey, R eventResult) { + final Reference reference = events.get(eventKey); + if (reference == null) { + return false; + } + reference.eventResult = eventResult; + synchronized(reference) { + reference.notifyAll(); + } + return true; + } + + private static class Reference { + volatile V eventResult; + } + + public interface Callback { + public void action() throws E; + } +} diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java index fc0dd2264..5aca64342 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java @@ -27,6 +27,7 @@ import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smackx.bytestreams.BytestreamListener; import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; +import org.jivesoftware.smackx.filetransfer.StreamNegotiator; /** @@ -86,6 +87,8 @@ class InitiationListener extends AbstractIqRequestHandler { return; } + StreamNegotiator.signal(ibbRequest.getFrom() + '\t' + ibbRequest.getSessionID(), ibbRequest); + // ignore request if in ignore list if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID())) return; diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java index 27c8a5c11..4b3608ae6 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java @@ -27,6 +27,7 @@ import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smackx.bytestreams.BytestreamListener; import org.jivesoftware.smackx.bytestreams.socks5.packet.Bytestream; +import org.jivesoftware.smackx.filetransfer.StreamNegotiator; /** * InitiationListener handles all incoming SOCKS5 Bytestream initiation requests. If there are no @@ -77,6 +78,8 @@ final class InitiationListener extends AbstractIqRequestHandler { private void processRequest(Stanza packet) throws NotConnectedException { Bytestream byteStreamRequest = (Bytestream) packet; + StreamNegotiator.signal(byteStreamRequest.getFrom() + '\t' + byteStreamRequest.getSessionID(), byteStreamRequest); + // ignore request if in ignore list if (this.manager.getIgnoredBytestreamRequests().remove(byteStreamRequest.getSessionID())) { return; diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java index cd957db1d..d3c47ef47 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java @@ -18,26 +18,15 @@ package org.jivesoftware.smackx.filetransfer; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import org.jivesoftware.smack.PacketCollector; import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.OrFilter; -import org.jivesoftware.smack.filter.StanzaFilter; +import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; +import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; +import org.jivesoftware.smackx.bytestreams.socks5.packet.Bytestream; import org.jivesoftware.smackx.si.packet.StreamInitiation; @@ -51,8 +40,6 @@ public class FaultTolerantNegotiator extends StreamNegotiator { private final StreamNegotiator primaryNegotiator; private final StreamNegotiator secondaryNegotiator; private final XMPPConnection connection; - private StanzaFilter primaryFilter; - private StanzaFilter secondaryFilter; public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary, StreamNegotiator secondary) { @@ -61,12 +48,10 @@ public class FaultTolerantNegotiator extends StreamNegotiator { this.connection = connection; } - public StanzaFilter getInitiationPacketFilter(String from, String streamID) { - if (primaryFilter == null || secondaryFilter == null) { - primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID); - secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID); - } - return new OrFilter(primaryFilter, secondaryFilter); + @Override + public void newStreamInitiation(String from, String streamID) { + primaryNegotiator.newStreamInitiation(from, streamID); + secondaryNegotiator.newStreamInitiation(from, streamID); } InputStream negotiateIncomingStream(Stanza streamInitiation) { @@ -74,73 +59,28 @@ public class FaultTolerantNegotiator extends StreamNegotiator { "stream method."); } - final Stanza initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) { - throw new UnsupportedOperationException("Initiation handled by createIncomingStream " + - "method"); - } + public InputStream createIncomingStream(final StreamInitiation initiation) throws SmackException, XMPPErrorException { + // This could be either an xep47 ibb 'open' iq or an xep65 streamhost iq + IQ initationSet = initiateIncomingStream(connection, initiation); - public InputStream createIncomingStream(StreamInitiation initiation) throws SmackException { - PacketCollector collector = connection.createPacketCollectorAndSend( - getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()), - super.createInitiationAccept(initiation, getNamespaces())); - - ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); - CompletionService service - = new ExecutorCompletionService(threadPoolExecutor); - List> futures = new ArrayList>(); - InputStream stream = null; - SmackException exception = null; + StreamNegotiator streamNegotiator = determineNegotiator(initationSet); try { - futures.add(service.submit(new NegotiatorService(collector))); - futures.add(service.submit(new NegotiatorService(collector))); - - int i = 0; - while (stream == null && i < futures.size()) { - Future future; - try { - i++; - future = service.poll(connection.getPacketReplyTimeout(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - continue; - } - - if (future == null) { - continue; - } - - try { - stream = future.get(); - } - catch (InterruptedException e) { - /* Do Nothing */ - } - catch (ExecutionException e) { - exception = new SmackException(e.getCause()); - } - } + return streamNegotiator.negotiateIncomingStream(initationSet); } - finally { - for (Future future : futures) { - future.cancel(true); - } - collector.cancel(); - threadPoolExecutor.shutdownNow(); + catch (InterruptedException e) { + // TODO remove this try/catch once merged into 4.2's master branch + throw new IllegalStateException(e); } - if (stream == null) { - if (exception != null) { - throw exception; - } - else { - throw new SmackException("File transfer negotiation failed."); - } - } - - return stream; } private StreamNegotiator determineNegotiator(Stanza streamInitiation) { - return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator; + if (streamInitiation instanceof Bytestream) { + return primaryNegotiator; + } else if (streamInitiation instanceof Open){ + return secondaryNegotiator; + } else { + throw new IllegalStateException("Unknown stream initation type"); + } } public OutputStream createOutgoingStream(String streamID, String initiator, String target) @@ -167,18 +107,4 @@ public class FaultTolerantNegotiator extends StreamNegotiator { return namespaces; } - private class NegotiatorService implements Callable { - - private PacketCollector collector; - - NegotiatorService(PacketCollector collector) { - this.collector = collector; - } - - public InputStream call() throws XMPPErrorException, InterruptedException, NoResponseException, SmackException { - Stanza streamInitiation = collector.nextResultOrThrow(); - StreamNegotiator negotiator = determineNegotiator(streamInitiation); - return negotiator.negotiateIncomingStream(streamInitiation); - } - } } diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java index c2895b2f5..d0c1b681d 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java @@ -23,13 +23,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.AndFilter; -import org.jivesoftware.smack.filter.FlexibleStanzaTypeFilter; -import org.jivesoftware.smack.filter.FromMatchesFilter; -import org.jivesoftware.smack.filter.StanzaFilter; -import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; -import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamManager; import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamRequest; import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamSession; @@ -82,15 +76,14 @@ public class IBBTransferNegotiator extends StreamNegotiator { return negotiateIncomingStream(streamInitiation); } - public StanzaFilter getInitiationPacketFilter(String from, String streamID) { + @Override + public void newStreamInitiation(String from, String streamID) { /* * this method is always called prior to #negotiateIncomingStream() so * the In-Band Bytestream initiation listener must ignore the next * In-Band Bytestream request with the given session ID */ this.manager.ignoreBytestreamRequestOnce(streamID); - - return new AndFilter(FromMatchesFilter.create(from), new IBBOpenSidFilter(streamID)); } public String[] getNamespaces() { @@ -108,26 +101,6 @@ public class IBBTransferNegotiator extends StreamNegotiator { return session.getInputStream(); } - /** - * This PacketFilter accepts an incoming In-Band Bytestream open request - * with a specified session ID. - */ - private static class IBBOpenSidFilter extends FlexibleStanzaTypeFilter { - - private final String sessionID; - - public IBBOpenSidFilter(String sessionID) { - this.sessionID = Objects.requireNonNull(sessionID, "sessionID must not be null"); - } - - @Override - protected boolean acceptSpecific(Open bytestream) { - // packet must by of type SET and contains the given session ID - return this.sessionID.equals(bytestream.getSessionID()) - && IQ.Type.set.equals(bytestream.getType()); - } - } - /** * Derive from InBandBytestreamRequest to access protected constructor. */ diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java index 0cc7aa400..ef539b883 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java @@ -26,13 +26,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.AndFilter; -import org.jivesoftware.smack.filter.FlexibleStanzaTypeFilter; -import org.jivesoftware.smack.filter.FromMatchesFilter; -import org.jivesoftware.smack.filter.StanzaFilter; -import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; -import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamManager; import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamRequest; import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamSession; @@ -85,15 +79,13 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } @Override - public StanzaFilter getInitiationPacketFilter(final String from, String streamID) { + public void newStreamInitiation(String from, String streamID) { /* * this method is always called prior to #negotiateIncomingStream() so the SOCKS5 * InitiationListener must ignore the next SOCKS5 Bytestream request with the given session * ID */ this.manager.ignoreBytestreamRequestOnce(streamID); - - return new AndFilter(FromMatchesFilter.create(from), new BytestreamSIDFilter(streamID)); } @Override @@ -123,25 +115,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } } - /** - * This PacketFilter accepts an incoming SOCKS5 Bytestream request with a specified session ID. - */ - private static class BytestreamSIDFilter extends FlexibleStanzaTypeFilter { - - private final String sessionID; - - public BytestreamSIDFilter(String sessionID) { - this.sessionID = Objects.requireNonNull(sessionID, "SessionID cannot be null"); - } - - @Override - protected boolean acceptSpecific(Bytestream bytestream) { - // packet must by of type SET and contains the given session ID - return this.sessionID.equals(bytestream.getSessionID()) - && IQ.Type.set.equals(bytestream.getType()); - } - } - /** * Derive from Socks5BytestreamRequest to access protected constructor. */ diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java index 0fff1304c..5b42ca15a 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java @@ -16,16 +16,16 @@ */ package org.jivesoftware.smackx.filetransfer; -import org.jivesoftware.smack.PacketCollector; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.StanzaFilter; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; +import org.jivesoftware.smack.util.EventManger; +import org.jivesoftware.smack.util.EventManger.Callback; import org.jivesoftware.smackx.si.packet.StreamInitiation; import org.jivesoftware.smackx.xdata.FormField; import org.jivesoftware.smackx.xdata.packet.DataForm; @@ -43,6 +43,19 @@ import java.io.OutputStream; */ public abstract class StreamNegotiator { + /** + * A event manager for stream initiation requests send to us. + *

+ * Those are typical XEP-45 Open or XEP-65 Bytestream IQ requests. The even key is in the format + * "initiationFrom + '\t' + streamId" + *

+ */ + // TODO This field currently being static is considered a quick hack. Ideally this should take + // the local connection into account, for example by changing the key to + // "localJid + '\t' + initiationFrom + '\t' + streamId" or making the field non-static (but then + // you need to provide access to the InitiationListeners, which could get tricky) + protected static final EventManger initationSetEvents = new EventManger<>(); + /** * Creates the initiation acceptance packet to forward to the stream * initiator. @@ -51,7 +64,7 @@ public abstract class StreamNegotiator { * @param namespaces The namespace that relates to the accepted means of transfer. * @return The response to be forwarded to the initiator. */ - public StreamInitiation createInitiationAccept( + protected static StreamInitiation createInitiationAccept( StreamInitiation streamInitiationOffer, String[] namespaces) { StreamInitiation response = new StreamInitiation(); @@ -72,29 +85,42 @@ public abstract class StreamNegotiator { return response; } - Stanza initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) throws NoResponseException, XMPPErrorException, NotConnectedException { - StreamInitiation response = createInitiationAccept(initiation, + protected final IQ initiateIncomingStream(final XMPPConnection connection, StreamInitiation initiation) + throws NoResponseException, XMPPErrorException, NotConnectedException { + final StreamInitiation response = createInitiationAccept(initiation, getNamespaces()); - // establish collector to await response - PacketCollector collector = connection - .createPacketCollectorAndSend(getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()), response); + newStreamInitiation(initiation.getFrom(), initiation.getSessionID()); - Stanza streamMethodInitiation = collector.nextResultOrThrow(); + final String eventKey = initiation.getFrom().toString() + '\t' + initiation.getSessionID(); + IQ streamMethodInitiation; + try { + streamMethodInitiation = initationSetEvents.performActionAndWaitForEvent(eventKey, connection.getPacketReplyTimeout(), new Callback() { + @Override + public void action() throws NotConnectedException { + connection.sendPacket(response); + } + }); + } + catch (InterruptedException e) { + // TODO remove this try/catch once merged into 4.2's master branch + throw new IllegalStateException(e); + } + if (streamMethodInitiation == null) { + throw NoResponseException.newWith(connection); + } + XMPPErrorException.ifHasErrorThenThrow(streamMethodInitiation); return streamMethodInitiation; } /** - * Returns the packet filter that will return the initiation packet for the appropriate stream - * initiation. + * Signal that a new stream initiation arrived. The negotiator may needs to prepare for it. * * @param from The initiator of the file transfer. * @param streamID The stream ID related to the transfer. - * @return The PacketFilter that will return the packet relatable to the stream - * initiation. */ - public abstract StanzaFilter getInitiationPacketFilter(String from, String streamID); + protected abstract void newStreamInitiation(String from, String streamID); abstract InputStream negotiateIncomingStream(Stanza streamInitiation) throws XMPPErrorException, @@ -147,4 +173,7 @@ public abstract class StreamNegotiator { */ public abstract String[] getNamespaces(); + public static void signal(String eventKey, IQ eventValue) { + initationSetEvents.signalEvent(eventKey, eventValue); + } }