From c95c8b7e3a6fe2107380c6583a185d99c587438c Mon Sep 17 00:00:00 2001 From: Alex Wenckus Date: Wed, 21 Mar 2007 04:09:52 +0000 Subject: [PATCH] File transfer upgrade, 1.5 and beautification. Fixed fault tolerant negotiator. SMACK-128 git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7616 b35dd754-fafc-0310-a699-88a17e54d16e --- .../filetransfer/FaultTolerantNegotiator.java | 112 +++-- .../filetransfer/FileTransferNegotiator.java | 20 +- .../FileTransferNegotiatorManager.java | 26 ++ .../filetransfer/IBBTransferNegotiator.java | 20 +- .../filetransfer/IncomingFileTransfer.java | 36 +- .../Socks5TransferNegotiator.java | 402 +++--------------- .../Socks5TransferNegotiatorManager.java | 388 +++++++++++++++++ .../smackx/filetransfer/StreamNegotiator.java | 8 +- .../smackx/packet/Bytestream.java | 54 ++- .../smackx/provider/BytestreamsProvider.java | 16 +- 10 files changed, 629 insertions(+), 453 deletions(-) create mode 100644 source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiatorManager.java create mode 100644 source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiatorManager.java diff --git a/source/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java b/source/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java index 47bec5222..28ca043b7 100644 --- a/source/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java +++ b/source/org/jivesoftware/smackx/filetransfer/FaultTolerantNegotiator.java @@ -30,10 +30,15 @@ import org.jivesoftware.smackx.packet.StreamInitiation; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.*; +import java.util.List; +import java.util.ArrayList; + /** - * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary negotiator. - * If the primary negotiator fails during the stream negotiaton process, the second negotiator is used. + * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary + * negotiator. If the primary negotiator fails during the stream negotiaton process, the second + * negotiator is used. */ public class FaultTolerantNegotiator extends StreamNegotiator { @@ -43,7 +48,8 @@ public class FaultTolerantNegotiator extends StreamNegotiator { private PacketFilter primaryFilter; private PacketFilter secondaryFilter; - public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary, StreamNegotiator secondary) { + public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary, + StreamNegotiator secondary) { this.primaryNegotiator = primary; this.secondaryNegotiator = secondary; this.connection = connection; @@ -58,41 +64,70 @@ public class FaultTolerantNegotiator extends StreamNegotiator { } InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException { - throw new UnsupportedOperationException("Negotiation only handled by create incoming stream method."); + throw new UnsupportedOperationException("Negotiation only handled by create incoming " + + "stream method."); } - final Packet initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) throws XMPPException { - throw new UnsupportedOperationException("Initiation handled by createIncomingStream method"); + final Packet initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) { + throw new UnsupportedOperationException("Initiation handled by createIncomingStream " + + "method"); } public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException { - PacketFilter filter = getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()); - PacketCollector collector = connection.createPacketCollector(filter); + PacketCollector collector = connection.createPacketCollector( + getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID())); - StreamInitiation response = super.createInitiationAccept(initiation, getNamespaces()); - connection.sendPacket(response); + connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces())); - InputStream stream; + CompletionService service + = new ExecutorCompletionService(Executors.newFixedThreadPool(2)); + List> futures = new ArrayList>(); + InputStream stream = null; + XMPPException exception = null; try { - Packet streamInitiation = collector.nextResult(SmackConfiguration.getPacketReplyTimeout()); - if (streamInitiation == null) { - throw new XMPPException("No response from remote client"); + futures.add(service.submit(new NegotiatorService(collector))); + futures.add(service.submit(new NegotiatorService(collector))); + + int i = 0; + while (stream == null && i < futures.size()) { + Future future; + try { + i++; + future = service.poll(10, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + continue; + } + + if (future == null) { + continue; + } + + try { + stream = future.get(); + } + catch (InterruptedException e) { + /* Do Nothing */ + } + catch (ExecutionException e) { + exception = new XMPPException(e.getCause()); + } } - StreamNegotiator negotiator = determineNegotiator(streamInitiation); - stream = negotiator.negotiateIncomingStream(streamInitiation); } - catch (XMPPException ex) { - ex.printStackTrace(); - Packet streamInitiation = collector.nextResult(SmackConfiguration.getPacketReplyTimeout()); - collector.cancel(); - if (streamInitiation == null) { - throw new XMPPException("No response from remote client"); + finally { + for (Future future : futures) { + future.cancel(true); } - StreamNegotiator negotiator = determineNegotiator(streamInitiation); - stream = negotiator.negotiateIncomingStream(streamInitiation); - } finally { collector.cancel(); } + if (stream == null) { + if (exception != null) { + throw exception; + } + else { + throw new XMPPException("File transfer negotiation failed."); + } + } return stream; } @@ -101,7 +136,8 @@ public class FaultTolerantNegotiator extends StreamNegotiator { return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator; } - public OutputStream createOutgoingStream(String streamID, String initiator, String target) throws XMPPException { + public OutputStream createOutgoingStream(String streamID, String initiator, String target) + throws XMPPException { OutputStream stream; try { stream = primaryNegotiator.createOutgoingStream(streamID, initiator, target); @@ -114,10 +150,10 @@ public class FaultTolerantNegotiator extends StreamNegotiator { } public String[] getNamespaces() { - String [] primary = primaryNegotiator.getNamespaces(); - String [] secondary = secondaryNegotiator.getNamespaces(); + String[] primary = primaryNegotiator.getNamespaces(); + String[] secondary = secondaryNegotiator.getNamespaces(); - String [] namespaces = new String[primary.length + secondary.length]; + String[] namespaces = new String[primary.length + secondary.length]; System.arraycopy(primary, 0, namespaces, 0, primary.length); System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length); @@ -127,4 +163,22 @@ public class FaultTolerantNegotiator extends StreamNegotiator { public void cleanup() { } + private class NegotiatorService implements Callable { + + private PacketCollector collector; + + NegotiatorService(PacketCollector collector) { + this.collector = collector; + } + + public InputStream call() throws Exception { + Packet streamInitiation = collector.nextResult( + SmackConfiguration.getPacketReplyTimeout() * 2); + if (streamInitiation == null) { + throw new XMPPException("No response from remote client"); + } + StreamNegotiator negotiator = determineNegotiator(streamInitiation); + return negotiator.negotiateIncomingStream(streamInitiation); + } + } } diff --git a/source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiator.java b/source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiator.java index 1d60c1663..1d2354b8e 100644 --- a/source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiator.java +++ b/source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiator.java @@ -74,6 +74,11 @@ public class FileTransferNegotiator { private static final Random randomGenerator = new Random(); + /** + * A static variable to use only offer IBB for file transfer. It is generally recommend to only + * set this variable to true for testing purposes as IBB is the backup file transfer method + * and shouldn't be used as the only transfer method in production systems. + */ public static boolean IBB_ONLY = false; /** @@ -178,7 +183,7 @@ public class FileTransferNegotiator { private final XMPPConnection connection; - private final StreamNegotiator byteStreamTransferManager; + private final Socks5TransferNegotiatorManager byteStreamTransferManager; private final StreamNegotiator inbandTransferManager; @@ -186,7 +191,7 @@ public class FileTransferNegotiator { configureConnection(connection); this.connection = connection; - byteStreamTransferManager = new Socks5TransferNegotiator(connection); + byteStreamTransferManager = new Socks5TransferNegotiatorManager(connection); inbandTransferManager = new IBBTransferNegotiator(connection); } @@ -298,10 +303,12 @@ public class FileTransferNegotiator { } if (isByteStream && isIBB && field.getType().equals(FormField.TYPE_LIST_MULTI)) { - return new FaultTolerantNegotiator(connection, byteStreamTransferManager, inbandTransferManager); + return new FaultTolerantNegotiator(connection, + byteStreamTransferManager.createNegotiator(), + inbandTransferManager); } else if (isByteStream) { - return byteStreamTransferManager; + return byteStreamTransferManager.createNegotiator(); } else { return inbandTransferManager; @@ -430,10 +437,11 @@ public class FileTransferNegotiator { } if (isByteStream && isIBB) { - return new FaultTolerantNegotiator(connection, byteStreamTransferManager, inbandTransferManager); + return new FaultTolerantNegotiator(connection, + byteStreamTransferManager.createNegotiator(), inbandTransferManager); } else if (isByteStream) { - return byteStreamTransferManager; + return byteStreamTransferManager.createNegotiator(); } else { return inbandTransferManager; diff --git a/source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiatorManager.java b/source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiatorManager.java new file mode 100644 index 000000000..e63c9ebf6 --- /dev/null +++ b/source/org/jivesoftware/smackx/filetransfer/FileTransferNegotiatorManager.java @@ -0,0 +1,26 @@ +/** + * $Revision:$ + * $Date:$ + * + * Copyright 2003-2007 Jive Software. + * + * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smackx.filetransfer; + +/** + * + */ +public interface FileTransferNegotiatorManager { + StreamNegotiator createNegotiator(); +} diff --git a/source/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java b/source/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java index a3501a50e..19fc23368 100644 --- a/source/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java +++ b/source/org/jivesoftware/smackx/filetransfer/IBBTransferNegotiator.java @@ -19,10 +19,7 @@ */ package org.jivesoftware.smackx.filetransfer; -import org.jivesoftware.smack.PacketCollector; -import org.jivesoftware.smack.PacketListener; -import org.jivesoftware.smack.XMPPConnection; -import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smack.*; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.filter.*; import org.jivesoftware.smack.packet.IQ; @@ -115,12 +112,12 @@ public class IBBTransferNegotiator extends StreamNegotiator { PacketCollector collector = connection .createPacketCollector(new PacketIDFilter(openIQ.getPacketID())); connection.sendPacket(openIQ); - - IQ openResponse = (IQ) collector.nextResult(); + // We don't want to wait forever for the result + IQ openResponse = (IQ) collector.nextResult(SmackConfiguration.getPacketReplyTimeout()); collector.cancel(); if (openResponse == null) { - throw new XMPPException("No response from peer"); + throw new XMPPException("No response from peer on IBB open"); } IQ.Type type = openResponse.getType(); @@ -144,9 +141,6 @@ public class IBBTransferNegotiator extends StreamNegotiator { public void cleanup() { } - public void cancel() { - } - private class IBBOutputStream extends OutputStream { protected byte[] buffer; @@ -457,10 +451,8 @@ public class IBBTransferNegotiator extends StreamNegotiator { IBBExtensions.Data data = (IBBExtensions.Data) packet. getExtension(IBBExtensions.Data.ELEMENT_NAME, IBBExtensions.NAMESPACE); - if (data == null) { - return false; - } - return data.getSessionID() != null && data.getSessionID().equalsIgnoreCase(sessionID); + return data != null && data.getSessionID() != null + && data.getSessionID().equalsIgnoreCase(sessionID); } } diff --git a/source/org/jivesoftware/smackx/filetransfer/IncomingFileTransfer.java b/source/org/jivesoftware/smackx/filetransfer/IncomingFileTransfer.java index 085a99327..3973d44f5 100644 --- a/source/org/jivesoftware/smackx/filetransfer/IncomingFileTransfer.java +++ b/source/org/jivesoftware/smackx/filetransfer/IncomingFileTransfer.java @@ -22,6 +22,7 @@ package org.jivesoftware.smackx.filetransfer; import org.jivesoftware.smack.XMPPException; import java.io.*; +import java.util.concurrent.*; /** * An incoming file transfer is created when the @@ -94,7 +95,7 @@ public class IncomingFileTransfer extends FileTransfer { * * * @param file The location to save the file. - * @throws XMPPException when the file transfer fails + * @throws XMPPException when the file transfer fails * @throws IllegalArgumentException This exception is thrown when the the provided file is * either null, or cannot be written to. */ @@ -151,13 +152,13 @@ public class IncomingFileTransfer extends FileTransfer { try { inputStream.close(); } - catch(Throwable io) { + catch (Throwable io) { /* Ignore */ } } if (outputStream != null) { try { - outputStream.close(); + outputStream.close(); } catch (Throwable io) { /* Ignore */ @@ -175,11 +176,34 @@ public class IncomingFileTransfer extends FileTransfer { private InputStream negotiateStream() throws XMPPException { setStatus(Status.negotiating_transfer); - StreamNegotiator streamNegotiator = negotiator + final StreamNegotiator streamNegotiator = negotiator .selectStreamNegotiator(recieveRequest); setStatus(Status.negotiating_stream); - InputStream inputStream = streamNegotiator - .createIncomingStream(recieveRequest.getStreamInitiation()); + FutureTask streamNegotiatorTask = new FutureTask( + new Callable() { + + public InputStream call() throws Exception { + return streamNegotiator + .createIncomingStream(recieveRequest.getStreamInitiation()); + } + }); + streamNegotiatorTask.run(); + InputStream inputStream; + try { + inputStream = streamNegotiatorTask.get(15, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + throw new XMPPException("Interruption while executing", e); + } + catch (ExecutionException e) { + throw new XMPPException("Error in execution", e); + } + catch (TimeoutException e) { + throw new XMPPException("Request timed out", e); + } + finally { + streamNegotiatorTask.cancel(true); + } setStatus(Status.negotiated); return inputStream; } diff --git a/source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java b/source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java index 361bbeb96..8bccc233b 100644 --- a/source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java +++ b/source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiator.java @@ -31,20 +31,17 @@ import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.XMPPError; import org.jivesoftware.smack.util.StringUtils; -import org.jivesoftware.smack.util.Cache; -import org.jivesoftware.smackx.ServiceDiscoveryManager; import org.jivesoftware.smackx.packet.Bytestream; import org.jivesoftware.smackx.packet.Bytestream.StreamHost; import org.jivesoftware.smackx.packet.Bytestream.StreamHostUsed; -import org.jivesoftware.smackx.packet.DiscoverInfo; -import org.jivesoftware.smackx.packet.DiscoverInfo.Identity; -import org.jivesoftware.smackx.packet.DiscoverItems; -import org.jivesoftware.smackx.packet.DiscoverItems.Item; import org.jivesoftware.smackx.packet.StreamInitiation; import java.io.*; -import java.net.*; -import java.util.*; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Iterator; /** * A SOCKS5 bytestream is negotiated partly over the XMPP XML stream and partly @@ -80,28 +77,17 @@ public class Socks5TransferNegotiator extends StreamNegotiator { */ private static final int CONNECT_FAILURE_THRESHOLD = 2; - private static final long BLACKLIST_LIFETIME = 60 * 1000 * 120; - public static boolean isAllowLocalProxyHost = true; private final XMPPConnection connection; - private List proxies; + private Socks5TransferNegotiatorManager transferNegotiatorManager; - private List streamHosts; - - // locks the proxies during their initialization process - private final Object proxyLock = new Object(); - - private ProxyProcess proxyProcess; - - // locks on the proxy process during its initiatilization process - private final Object processLock = new Object(); - - private final Cache addressBlacklist = new Cache(100, BLACKLIST_LIFETIME); - - public Socks5TransferNegotiator(final XMPPConnection connection) { + public Socks5TransferNegotiator(Socks5TransferNegotiatorManager transferNegotiatorManager, + final XMPPConnection connection) + { this.connection = connection; + this.transferNegotiatorManager = transferNegotiatorManager; } public PacketFilter getInitiationPacketFilter(String from, String sessionID) { @@ -117,7 +103,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator { */ InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException { - Bytestream streamHostsInfo = (Bytestream) streamInitiation; if (streamHostsInfo.getType().equals(IQ.Type.ERROR)) { @@ -135,7 +120,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator { ex.getXMPPError()); connection.sendPacket(errorPacket); } - throw(ex); + throw (ex); } // send used-host confirmation @@ -145,7 +130,11 @@ public class Socks5TransferNegotiator extends StreamNegotiator { connection.sendPacket(streamResponse); try { - return selectedHost.establishedSocket.getInputStream(); + PushbackInputStream stream = new PushbackInputStream( + selectedHost.establishedSocket.getInputStream()); + int firstByte = stream.read(); + stream.unread(firstByte); + return stream; } catch (IOException e) { throw new XMPPException("Error establishing input stream", e); @@ -184,13 +173,12 @@ public class Socks5TransferNegotiator extends StreamNegotiator { * Selects a host to connect to over which the file will be transmitted. * * @param streamHostsInfo the packet recieved from the initiator containing the available hosts - * to transfer the file + * to transfer the file * @return the selected host and socket that were created. * @throws XMPPException when there is no appropriate host. */ private SelectedHostInfo selectHost(Bytestream streamHostsInfo) - throws XMPPException - { + throws XMPPException { Iterator it = streamHostsInfo.getStreamHosts().iterator(); StreamHost selectedHost = null; Socket socket = null; @@ -200,7 +188,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator { // Check to see if this address has been blacklisted int failures = getConnectionFailures(address); - if(failures >= CONNECT_FAILURE_THRESHOLD) { + if (failures >= CONNECT_FAILURE_THRESHOLD) { continue; } // establish socket @@ -219,7 +207,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator { socket = null; } } - if (selectedHost == null || socket == null || !socket.isConnected()) { + if (selectedHost == null || socket == null || !socket.isConnected()) { String errorMessage = "Could not establish socket with any provided host"; throw new XMPPException(errorMessage, new XMPPError( XMPPError.Condition.no_acceptable, errorMessage)); @@ -229,19 +217,11 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } private void incrementConnectionFailures(String address) { - Integer count = (Integer) addressBlacklist.get(address); - if(count == null) { - count = new Integer(1); - } - else { - count = new Integer(count.intValue() + 1); - } - addressBlacklist.put(address, count); + transferNegotiatorManager.incrementConnectionFailures(address); } private int getConnectionFailures(String address) { - Integer count = (Integer) addressBlacklist.get(address); - return (count != null ? count.intValue() : 0); + return transferNegotiatorManager.getConnectionFailures(address); } /** @@ -292,9 +272,8 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } private Socket initBytestreamSocket(final String sessionID, - String initiator, String target) throws Exception - { - ProxyProcess process; + String initiator, String target) throws Exception { + Socks5TransferNegotiatorManager.ProxyProcess process; try { process = establishListeningSocket(); } @@ -328,14 +307,14 @@ public class Socks5TransferNegotiator extends StreamNegotiator { * @param sessionID The session id of the stream. * @param proxy The server socket which will listen locally for remote * connections. - * @param digest the digest of the userids and the session id - * @param query the query which the response is being awaited + * @param digest the digest of the userids and the session id + * @param query the query which the response is being awaited * @return the selected host * @throws XMPPException when the response from the peer is an error or doesn't occur - * @throws IOException when there is an error establishing the local socket + * @throws IOException when there is an error establishing the local socket */ private SelectedHostInfo waitForUsedHostResponse(String sessionID, - final ProxyProcess proxy, final String digest, + final Socks5TransferNegotiatorManager.ProxyProcess proxy, final String digest, final Bytestream query) throws XMPPException, IOException { SelectedHostInfo info = new SelectedHostInfo(); @@ -394,22 +373,13 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } } - private ProxyProcess establishListeningSocket() throws IOException { - synchronized (processLock) { - if (proxyProcess == null) { - proxyProcess = new ProxyProcess(new ServerSocket(7777)); - proxyProcess.start(); - } - } - proxyProcess.addTransfer(); - return proxyProcess; + private Socks5TransferNegotiatorManager.ProxyProcess establishListeningSocket() + throws IOException { + return transferNegotiatorManager.addTransfer(); } private void cleanupListeningSocket() { - if (proxyProcess == null) { - return; - } - proxyProcess.removeTransfer(); + transferNegotiatorManager.removeTransfer(); } private String discoverLocalIP() throws UnknownHostException { @@ -439,34 +409,31 @@ public class Socks5TransferNegotiator extends StreamNegotiator { * </iq> * * - * @param from initiator@host1/foo - the file transfer initiator. - * @param to target@host2/bar - the file transfer target. - * @param sid 'mySID' - the unique identifier for this file transfer + * @param from initiator@host1/foo - the file transfer initiator. + * @param to target@host2/bar - the file transfer target. + * @param sid 'mySID' - the unique identifier for this file transfer * @param localIP the IP of the local machine if it is being provided, null otherwise. - * @param port the port of the local mahine if it is being provided, null otherwise. + * @param port the port of the local mahine if it is being provided, null otherwise. * @return the created Bytestream packet */ private Bytestream createByteStreamInit(final String from, final String to, - final String sid, final String localIP, final int port) { + final String sid, final String localIP, final int port) + { Bytestream bs = new Bytestream(); bs.setTo(to); bs.setFrom(from); bs.setSessionID(sid); bs.setType(IQ.Type.SET); - bs.setMode(Bytestream.Mode.TCP); + bs.setMode(Bytestream.Mode.tcp); if (localIP != null && port > 0) { bs.addStreamHost(from, localIP, port); } // make sure the proxies have been initialized completely - synchronized (proxyLock) { - if (proxies == null) { - initProxies(); - } - } + Collection streamHosts = transferNegotiatorManager.getStreamHosts(); + if (streamHosts != null) { - Iterator it = streamHosts.iterator(); - while (it.hasNext()) { - bs.addStreamHost((StreamHost) it.next()); + for (StreamHost host : streamHosts) { + bs.addStreamHost(host); } } @@ -474,101 +441,20 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } - - /** - * Checks the service discovery item returned from a server component to verify if it is - * a File Transfer proxy or not. - * - * @param manager the service discovery manager which will be used to query the component - * @param item the discovered item on the server relating - * @return returns the JID of the proxy if it is a proxy or null if the item is not a proxy. - */ - private String checkIsProxy(ServiceDiscoveryManager manager, Item item) { - DiscoverInfo info; - try { - info = manager.discoverInfo(item.getEntityID()); - } - catch (XMPPException e) { - return null; - } - Iterator itx = info.getIdentities(); - while (itx.hasNext()) { - DiscoverInfo.Identity identity = (Identity) itx.next(); - if ("proxy".equalsIgnoreCase(identity.getCategory()) - && "bytestreams".equalsIgnoreCase( - identity.getType())) { - return info.getFrom(); - } - } - return null; - } - - private void initProxies() { - proxies = new ArrayList(); - ServiceDiscoveryManager manager = ServiceDiscoveryManager - .getInstanceFor(connection); - try { - DiscoverItems discoItems = manager.discoverItems(connection.getServiceName()); - Iterator it = discoItems.getItems(); - while (it.hasNext()) { - DiscoverItems.Item item = (Item) it.next(); - String proxy = checkIsProxy(manager, item); - if(proxy != null) { - proxies.add(proxy); - } - } - } - catch (XMPPException e) { - return; - } - if (proxies.size() > 0) { - initStreamHosts(); - } - } - - private void initStreamHosts() { - List streamHosts = new ArrayList(); - Iterator it = proxies.iterator(); - IQ query; - PacketCollector collector; - Bytestream response; - while (it.hasNext()) { - String jid = it.next().toString(); - query = new IQ() { - public String getChildElementXML() { - return ""; - } - }; - query.setType(IQ.Type.GET); - query.setTo(jid); - - collector = connection.createPacketCollector(new PacketIDFilter( - query.getPacketID())); - connection.sendPacket(query); - - response = (Bytestream) collector.nextResult(SmackConfiguration - .getPacketReplyTimeout()); - if (response != null) { - streamHosts.addAll(response.getStreamHosts()); - } - collector.cancel(); - } - this.streamHosts = streamHosts; - } - /** * Returns the packet to send notification to the stream host to activate * the stream. * * @param sessionID the session ID of the file transfer to activate. - * @param from - * @param to the JID of the stream host - * @param target the JID of the file transfer target. + * @param from the sender of the bytestreeam + * @param to the JID of the stream host + * @param target the JID of the file transfer target. * @return the packet to send notification to the stream host to * activate the stream. */ private static Bytestream createByteStreamActivate(final String sessionID, - final String from, final String to, final String target) { + final String from, final String to, final String target) + { Bytestream activate = new Bytestream(sessionID); activate.setMode(null); activate.setToActivate(target); @@ -578,61 +464,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator { return activate; } - /** - * Negotiates the Socks 5 bytestream when the local computer is acting as - * the proxy. - * - * @param connection the socket connection with the peer. - * @return the SHA-1 digest that is used to uniquely identify the file - * transfer. - * @throws XMPPException - * @throws IOException - */ - private String establishSocks5UploadConnection(Socket connection) throws XMPPException, IOException { - OutputStream out = new DataOutputStream(connection.getOutputStream()); - InputStream in = new DataInputStream(connection.getInputStream()); - - // first byte is version should be 5 - int b = in.read(); - if (b != 5) { - throw new XMPPException("Only SOCKS5 supported"); - } - - // second byte number of authentication methods supported - b = in.read(); - int[] auth = new int[b]; - for (int i = 0; i < b; i++) { - auth[i] = in.read(); - } - - int authMethod = -1; - for (int i = 0; i < auth.length; i++) { - authMethod = (auth[i] == 0 ? 0 : -1); // only auth method - // 0, no - // authentication, - // supported - if (authMethod == 0) { - break; - } - } - if (authMethod != 0) { - throw new XMPPException("Authentication method not supported"); - } - byte[] cmd = new byte[2]; - cmd[0] = (byte) 0x05; - cmd[1] = (byte) 0x00; - out.write(cmd); - - String responseDigest = createIncomingSocks5Message(in); - cmd = createOutgoingSocks5Message(0, responseDigest); - - if (!connection.isConnected()) { - throw new XMPPException("Socket closed by remote user"); - } - out.write(cmd); - return responseDigest; - } - public String[] getNamespaces() { return new String[]{NAMESPACE}; } @@ -659,7 +490,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator { createIncomingSocks5Message(in); } - private String createIncomingSocks5Message(InputStream in) + static String createIncomingSocks5Message(InputStream in) throws IOException { byte[] cmd = new byte[5]; in.read(cmd, 0, 5); @@ -673,7 +504,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator { return digest; } - private byte[] createOutgoingSocks5Message(int cmd, String digest) { + static byte[] createOutgoingSocks5Message(int cmd, String digest) { byte addr[] = digest.getBytes(); byte[] data = new byte[7 + addr.length]; @@ -691,14 +522,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } public void cleanup() { - synchronized (processLock) { - if (proxyProcess != null) { - proxyProcess.stop(); - } - } - } - public void cancel() { } private static class SelectedHostInfo { @@ -718,132 +542,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator { } } - private class ProxyProcess implements Runnable { - - private final ServerSocket listeningSocket; - - private final Map connectionMap = new HashMap(); - - private boolean done = false; - - private Thread thread; - private int transfers; - - public void run() { - try { - try { - listeningSocket.setSoTimeout(10000); - } - catch (SocketException e) { - // There was a TCP error, lets print the stack trace - e.printStackTrace(); - return; - } - while (!done) { - Socket conn = null; - synchronized (ProxyProcess.this) { - while (transfers <= 0 && !done) { - transfers = -1; - try { - ProxyProcess.this.wait(); - } - catch (InterruptedException e) { - /* Do nothing */ - } - } - } - if(done) { - break; - } - try { - synchronized (listeningSocket) { - conn = listeningSocket.accept(); - } - if (conn == null) { - continue; - } - String digest = establishSocks5UploadConnection(conn); - synchronized (connectionMap) { - connectionMap.put(digest, conn); - } - } - catch (SocketTimeoutException e) { - /* Do Nothing */ - } - catch (IOException e) { - /* Do Nothing */ - } - catch (XMPPException e) { - e.printStackTrace(); - if (conn != null) { - try { - conn.close(); - } - catch (IOException e1) { - /* Do Nothing */ - } - } - } - } - } - finally { - try { - listeningSocket.close(); - } - catch (IOException e) { - /* Do Nothing */ - } - } - } - - - public void start() { - thread.start(); - } - - public void stop() { - done = true; - synchronized (this) { - this.notify(); - } - synchronized (listeningSocket) { - listeningSocket.notify(); - } - } - - public int getPort() { - return listeningSocket.getLocalPort(); - } - - ProxyProcess(ServerSocket listeningSocket) { - thread = new Thread(this, "File Transfer Connection Listener"); - this.listeningSocket = listeningSocket; - } - - public Socket getSocket(String digest) { - synchronized (connectionMap) { - return (Socket) connectionMap.get(digest); - } - } - - public void addTransfer() { - synchronized (this) { - if (transfers == -1) { - transfers = 1; - this.notify(); - } - else { - transfers++; - } - } - } - - public void removeTransfer() { - synchronized (this) { - transfers--; - } - } - } private static class BytestreamSIDFilter implements PacketFilter { diff --git a/source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiatorManager.java b/source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiatorManager.java new file mode 100644 index 000000000..35f2c3509 --- /dev/null +++ b/source/org/jivesoftware/smackx/filetransfer/Socks5TransferNegotiatorManager.java @@ -0,0 +1,388 @@ +/** + * $Revision:$ + * $Date:$ + * + * Copyright 2003-2007 Jive Software. + * + * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jivesoftware.smackx.filetransfer; + +import org.jivesoftware.smack.util.Cache; +import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smack.PacketCollector; +import org.jivesoftware.smack.SmackConfiguration; +import org.jivesoftware.smack.XMPPConnection; +import org.jivesoftware.smack.filter.PacketIDFilter; +import org.jivesoftware.smack.packet.IQ; +import org.jivesoftware.smackx.ServiceDiscoveryManager; +import org.jivesoftware.smackx.packet.DiscoverItems; +import org.jivesoftware.smackx.packet.Bytestream; +import org.jivesoftware.smackx.packet.DiscoverInfo; + +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.*; +import java.io.*; + +/** + * + */ +public class Socks5TransferNegotiatorManager implements FileTransferNegotiatorManager { + + private static final long BLACKLIST_LIFETIME = 60 * 1000 * 120; + // locks the proxies during their initialization process + private final Object proxyLock = new Object(); + + private static ProxyProcess proxyProcess; + + // locks on the proxy process during its initiatilization process + private final Object processLock = new Object(); + + private final Cache addressBlacklist + = new Cache(100, BLACKLIST_LIFETIME); + + private XMPPConnection connection; + + private List proxies; + + private List streamHosts; + + public Socks5TransferNegotiatorManager(XMPPConnection connection) { + this.connection = connection; + } + + public StreamNegotiator createNegotiator() { + return new Socks5TransferNegotiator(this, connection); + } + + public void incrementConnectionFailures(String address) { + Integer count = addressBlacklist.get(address); + if (count == null) { + count = 1; + } + else { + count += 1; + } + addressBlacklist.put(address, count); + } + + public int getConnectionFailures(String address) { + Integer count = addressBlacklist.get(address); + return count != null ? count : 0; + } + + public ProxyProcess addTransfer() throws IOException { + synchronized (processLock) { + if (proxyProcess == null) { + proxyProcess = new ProxyProcess(new ServerSocket(7777)); + proxyProcess.start(); + } + } + proxyProcess.addTransfer(); + return proxyProcess; + } + + public void removeTransfer() { + if (proxyProcess == null) { + return; + } + proxyProcess.removeTransfer(); + } + + public Collection getStreamHosts() { + synchronized (proxyLock) { + if (proxies == null) { + initProxies(); + } + } + return Collections.unmodifiableCollection(streamHosts); + } + + /** + * Checks the service discovery item returned from a server component to verify if it is + * a File Transfer proxy or not. + * + * @param manager the service discovery manager which will be used to query the component + * @param item the discovered item on the server relating + * @return returns the JID of the proxy if it is a proxy or null if the item is not a proxy. + */ + private String checkIsProxy(ServiceDiscoveryManager manager, DiscoverItems.Item item) { + DiscoverInfo info; + try { + info = manager.discoverInfo(item.getEntityID()); + } + catch (XMPPException e) { + return null; + } + Iterator itx = info.getIdentities(); + while (itx.hasNext()) { + DiscoverInfo.Identity identity = (DiscoverInfo.Identity) itx.next(); + if ("proxy".equalsIgnoreCase(identity.getCategory()) + && "bytestreams".equalsIgnoreCase( + identity.getType())) { + return info.getFrom(); + } + } + return null; + } + + private void initProxies() { + proxies = new ArrayList(); + ServiceDiscoveryManager manager = ServiceDiscoveryManager + .getInstanceFor(connection); + try { + DiscoverItems discoItems = manager.discoverItems(connection.getServiceName()); + Iterator it = discoItems.getItems(); + while (it.hasNext()) { + DiscoverItems.Item item = (DiscoverItems.Item) it.next(); + String proxy = checkIsProxy(manager, item); + if (proxy != null) { + proxies.add(proxy); + } + } + } + catch (XMPPException e) { + return; + } + if (proxies.size() > 0) { + initStreamHosts(); + } + } + + /** + * Loads streamhost address and ports from the proxies on the local server. + */ + private void initStreamHosts() { + List streamHosts = new ArrayList(); + Iterator it = proxies.iterator(); + IQ query; + PacketCollector collector; + Bytestream response; + while (it.hasNext()) { + String jid = it.next().toString(); + query = new IQ() { + public String getChildElementXML() { + return ""; + } + }; + query.setType(IQ.Type.GET); + query.setTo(jid); + + collector = connection.createPacketCollector(new PacketIDFilter( + query.getPacketID())); + connection.sendPacket(query); + + response = (Bytestream) collector.nextResult(SmackConfiguration + .getPacketReplyTimeout()); + if (response != null) { + streamHosts.addAll(response.getStreamHosts()); + } + collector.cancel(); + } + this.streamHosts = streamHosts; + } + + public void cleanup() { + synchronized (processLock) { + if (proxyProcess != null) { + proxyProcess.stop(); + proxyProcess = null; + } + } + } + + class ProxyProcess implements Runnable { + + private final ServerSocket listeningSocket; + + private final Map connectionMap = new HashMap(); + + private boolean done = false; + + private Thread thread; + private int transfers; + + public void run() { + try { + try { + listeningSocket.setSoTimeout(10000); + } + catch (SocketException e) { + // There was a TCP error, lets print the stack trace + e.printStackTrace(); + return; + } + while (!done) { + Socket conn = null; + synchronized (ProxyProcess.this) { + while (transfers <= 0 && !done) { + transfers = -1; + try { + ProxyProcess.this.wait(); + } + catch (InterruptedException e) { + /* Do nothing */ + } + } + } + if (done) { + break; + } + try { + synchronized (listeningSocket) { + conn = listeningSocket.accept(); + } + if (conn == null) { + continue; + } + String digest = establishSocks5UploadConnection(conn); + synchronized (connectionMap) { + connectionMap.put(digest, conn); + } + } + catch (SocketTimeoutException e) { + /* Do Nothing */ + } + catch (IOException e) { + /* Do Nothing */ + } + catch (XMPPException e) { + e.printStackTrace(); + if (conn != null) { + try { + conn.close(); + } + catch (IOException e1) { + /* Do Nothing */ + } + } + } + } + } + finally { + try { + listeningSocket.close(); + } + catch (IOException e) { + /* Do Nothing */ + } + } + } + + /** + * Negotiates the Socks 5 bytestream when the local computer is acting as + * the proxy. + * + * @param connection the socket connection with the peer. + * @return the SHA-1 digest that is used to uniquely identify the file + * transfer. + * @throws XMPPException + * @throws IOException + */ + private String establishSocks5UploadConnection(Socket connection) throws XMPPException, IOException { + OutputStream out = new DataOutputStream(connection.getOutputStream()); + InputStream in = new DataInputStream(connection.getInputStream()); + + // first byte is version should be 5 + int b = in.read(); + if (b != 5) { + throw new XMPPException("Only SOCKS5 supported"); + } + + // second byte number of authentication methods supported + b = in.read(); + int[] auth = new int[b]; + for (int i = 0; i < b; i++) { + auth[i] = in.read(); + } + + int authMethod = -1; + for (int anAuth : auth) { + authMethod = (anAuth == 0 ? 0 : -1); // only auth method + // 0, no + // authentication, + // supported + if (authMethod == 0) { + break; + } + } + if (authMethod != 0) { + throw new XMPPException("Authentication method not supported"); + } + byte[] cmd = new byte[2]; + cmd[0] = (byte) 0x05; + cmd[1] = (byte) 0x00; + out.write(cmd); + + String responseDigest = Socks5TransferNegotiator.createIncomingSocks5Message(in); + cmd = Socks5TransferNegotiator.createOutgoingSocks5Message(0, responseDigest); + + if (!connection.isConnected()) { + throw new XMPPException("Socket closed by remote user"); + } + out.write(cmd); + return responseDigest; + } + + + public void start() { + thread.start(); + } + + public void stop() { + done = true; + synchronized (this) { + this.notify(); + } + synchronized (listeningSocket) { + listeningSocket.notify(); + } + } + + public int getPort() { + return listeningSocket.getLocalPort(); + } + + ProxyProcess(ServerSocket listeningSocket) { + thread = new Thread(this, "File Transfer Connection Listener"); + this.listeningSocket = listeningSocket; + } + + public Socket getSocket(String digest) { + synchronized (connectionMap) { + return connectionMap.get(digest); + } + } + + public void addTransfer() { + synchronized (this) { + if (transfers == -1) { + transfers = 1; + this.notify(); + } + else { + transfers++; + } + } + } + + public void removeTransfer() { + synchronized (this) { + transfers--; + } + } + } +} diff --git a/source/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java b/source/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java index e3beba542..f5021a451 100644 --- a/source/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java +++ b/source/org/jivesoftware/smackx/filetransfer/StreamNegotiator.java @@ -54,7 +54,8 @@ public abstract class StreamNegotiator { * @return The response to be forwarded to the initator. */ public StreamInitiation createInitiationAccept( - StreamInitiation streamInitiationOffer, String [] namespaces) { + StreamInitiation streamInitiationOffer, String[] namespaces) + { StreamInitiation response = new StreamInitiation(); response.setTo(streamInitiationOffer.getFrom()); response.setFrom(streamInitiationOffer.getTo()); @@ -64,8 +65,8 @@ public abstract class StreamNegotiator { DataForm form = new DataForm(Form.TYPE_SUBMIT); FormField field = new FormField( FileTransferNegotiator.STREAM_DATA_FIELD_NAME); - for (int i = 0; i < namespaces.length; i++) { - field.addValue(namespaces[i]); + for (String namespace : namespaces) { + field.addValue(namespace); } form.addField(field); @@ -160,5 +161,4 @@ public abstract class StreamNegotiator { */ public abstract void cleanup(); - } diff --git a/source/org/jivesoftware/smackx/packet/Bytestream.java b/source/org/jivesoftware/smackx/packet/Bytestream.java index 1c14214ff..d37d9f6fb 100644 --- a/source/org/jivesoftware/smackx/packet/Bytestream.java +++ b/source/org/jivesoftware/smackx/packet/Bytestream.java @@ -33,9 +33,9 @@ public class Bytestream extends IQ { private String sessionID; - private Mode mode = Mode.TCP; + private Mode mode = Mode.tcp; - private final List streamHosts = new ArrayList(); + private final List streamHosts = new ArrayList(); private StreamHostUsed usedHost; @@ -63,7 +63,7 @@ public class Bytestream extends IQ { * Set the session ID related to the Byte Stream. The session ID is a unique * identifier used to differentiate between stream negotations. * - * @param sessionID + * @param sessionID the unique session ID that identifies the transfer. */ public void setSessionID(final String sessionID) { this.sessionID = sessionID; @@ -83,7 +83,7 @@ public class Bytestream extends IQ { * Set the transport mode. This should be put in the initiation of the * interaction. * - * @param mode + * @param mode the transport mode, either UDP or TCP * @see Mode */ public void setMode(final Mode mode) { @@ -145,7 +145,7 @@ public class Bytestream extends IQ { * * @return Returns the list of stream hosts contained in the packet. */ - public Collection getStreamHosts() { + public Collection getStreamHosts() { return Collections.unmodifiableCollection(streamHosts); } @@ -158,9 +158,10 @@ public class Bytestream extends IQ { * if there is none. */ public StreamHost getStreamHost(final String JID) { - StreamHost host; - for (Iterator it = streamHosts.iterator(); it.hasNext();) { - host = (StreamHost) it.next(); + if(JID == null) { + return null; + } + for (StreamHost host : streamHosts) { if (host.getJID().equals(JID)) { return host; } @@ -233,8 +234,9 @@ public class Bytestream extends IQ { buf.append(" mode = \"").append(getMode()).append("\""); buf.append(">"); if (getToActivate() == null) { - for (Iterator it = getStreamHosts().iterator(); it.hasNext();) - buf.append(((StreamHost) it.next()).toXML()); + for (StreamHost streamHost : getStreamHosts()) { + buf.append(streamHost.toXML()); + } } else { buf.append(getToActivate().toXML()); @@ -246,8 +248,8 @@ public class Bytestream extends IQ { buf.append(getUsedHost().toXML()); // A result from the server can also contain stream hosts else if (countStreamHosts() > 0) { - for (Iterator it = getStreamHosts().iterator(); it.hasNext();) { - buf.append(((StreamHost) it.next()).toXML()); + for (StreamHost host : streamHosts) { + buf.append(host.toXML()); } } } @@ -456,32 +458,28 @@ public class Bytestream extends IQ { * * @author Alexander Wenckus */ - public static class Mode { + public enum Mode { /** * A TCP based stream. */ - public static Mode TCP = new Mode("tcp"); + tcp, /** * A UDP based stream. */ - public static Mode UDP = new Mode("udp"); + udp; - private final String modeString; + public static Mode fromName(String name) { + Mode mode; + try { + mode = Mode.valueOf(name); + } + catch(Exception ex) { + mode = tcp; + } - private Mode(final String mode) { - this.modeString = mode; - } - - public String toString() { - return modeString; - } - - public boolean equals(final Object obj) { - if (!(obj instanceof Mode)) - return false; - return modeString.equals(((Mode) obj).modeString); + return mode; } } } diff --git a/source/org/jivesoftware/smackx/provider/BytestreamsProvider.java b/source/org/jivesoftware/smackx/provider/BytestreamsProvider.java index c67432cc0..b45a97f52 100644 --- a/source/org/jivesoftware/smackx/provider/BytestreamsProvider.java +++ b/source/org/jivesoftware/smackx/provider/BytestreamsProvider.java @@ -33,21 +33,10 @@ public class BytestreamsProvider implements IQProvider { /* * (non-Javadoc) - * - * @see org.jivesoftware.smack.provider.IQProvider#parseIQ(org.xmlpull.v1.XmlPullParser) - */ - /* - * (non-Javadoc) - * - * @see org.jivesoftware.smack.provider.IQProvider#parseIQ(org.xmlpull.v1.XmlPullParser) - */ - /* - * (non-Javadoc) - * + * * @see org.jivesoftware.smack.provider.IQProvider#parseIQ(org.xmlpull.v1.XmlPullParser) */ public IQ parseIQ(XmlPullParser parser) throws Exception { - // StringBuilder buf = new StringBuilder(); boolean done = false; Bytestream toReturn = new Bytestream(); @@ -95,8 +84,7 @@ public class BytestreamsProvider implements IQProvider { } } - toReturn.setMode((mode == "udp" ? Bytestream.Mode.UDP - : Bytestream.Mode.TCP)); + toReturn.setMode((Bytestream.Mode.fromName(mode))); toReturn.setSessionID(id); return toReturn; }