diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/EventManger.java b/smack-core/src/main/java/org/jivesoftware/smack/util/EventManger.java new file mode 100644 index 000000000..69ca3d6eb --- /dev/null +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/EventManger.java @@ -0,0 +1,97 @@ +/** + * + * Copyright 2015 Florian Schmaus + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smack.util; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The event manager class is used to perform actions and wait for an event, which is usually caused by the action (or maybe never occurs). + *

+ * Events are distinguished by an unique event key. They can produce an event result, which can simply be null. + *

+ *

+ * The action is able to throw an exception. + *

+ * + * @param the event key. + * @param the event result. + * @param the exception which could be thrown by the action. + */ +public class EventManger { + + private final Map> events = new ConcurrentHashMap<>(); + + /** + * Perform an action and wait for an event. + *

+ * The event is signaled with {@link #signalEvent(Object, Object)}. + *

+ * + * @param eventKey the event key, must not be null. + * @param timeout the timeout to wait for the event in milliseconds. + * @param action the action to perform prior waiting for the event, must not be null. + * @return the event value, may be null. + * @throws InterruptedException if interrupted while waiting for the event. + * @throws E + */ + public R performActionAndWaitForEvent(K eventKey, long timeout, Callback action) throws InterruptedException, E { + final Reference reference = new Reference<>(); + events.put(eventKey, reference); + try { + synchronized (reference) { + action.action(); + reference.wait(timeout); + } + return reference.eventResult; + } + finally { + events.remove(eventKey); + } + } + + /** + * Signal an event and the event result. + *

+ * This method will return false if the event was not created with + * {@link #performActionAndWaitForEvent(Object, long, Callback)}. + *

+ * + * @param eventKey the event key, must not be null. + * @param eventResult the event result, may be null. + * @return true if the event was found and signaled, false otherwise. + */ + public boolean signalEvent(K eventKey, R eventResult) { + final Reference reference = events.get(eventKey); + if (reference == null) { + return false; + } + reference.eventResult = eventResult; + synchronized(reference) { + reference.notifyAll(); + } + return true; + } + + private static class Reference { + volatile V eventResult; + } + + public interface Callback { + public void action() throws E; + } +} diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java index fc0dd2264..5aca64342 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/ibb/InitiationListener.java @@ -27,6 +27,7 @@ import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smackx.bytestreams.BytestreamListener; import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; +import org.jivesoftware.smackx.filetransfer.StreamNegotiator; /** @@ -86,6 +87,8 @@ class InitiationListener extends AbstractIqRequestHandler { return; } + StreamNegotiator.signal(ibbRequest.getFrom() + '\t' + ibbRequest.getSessionID(), ibbRequest); + // ignore request if in ignore list if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID())) return; diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java index 27c8a5c11..4b3608ae6 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/bytestreams/socks5/InitiationListener.java @@ -27,6 +27,7 @@ import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smackx.bytestreams.BytestreamListener; import org.jivesoftware.smackx.bytestreams.socks5.packet.Bytestream; +import org.jivesoftware.smackx.filetransfer.StreamNegotiator; /** * InitiationListener handles all incoming SOCKS5 Bytestream initiation requests. If there are no @@ -77,6 +78,8 @@ final class InitiationListener extends AbstractIqRequestHandler { private void processRequest(Stanza packet) throws NotConnectedException { Bytestream byteStreamRequest = (Bytestream) packet; + StreamNegotiator.signal(byteStreamRequest.getFrom() + '\t' + byteStreamRequest.getSessionID(), byteStreamRequest); + // ignore request if in ignore list if (this.manager.getIgnoredBytestreamRequests().remove(byteStreamRequest.getSessionID())) { return; diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java index cd957db1d..d3c47ef47 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java @@ -18,26 +18,15 @@ package org.jivesoftware.smackx.filetransfer; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import org.jivesoftware.smack.PacketCollector; import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.OrFilter; -import org.jivesoftware.smack.filter.StanzaFilter; +import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; +import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; +import org.jivesoftware.smackx.bytestreams.socks5.packet.Bytestream; import org.jivesoftware.smackx.si.packet.StreamInitiation; @@ -51,8 +40,6 @@ public class FaultTolerantNegotiator extends StreamNegotiator { private final StreamNegotiator primaryNegotiator; private final StreamNegotiator secondaryNegotiator; private final XMPPConnection connection; - private StanzaFilter primaryFilter; - private StanzaFilter secondaryFilter; public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary, StreamNegotiator secondary) { @@ -61,12 +48,10 @@ public class FaultTolerantNegotiator extends StreamNegotiator { this.connection = connection; } - public StanzaFilter getInitiationPacketFilter(String from, String streamID) { - if (primaryFilter == null || secondaryFilter == null) { - primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID); - secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID); - } - return new OrFilter(primaryFilter, secondaryFilter); + @Override + public void newStreamInitiation(String from, String streamID) { + primaryNegotiator.newStreamInitiation(from, streamID); + secondaryNegotiator.newStreamInitiation(from, streamID); } InputStream negotiateIncomingStream(Stanza streamInitiation) { @@ -74,73 +59,28 @@ public class FaultTolerantNegotiator extends StreamNegotiator { "stream method."); } - final Stanza initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) { - throw new UnsupportedOperationException("Initiation handled by createIncomingStream " + - "method"); - } + public InputStream createIncomingStream(final StreamInitiation initiation) throws SmackException, XMPPErrorException { + // This could be either an xep47 ibb 'open' iq or an xep65 streamhost iq + IQ initationSet = initiateIncomingStream(connection, initiation); - public InputStream createIncomingStream(StreamInitiation initiation) throws SmackException { - PacketCollector collector = connection.createPacketCollectorAndSend( - getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()), - super.createInitiationAccept(initiation, getNamespaces())); - - ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); - CompletionService service - = new ExecutorCompletionService(threadPoolExecutor); - List> futures = new ArrayList>(); - InputStream stream = null; - SmackException exception = null; + StreamNegotiator streamNegotiator = determineNegotiator(initationSet); try { - futures.add(service.submit(new NegotiatorService(collector))); - futures.add(service.submit(new NegotiatorService(collector))); - - int i = 0; - while (stream == null && i < futures.size()) { - Future future; - try { - i++; - future = service.poll(connection.getPacketReplyTimeout(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - continue; - } - - if (future == null) { - continue; - } - - try { - stream = future.get(); - } - catch (InterruptedException e) { - /* Do Nothing */ - } - catch (ExecutionException e) { - exception = new SmackException(e.getCause()); - } - } + return streamNegotiator.negotiateIncomingStream(initationSet); } - finally { - for (Future future : futures) { - future.cancel(true); - } - collector.cancel(); - threadPoolExecutor.shutdownNow(); + catch (InterruptedException e) { + // TODO remove this try/catch once merged into 4.2's master branch + throw new IllegalStateException(e); } - if (stream == null) { - if (exception != null) { - throw exception; - } - else { - throw new SmackException("File transfer negotiation failed."); - } - } - - return stream; } private StreamNegotiator determineNegotiator(Stanza streamInitiation) { - return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator; + if (streamInitiation instanceof Bytestream) { + return primaryNegotiator; + } else if (streamInitiation instanceof Open){ + return secondaryNegotiator; + } else { + throw new IllegalStateException("Unknown stream initation type"); + } } public OutputStream createOutgoingStream(String streamID, String initiator, String target) @@ -167,18 +107,4 @@ public class FaultTolerantNegotiator extends StreamNegotiator { return namespaces; } - private class NegotiatorService implements Callable { - - private PacketCollector collector; - - NegotiatorService(PacketCollector collector) { - this.collector = collector; - } - - public InputStream call() throws XMPPErrorException, InterruptedException, NoResponseException, SmackException { - Stanza streamInitiation = collector.nextResultOrThrow(); - StreamNegotiator negotiator = determineNegotiator(streamInitiation); - return negotiator.negotiateIncomingStream(streamInitiation); - } - } } diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java index c2895b2f5..d0c1b681d 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java @@ -23,13 +23,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.AndFilter; -import org.jivesoftware.smack.filter.FlexibleStanzaTypeFilter; -import org.jivesoftware.smack.filter.FromMatchesFilter; -import org.jivesoftware.smack.filter.StanzaFilter; -import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; -import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamManager; import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamRequest; import org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamSession; @@ -82,15 +76,14 @@ public class IBBTransferNegotiator extends StreamNegotiator { return negotiateIncomingStream(streamInitiation); } - public StanzaFilter getInitiationPacketFilter(String from, String streamID) { + @Override + public void newStreamInitiation(String from, String streamID) { /* * this method is always called prior to #negotiateIncomingStream() so * the In-Band Bytestream initiation listener must ignore the next * In-Band Bytestream request with the given session ID */ this.manager.ignoreBytestreamRequestOnce(streamID); - - return new AndFilter(FromMatchesFilter.create(from), new IBBOpenSidFilter(streamID)); } public String[] getNamespaces() { @@ -108,26 +101,6 @@ public class IBBTransferNegotiator extends StreamNegotiator { return session.getInputStream(); } - /** - * This PacketFilter accepts an incoming In-Band Bytestream open request - * with a specified session ID. - */ - private static class IBBOpenSidFilter extends FlexibleStanzaTypeFilter { - - private final String sessionID; - - public IBBOpenSidFilter(String sessionID) { - this.sessionID = Objects.requireNonNull(sessionID, "sessionID must not be null"); - } - - @Override - protected boolean acceptSpecific(Open bytestream) { - // packet must by of type SET and contains the given session ID - return this.sessionID.equals(bytestream.getSessionID()) - && IQ.Type.set.equals(bytestream.getType()); - } - } - /** * Derive from InBandBytestreamRequest to access protected constructor. */ diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java index 0cc7aa400..ef539b883 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java @@ -26,13 +26,7 @@ import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.AndFilter; -import org.jivesoftware.smack.filter.FlexibleStanzaTypeFilter; -import org.jivesoftware.smack.filter.FromMatchesFilter; -import org.jivesoftware.smack.filter.StanzaFilter; -import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; -import org.jivesoftware.smack.util.Objects; import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamManager; import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamRequest; import org.jivesoftware.smackx.bytestreams.socks5.Socks5BytestreamSession; @@ -85,15 +79,13 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } @Override - public StanzaFilter getInitiationPacketFilter(final String from, String streamID) { + public void newStreamInitiation(String from, String streamID) { /* * this method is always called prior to #negotiateIncomingStream() so the SOCKS5 * InitiationListener must ignore the next SOCKS5 Bytestream request with the given session * ID */ this.manager.ignoreBytestreamRequestOnce(streamID); - - return new AndFilter(FromMatchesFilter.create(from), new BytestreamSIDFilter(streamID)); } @Override @@ -123,25 +115,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } } - /** - * This PacketFilter accepts an incoming SOCKS5 Bytestream request with a specified session ID. - */ - private static class BytestreamSIDFilter extends FlexibleStanzaTypeFilter { - - private final String sessionID; - - public BytestreamSIDFilter(String sessionID) { - this.sessionID = Objects.requireNonNull(sessionID, "SessionID cannot be null"); - } - - @Override - protected boolean acceptSpecific(Bytestream bytestream) { - // packet must by of type SET and contains the given session ID - return this.sessionID.equals(bytestream.getSessionID()) - && IQ.Type.set.equals(bytestream.getType()); - } - } - /** * Derive from Socks5BytestreamRequest to access protected constructor. */ diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java index 0fff1304c..5b42ca15a 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java @@ -16,16 +16,16 @@ */ package org.jivesoftware.smackx.filetransfer; -import org.jivesoftware.smack.PacketCollector; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.filter.StanzaFilter; import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Stanza; +import org.jivesoftware.smack.util.EventManger; +import org.jivesoftware.smack.util.EventManger.Callback; import org.jivesoftware.smackx.si.packet.StreamInitiation; import org.jivesoftware.smackx.xdata.FormField; import org.jivesoftware.smackx.xdata.packet.DataForm; @@ -43,6 +43,19 @@ import java.io.OutputStream; */ public abstract class StreamNegotiator { + /** + * A event manager for stream initiation requests send to us. + *

+ * Those are typical XEP-45 Open or XEP-65 Bytestream IQ requests. The even key is in the format + * "initiationFrom + '\t' + streamId" + *

+ */ + // TODO This field currently being static is considered a quick hack. Ideally this should take + // the local connection into account, for example by changing the key to + // "localJid + '\t' + initiationFrom + '\t' + streamId" or making the field non-static (but then + // you need to provide access to the InitiationListeners, which could get tricky) + protected static final EventManger initationSetEvents = new EventManger<>(); + /** * Creates the initiation acceptance packet to forward to the stream * initiator. @@ -51,7 +64,7 @@ public abstract class StreamNegotiator { * @param namespaces The namespace that relates to the accepted means of transfer. * @return The response to be forwarded to the initiator. */ - public StreamInitiation createInitiationAccept( + protected static StreamInitiation createInitiationAccept( StreamInitiation streamInitiationOffer, String[] namespaces) { StreamInitiation response = new StreamInitiation(); @@ -72,29 +85,42 @@ public abstract class StreamNegotiator { return response; } - Stanza initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) throws NoResponseException, XMPPErrorException, NotConnectedException { - StreamInitiation response = createInitiationAccept(initiation, + protected final IQ initiateIncomingStream(final XMPPConnection connection, StreamInitiation initiation) + throws NoResponseException, XMPPErrorException, NotConnectedException { + final StreamInitiation response = createInitiationAccept(initiation, getNamespaces()); - // establish collector to await response - PacketCollector collector = connection - .createPacketCollectorAndSend(getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()), response); + newStreamInitiation(initiation.getFrom(), initiation.getSessionID()); - Stanza streamMethodInitiation = collector.nextResultOrThrow(); + final String eventKey = initiation.getFrom().toString() + '\t' + initiation.getSessionID(); + IQ streamMethodInitiation; + try { + streamMethodInitiation = initationSetEvents.performActionAndWaitForEvent(eventKey, connection.getPacketReplyTimeout(), new Callback() { + @Override + public void action() throws NotConnectedException { + connection.sendPacket(response); + } + }); + } + catch (InterruptedException e) { + // TODO remove this try/catch once merged into 4.2's master branch + throw new IllegalStateException(e); + } + if (streamMethodInitiation == null) { + throw NoResponseException.newWith(connection); + } + XMPPErrorException.ifHasErrorThenThrow(streamMethodInitiation); return streamMethodInitiation; } /** - * Returns the packet filter that will return the initiation packet for the appropriate stream - * initiation. + * Signal that a new stream initiation arrived. The negotiator may needs to prepare for it. * * @param from The initiator of the file transfer. * @param streamID The stream ID related to the transfer. - * @return The PacketFilter that will return the packet relatable to the stream - * initiation. */ - public abstract StanzaFilter getInitiationPacketFilter(String from, String streamID); + protected abstract void newStreamInitiation(String from, String streamID); abstract InputStream negotiateIncomingStream(Stanza streamInitiation) throws XMPPErrorException, @@ -147,4 +173,7 @@ public abstract class StreamNegotiator { */ public abstract String[] getNamespaces(); + public static void signal(String eventKey, IQ eventValue) { + initationSetEvents.signalEvent(eventKey, eventValue); + } }