File transfer fault tolerance

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@3406 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
Alex Wenckus 2006-02-08 00:31:17 +00:00 committed by alex
parent d807155a29
commit 586e3a1604
8 changed files with 440 additions and 181 deletions

View File

@ -0,0 +1,130 @@
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright 2003-2006 Jive Software.
*
* All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smackx.filetransfer;
import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.OrFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smackx.packet.StreamInitiation;
import java.io.InputStream;
import java.io.OutputStream;
/**
* The fault tolerant negotiator takes two stream negotiators, the primary and the secondary negotiator.
* If the primary negotiator fails during the stream negotiaton process, the second negotiator is used.
*/
public class FaultTolerantNegotiator extends StreamNegotiator {
private StreamNegotiator primaryNegotiator;
private StreamNegotiator secondaryNegotiator;
private XMPPConnection connection;
private PacketFilter primaryFilter;
private PacketFilter secondaryFilter;
public FaultTolerantNegotiator(XMPPConnection connection, StreamNegotiator primary, StreamNegotiator secondary) {
this.primaryNegotiator = primary;
this.secondaryNegotiator = secondary;
this.connection = connection;
}
public PacketFilter getInitiationPacketFilter(String from, String streamID) {
if (primaryFilter == null || secondaryFilter == null) {
primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID);
secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID);
}
return new OrFilter(primaryFilter, secondaryFilter);
}
InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException {
throw new UnsupportedOperationException("Negotiation only handled by create incoming stream method.");
}
final Packet initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) throws XMPPException {
throw new UnsupportedOperationException("Initiation handled by createIncomingStream method");
}
public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException {
PacketFilter filter = getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID());
PacketCollector collector = connection.createPacketCollector(filter);
StreamInitiation response = super.createInitiationAccept(initiation, getNamespaces());
connection.sendPacket(response);
InputStream stream;
try {
Packet streamInitiation = collector.nextResult(SmackConfiguration.getPacketReplyTimeout());
if (streamInitiation == null) {
throw new XMPPException("No response from remote client");
}
StreamNegotiator negotiator = determineNegotiator(streamInitiation);
stream = negotiator.negotiateIncomingStream(streamInitiation);
}
catch (XMPPException ex) {
ex.printStackTrace();
Packet streamInitiation = collector.nextResult(SmackConfiguration.getPacketReplyTimeout());
collector.cancel();
if (streamInitiation == null) {
throw new XMPPException("No response from remote client");
}
StreamNegotiator negotiator = determineNegotiator(streamInitiation);
stream = negotiator.negotiateIncomingStream(streamInitiation);
} finally {
collector.cancel();
}
return stream;
}
private StreamNegotiator determineNegotiator(Packet streamInitiation) {
return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator;
}
public OutputStream createOutgoingStream(String streamID, String initiator, String target) throws XMPPException {
OutputStream stream;
try {
stream = primaryNegotiator.createOutgoingStream(streamID, initiator, target);
}
catch (XMPPException ex) {
stream = secondaryNegotiator.createOutgoingStream(streamID, initiator, target);
}
return stream;
}
public String[] getNamespaces() {
String [] primary = primaryNegotiator.getNamespaces();
String [] secondary = secondaryNegotiator.getNamespaces();
String [] namespaces = new String[primary.length + secondary.length];
System.arraycopy(primary, 0, namespaces, 0, primary.length);
System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length);
return namespaces;
}
public void cleanup() {
}
}

View File

@ -115,7 +115,10 @@ public abstract class FileTransfer {
* and 1. * and 1.
*/ */
public double getProgress() { public double getProgress() {
return 0; if(amountWritten == 0) {
return 0;
}
return amountWritten / fileSize;
} }
/** /**

View File

@ -72,6 +72,8 @@ public class FileTransferNegotiator {
private static final Random randomGenerator = new Random(); private static final Random randomGenerator = new Random();
public static boolean IBB_ONLY = false;
/** /**
* Returns the file transfer negotiator related to a particular connection. * Returns the file transfer negotiator related to a particular connection.
* When this class is requested on a particular connection the file transfer * When this class is requested on a particular connection the file transfer
@ -82,6 +84,9 @@ public class FileTransferNegotiator {
*/ */
public static FileTransferNegotiator getInstanceFor( public static FileTransferNegotiator getInstanceFor(
final XMPPConnection connection) { final XMPPConnection connection) {
if (connection == null) {
throw new IllegalArgumentException("Connection cannot be null");
}
if (!connection.isConnected()) { if (!connection.isConnected()) {
return null; return null;
} }
@ -227,7 +232,7 @@ public class FileTransferNegotiator {
StreamNegotiator selectedStreamNegotiator; StreamNegotiator selectedStreamNegotiator;
try { try {
selectedStreamNegotiator = selectProtocol(streamMethodField); selectedStreamNegotiator = getNegotiator(streamMethodField);
} }
catch (XMPPException e) { catch (XMPPException e) {
IQ iqPacket = createIQ(si.getPacketID(), si.getFrom(), si.getTo(), IQ iqPacket = createIQ(si.getPacketID(), si.getFrom(), si.getTo(),
@ -254,14 +259,14 @@ public class FileTransferNegotiator {
return field; return field;
} }
private StreamNegotiator selectProtocol(final FormField field) private StreamNegotiator getNegotiator(final FormField field)
throws XMPPException { throws XMPPException {
String variable = null; String variable;
boolean isByteStream = false; boolean isByteStream = false;
boolean isIBB = false; boolean isIBB = false;
for (Iterator it = field.getOptions(); it.hasNext();) { for (Iterator it = field.getOptions(); it.hasNext();) {
variable = ((FormField.Option) it.next()).getValue(); variable = ((FormField.Option) it.next()).getValue();
if (variable.equals(BYTE_STREAM)) { if (variable.equals(BYTE_STREAM) && !IBB_ONLY) {
isByteStream = true; isByteStream = true;
} }
else if (variable.equals(INBAND_BYTE_STREAM)) { else if (variable.equals(INBAND_BYTE_STREAM)) {
@ -274,8 +279,15 @@ public class FileTransferNegotiator {
throw new XMPPException("No acceptable transfer mechanism", error); throw new XMPPException("No acceptable transfer mechanism", error);
} }
return (isByteStream ? byteStreamTransferManager if (isByteStream && isIBB && field.getType().equals(FormField.TYPE_LIST_MULTI)) {
: inbandTransferManager); return new FaultTolerantNegotiator(connection, byteStreamTransferManager, inbandTransferManager);
}
else if (isByteStream) {
return byteStreamTransferManager;
}
else {
return inbandTransferManager;
}
} }
/** /**
@ -362,9 +374,8 @@ public class FileTransferNegotiator {
IQ iqResponse = (IQ) siResponse; IQ iqResponse = (IQ) siResponse;
if (iqResponse.getType().equals(IQ.Type.RESULT)) { if (iqResponse.getType().equals(IQ.Type.RESULT)) {
StreamInitiation response = (StreamInitiation) siResponse; StreamInitiation response = (StreamInitiation) siResponse;
return getUploadNegotiator((((FormField) response return getOutgoingNegotiator(getStreamMethodField(response
.getFeatureNegotiationForm().getFields().next()) .getFeatureNegotiationForm()));
.getValues().next()).toString());
} }
else if (iqResponse.getType().equals(IQ.Type.ERROR)) { else if (iqResponse.getType().equals(IQ.Type.ERROR)) {
@ -379,23 +390,44 @@ public class FileTransferNegotiator {
} }
} }
private StreamNegotiator getUploadNegotiator(String selectedProtocol) { private StreamNegotiator getOutgoingNegotiator(final FormField field)
if (selectedProtocol.equals(BYTE_STREAM)) { throws XMPPException {
String variable;
boolean isByteStream = false;
boolean isIBB = false;
for (Iterator it = field.getValues(); it.hasNext();) {
variable = (it.next().toString());
if (variable.equals(BYTE_STREAM) && !IBB_ONLY) {
isByteStream = true;
}
else if (variable.equals(INBAND_BYTE_STREAM)) {
isIBB = true;
}
}
if (!isByteStream && !isIBB) {
XMPPError error = new XMPPError(400);
throw new XMPPException("No acceptable transfer mechanism", error);
}
if (isByteStream && isIBB) {
return new FaultTolerantNegotiator(connection, byteStreamTransferManager, inbandTransferManager);
}
else if (isByteStream) {
return byteStreamTransferManager; return byteStreamTransferManager;
} }
else if (selectedProtocol.equals(INBAND_BYTE_STREAM)) {
return inbandTransferManager;
}
else { else {
return null; return inbandTransferManager;
} }
} }
private DataForm createDefaultInitiationForm() { private DataForm createDefaultInitiationForm() {
DataForm form = new DataForm(Form.TYPE_FORM); DataForm form = new DataForm(Form.TYPE_FORM);
FormField field = new FormField(STREAM_DATA_FIELD_NAME); FormField field = new FormField(STREAM_DATA_FIELD_NAME);
field.setType(FormField.TYPE_LIST_SINGLE); field.setType(FormField.TYPE_LIST_MULTI);
field.addOption(new FormField.Option(BYTE_STREAM)); if (!IBB_ONLY) {
field.addOption(new FormField.Option(BYTE_STREAM));
}
field.addOption(new FormField.Option(INBAND_BYTE_STREAM)); field.addOption(new FormField.Option(INBAND_BYTE_STREAM));
form.addField(field); form.addField(field);
return form; return form;

View File

@ -19,7 +19,10 @@
*/ */
package org.jivesoftware.smackx.filetransfer; package org.jivesoftware.smackx.filetransfer;
import org.jivesoftware.smack.*; import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.*; import org.jivesoftware.smack.filter.*;
import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.Message;
@ -58,38 +61,22 @@ public class IBBTransferNegotiator extends StreamNegotiator {
this.connection = connection; this.connection = connection;
} }
/* public PacketFilter getInitiationPacketFilter(String from, String streamID) {
* (non-Javadoc) return new AndFilter(new FromContainsFilter(
* from), new IBBOpenSidFilter(streamID));
* @see org.jivesoftware.smackx.filetransfer.StreamNegotiator#initiateDownload(org.jivesoftware.smackx.packet.StreamInitiation, }
* java.io.File)
*/
public InputStream initiateIncomingStream(StreamInitiation initiation)
throws XMPPException {
StreamInitiation response = super.createInitiationAccept(initiation,
NAMESPACE);
// establish collector to await response InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException {
PacketCollector collector = connection Open openRequest = (Open) streamInitiation;
.createPacketCollector(new AndFilter(new FromContainsFilter(
initiation.getFrom()), new IBBSidFilter(initiation.getSessionID())));
connection.sendPacket(response);
IBBExtensions.Open openRequest = (IBBExtensions.Open) collector if (openRequest.getType().equals(IQ.Type.ERROR)) {
.nextResult(SmackConfiguration.getPacketReplyTimeout());
if (openRequest == null) {
throw new XMPPException("No response from file transfer initiator");
}
else if (openRequest.getType().equals(IQ.Type.ERROR)) {
throw new XMPPException(openRequest.getError()); throw new XMPPException(openRequest.getError());
} }
collector.cancel();
PacketFilter dataFilter = new AndFilter(new PacketExtensionFilter( PacketFilter dataFilter = new IBBMessageSidFilter(openRequest.getFrom(),
IBBExtensions.Data.ELEMENT_NAME, IBBExtensions.NAMESPACE), openRequest.getSessionID());
new FromMatchesFilter(initiation.getFrom()));
PacketFilter closeFilter = new AndFilter(new PacketTypeFilter( PacketFilter closeFilter = new AndFilter(new PacketTypeFilter(
IBBExtensions.Close.class), new FromMatchesFilter(initiation IBBExtensions.Close.class), new FromMatchesFilter(openRequest
.getFrom())); .getFrom()));
InputStream stream = new IBBInputStream(openRequest.getSessionID(), InputStream stream = new IBBInputStream(openRequest.getSessionID(),
@ -100,6 +87,11 @@ public class IBBTransferNegotiator extends StreamNegotiator {
return stream; return stream;
} }
public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException {
Packet openRequest = initiateIncomingStream(connection, initiation);
return negotiateIncomingStream(openRequest);
}
/** /**
* Creates and sends the response for the open request. * Creates and sends the response for the open request.
* *
@ -111,7 +103,7 @@ public class IBBTransferNegotiator extends StreamNegotiator {
IQ.Type.RESULT)); IQ.Type.RESULT));
} }
public OutputStream initiateOutgoingStream(String streamID, String initiator, public OutputStream createOutgoingStream(String streamID, String initiator,
String target) throws XMPPException { String target) throws XMPPException {
Open openIQ = new Open(streamID, DEFAULT_BLOCK_SIZE); Open openIQ = new Open(streamID, DEFAULT_BLOCK_SIZE);
openIQ.setTo(target); openIQ.setTo(target);
@ -143,8 +135,11 @@ public class IBBTransferNegotiator extends StreamNegotiator {
return new IBBOutputStream(target, streamID, DEFAULT_BLOCK_SIZE); return new IBBOutputStream(target, streamID, DEFAULT_BLOCK_SIZE);
} }
public String getNamespace() { public String[] getNamespaces() {
return NAMESPACE; return new String[]{NAMESPACE};
}
public void cleanup() {
} }
private class IBBOutputStream extends OutputStream { private class IBBOutputStream extends OutputStream {
@ -155,22 +150,25 @@ public class IBBTransferNegotiator extends StreamNegotiator {
protected int seq = 0; protected int seq = 0;
private final Message template; final String userID;
private final int options = Base64.DONT_BREAK_LINES; private final int options = Base64.DONT_BREAK_LINES;
private IQ closePacket; final private IQ closePacket;
private String messageID; private String messageID;
private String sid;
IBBOutputStream(String userID, String sid, int blockSize) { IBBOutputStream(String userID, String sid, int blockSize) {
if (blockSize <= 0) { if (blockSize <= 0) {
throw new IllegalArgumentException("Buffer size <= 0"); throw new IllegalArgumentException("Buffer size <= 0");
} }
buffer = new byte[blockSize]; buffer = new byte[blockSize];
template = new Message(userID); this.userID = userID;
Message template = new Message(userID);
messageID = template.getPacketID(); messageID = template.getPacketID();
template.addExtension(new IBBExtensions.Data(sid)); this.sid = sid;
closePacket = createClosePacket(userID, sid); closePacket = createClosePacket(userID, sid);
} }
@ -209,12 +207,11 @@ public class IBBTransferNegotiator extends StreamNegotiator {
} }
private void writeToXML(byte[] buffer, int offset, int len) { private void writeToXML(byte[] buffer, int offset, int len) {
template.setPacketID(messageID + "_" + seq); Message template = createTemplate(messageID + "_" + seq);
IBBExtensions.Data ext = (IBBExtensions.Data) template IBBExtensions.Data ext = new IBBExtensions.Data(sid);
.getExtension(IBBExtensions.Data.ELEMENT_NAME, template.addExtension(ext);
IBBExtensions.NAMESPACE);
String enc = Base64.encodeBytes(buffer, offset, count, options); String enc = Base64.encodeBytes(buffer, offset, len, options);
ext.setData(enc); ext.setData(enc);
ext.setSeq(seq); ext.setSeq(seq);
@ -236,6 +233,11 @@ public class IBBTransferNegotiator extends StreamNegotiator {
write(b, 0, b.length); write(b, 0, b.length);
} }
public Message createTemplate(String messageID) {
Message template = new Message(userID);
template.setPacketID(messageID);
return template;
}
} }
private class IBBInputStream extends InputStream implements PacketListener { private class IBBInputStream extends InputStream implements PacketListener {
@ -304,24 +306,23 @@ public class IBBTransferNegotiator extends StreamNegotiator {
private boolean loadBufferWait() throws IOException { private boolean loadBufferWait() throws IOException {
IBBExtensions.Data data; IBBExtensions.Data data;
do {
Message mess = null; Message mess = null;
while (mess == null) { while (mess == null) {
if (isDone) { if (isDone) {
mess = (Message) dataCollector.pollResult(); mess = (Message) dataCollector.pollResult();
if (mess == null) { if (mess == null) {
return false; return false;
}
}
else {
mess = (Message) dataCollector.nextResult(1000);
} }
} }
data = (IBBExtensions.Data) mess.getExtension( else {
IBBExtensions.Data.ELEMENT_NAME, mess = (Message) dataCollector.nextResult(1000);
IBBExtensions.NAMESPACE); }
} }
while (!data.getSessionID().equals(streamID)); data = (IBBExtensions.Data) mess.getExtension(
IBBExtensions.Data.ELEMENT_NAME,
IBBExtensions.NAMESPACE);
checkSequence((int) data.getSeq()); checkSequence((int) data.getSeq());
buffer = Base64.decode(data.getData()); buffer = Base64.decode(data.getData());
bufferPointer = 0; bufferPointer = 0;
@ -389,11 +390,11 @@ public class IBBTransferNegotiator extends StreamNegotiator {
} }
} }
private static class IBBSidFilter implements PacketFilter { private static class IBBOpenSidFilter implements PacketFilter {
private String sessionID; private String sessionID;
public IBBSidFilter(String sessionID) { public IBBOpenSidFilter(String sessionID) {
if (sessionID == null) { if (sessionID == null) {
throw new IllegalArgumentException("StreamID cannot be null"); throw new IllegalArgumentException("StreamID cannot be null");
} }
@ -411,4 +412,31 @@ public class IBBTransferNegotiator extends StreamNegotiator {
} }
} }
private static class IBBMessageSidFilter implements PacketFilter {
private final String sessionID;
private String from;
public IBBMessageSidFilter(String from, String sessionID) {
this.from = from;
this.sessionID = sessionID;
}
public boolean accept(Packet packet) {
if (!(packet instanceof Message)) {
return false;
}
if (!packet.getFrom().equalsIgnoreCase(from)) {
return false;
}
IBBExtensions.Data data = (IBBExtensions.Data) packet.
getExtension(IBBExtensions.Data.ELEMENT_NAME, IBBExtensions.NAMESPACE);
if (data == null) {
return false;
}
return data.getSessionID() != null && data.getSessionID().equalsIgnoreCase(sessionID);
}
}
} }

View File

@ -175,7 +175,7 @@ public class IncomingFileTransfer extends FileTransfer {
.selectStreamNegotiator(recieveRequest); .selectStreamNegotiator(recieveRequest);
setStatus(Status.NEGOTIATING_STREAM); setStatus(Status.NEGOTIATING_STREAM);
InputStream inputStream = streamNegotiator InputStream inputStream = streamNegotiator
.initiateIncomingStream(recieveRequest.getStreamInitiation()); .createIncomingStream(recieveRequest.getStreamInitiation());
setStatus(Status.NEGOTIATED); setStatus(Status.NEGOTIATED);
return inputStream; return inputStream;
} }

View File

@ -309,8 +309,8 @@ public class OutgoingFileTransfer extends FileTransfer {
// Negotiate the stream // Negotiate the stream
setStatus(Status.NEGOTIATING_STREAM); setStatus(Status.NEGOTIATING_STREAM);
outputStream = streamNegotiator.initiateOutgoingStream(streamID, outputStream = streamNegotiator.createOutgoingStream(streamID,
initiator, getPeer()); initiator, getPeer());
if (!getStatus().equals(Status.NEGOTIATING_STREAM)) { if (!getStatus().equals(Status.NEGOTIATING_STREAM)) {
return null; return null;
} }

View File

@ -29,6 +29,7 @@ import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketIDFilter; import org.jivesoftware.smack.filter.PacketIDFilter;
import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smackx.ServiceDiscoveryManager; import org.jivesoftware.smackx.ServiceDiscoveryManager;
import org.jivesoftware.smackx.packet.Bytestream; import org.jivesoftware.smackx.packet.Bytestream;
@ -84,35 +85,40 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
this.connection = connection; this.connection = connection;
} }
public PacketFilter getInitiationPacketFilter(String from, String sessionID) {
return new AndFilter(new FromMatchesFilter(from),
new BytestreamSIDFilter(sessionID));
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
* @see org.jivesoftware.smackx.filetransfer.StreamNegotiator#initiateDownload(org.jivesoftware.smackx.packet.StreamInitiation, * @see org.jivesoftware.smackx.filetransfer.StreamNegotiator#initiateDownload(org.jivesoftware.smackx.packet.StreamInitiation,
* java.io.File) * java.io.File)
*/ */
public InputStream initiateIncomingStream(StreamInitiation initiation) InputStream negotiateIncomingStream(Packet streamInitiation)
throws XMPPException { throws XMPPException {
StreamInitiation response = super.createInitiationAccept(initiation,
NAMESPACE);
// establish collector to await response Bytestream streamHostsInfo = (Bytestream) streamInitiation;
PacketCollector collector = connection
.createPacketCollector(new AndFilter(new FromMatchesFilter(initiation.getFrom()),
new BytestreamSIDFilter(initiation.getSessionID())));
connection.sendPacket(response);
Bytestream streamHostsInfo = (Bytestream) collector
.nextResult(SmackConfiguration.getPacketReplyTimeout()); if (streamHostsInfo.getType().equals(IQ.Type.ERROR)) {
if (streamHostsInfo == null) {
throw new XMPPException("No response from file transfer initiator");
}
else if (streamHostsInfo.getType().equals(IQ.Type.ERROR)) {
throw new XMPPException(streamHostsInfo.getError()); throw new XMPPException(streamHostsInfo.getError());
} }
collector.cancel(); SelectedHostInfo selectedHost;
try {
// select appropriate host // select appropriate host
SelectedHostInfo selectedHost = selectHost(streamHostsInfo); selectedHost = selectHost(streamHostsInfo);
}
catch (XMPPException ex) {
if (ex.getXMPPError() != null) {
IQ errorPacket = super.createError(streamHostsInfo.getTo(),
streamHostsInfo.getFrom(), streamHostsInfo.getPacketID(),
ex.getXMPPError());
connection.sendPacket(errorPacket);
}
throw(ex);
}
// send used-host confirmation // send used-host confirmation
Bytestream streamResponse = createUsedHostConfirmation( Bytestream streamResponse = createUsedHostConfirmation(
@ -126,6 +132,12 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
catch (IOException e) { catch (IOException e) {
throw new XMPPException("Error establishing input stream", e); throw new XMPPException("Error establishing input stream", e);
} }
}
public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException {
Packet streamInitiation = initiateIncomingStream(connection, initiation);
return negotiateIncomingStream(streamInitiation);
} }
/** /**
@ -176,12 +188,11 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
e.printStackTrace(); e.printStackTrace();
selectedHost = null; selectedHost = null;
socket = null; socket = null;
continue;
} }
} }
if (selectedHost == null || socket == null) { if (selectedHost == null || socket == null) {
throw new XMPPException( throw new XMPPException(
"Could not establish socket with any provided host"); "Could not establish socket with any provided host", new XMPPError(406));
} }
return new SelectedHostInfo(selectedHost, socket); return new SelectedHostInfo(selectedHost, socket);
@ -212,7 +223,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
* @see org.jivesoftware.smackx.filetransfer.StreamNegotiator#initiateUpload(java.lang.String, * @see org.jivesoftware.smackx.filetransfer.StreamNegotiator#initiateUpload(java.lang.String,
* org.jivesoftware.smackx.packet.StreamInitiation, java.io.File) * org.jivesoftware.smackx.packet.StreamInitiation, java.io.File)
*/ */
public OutputStream initiateOutgoingStream(String streamID, String initiator, public OutputStream createOutgoingStream(String streamID, String initiator,
String target) throws XMPPException { String target) throws XMPPException {
Socket socket; Socket socket;
try { try {
@ -294,6 +305,12 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
throw new XMPPException("Unexpected response from remote user"); throw new XMPPException("Unexpected response from remote user");
} }
// check for an error
if (response.getType().equals(IQ.Type.ERROR)) {
throw new XMPPException("Remote client returned error, stream hosts expected",
response.getError());
}
StreamHostUsed used = response.getUsedHost(); StreamHostUsed used = response.getUsedHost();
StreamHost usedHost = query.getStreamHost(used.getJID()); StreamHost usedHost = query.getStreamHost(used.getJID());
if (usedHost == null) { if (usedHost == null) {
@ -370,11 +387,11 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
* &lt;/iq&gt; * &lt;/iq&gt;
* </pre> * </pre>
* *
* @param from initiator@host1/foo - The file transfer initiator. * @param from initiator@host1/foo - The file transfer initiator.
* @param to target@host2/bar - The file transfer target. * @param to target@host2/bar - The file transfer target.
* @param sid 'mySID' - the unique identifier for this file transfer * @param sid 'mySID' - the unique identifier for this file transfer
* @param localIP The IP of the local machine if it is being provided, null otherwise. * @param localIP The IP of the local machine if it is being provided, null otherwise.
* @param port The port of the local mahine if it is being provided, null otherwise. * @param port The port of the local mahine if it is being provided, null otherwise.
* @return Returns the created <b><i>Bytestream</b></i> packet * @return Returns the created <b><i>Bytestream</b></i> packet
*/ */
private Bytestream createByteStreamInit(final String from, final String to, private Bytestream createByteStreamInit(final String from, final String to,
@ -546,8 +563,8 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
return responseDigest; return responseDigest;
} }
public String getNamespace() { public String[] getNamespaces() {
return NAMESPACE; return new String[]{NAMESPACE};
} }
private void establishSOCKS5ConnectionToProxy(Socket socket, String digest) private void establishSOCKS5ConnectionToProxy(Socket socket, String digest)
@ -603,7 +620,10 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
return data; return data;
} }
private class SelectedHostInfo { public void cleanup() {
}
private static class SelectedHostInfo {
protected XMPPException exception; protected XMPPException exception;
@ -684,6 +704,9 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
public void stop() { public void stop() {
done = true; done = true;
synchronized(this) {
this.notify();
}
} }
public int getPort() { public int getPort() {

View File

@ -19,103 +19,146 @@
*/ */
package org.jivesoftware.smackx.filetransfer; package org.jivesoftware.smackx.filetransfer;
import java.io.InputStream; import org.jivesoftware.smack.PacketCollector;
import java.io.OutputStream; import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.IQ; import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smackx.Form; import org.jivesoftware.smackx.Form;
import org.jivesoftware.smackx.FormField; import org.jivesoftware.smackx.FormField;
import org.jivesoftware.smackx.packet.DataForm; import org.jivesoftware.smackx.packet.DataForm;
import org.jivesoftware.smackx.packet.StreamInitiation; import org.jivesoftware.smackx.packet.StreamInitiation;
import java.io.InputStream;
import java.io.OutputStream;
/** /**
* After the file transfer negotiation process is completed according to * After the file transfer negotiation process is completed according to
* JEP-0096, the negotation process is passed off to a particular stream * JEP-0096, the negotation process is passed off to a particular stream
* negotiator. The stream negotiator will then negotiate the chosen stream and * negotiator. The stream negotiator will then negotiate the chosen stream and
* return the stream to transfer the file. * return the stream to transfer the file.
* *
*
* @author Alexander Wenckus * @author Alexander Wenckus
*
*/ */
public abstract class StreamNegotiator { public abstract class StreamNegotiator {
/** /**
* Creates the initiation acceptance packet to forward to the stream * Creates the initiation acceptance packet to forward to the stream
* initiator. * initiator.
* *
* @param streamInitiationOffer * @param streamInitiationOffer The offer from the stream initatior to connect for a stream.
* The offer from the stream initatior to connect for a stream. * @param namespaces The namespace that relates to the accepted means of transfer.
* @param namespace * @return The response to be forwarded to the initator.
* The namespace that relates to the accepted means of transfer. */
* @return The response to be forwarded to the initator. public StreamInitiation createInitiationAccept(
*/ StreamInitiation streamInitiationOffer, String [] namespaces) {
public StreamInitiation createInitiationAccept( StreamInitiation response = new StreamInitiation();
StreamInitiation streamInitiationOffer, String namespace) { response.setTo(streamInitiationOffer.getFrom());
StreamInitiation response = new StreamInitiation(); response.setFrom(streamInitiationOffer.getTo());
response.setTo(streamInitiationOffer.getFrom()); response.setType(IQ.Type.RESULT);
response.setFrom(streamInitiationOffer.getTo()); response.setPacketID(streamInitiationOffer.getPacketID());
response.setType(IQ.Type.RESULT);
response.setPacketID(streamInitiationOffer.getPacketID());
DataForm form = new DataForm(Form.TYPE_SUBMIT); DataForm form = new DataForm(Form.TYPE_SUBMIT);
FormField field = new FormField( FormField field = new FormField(
FileTransferNegotiator.STREAM_DATA_FIELD_NAME); FileTransferNegotiator.STREAM_DATA_FIELD_NAME);
field.addValue(namespace); for (int i = 0; i < namespaces.length; i++) {
form.addField(field); field.addValue(namespaces[i]);
}
form.addField(field);
response.setFeatureNegotiationForm(form); response.setFeatureNegotiationForm(form);
return response; return response;
} }
public IQ createError(String from, String to, String packetID, XMPPError xmppError) {
IQ iq = FileTransferNegotiator.createIQ(packetID, to, from, IQ.Type.ERROR);
iq.setError(xmppError);
return iq;
}
Packet initiateIncomingStream(XMPPConnection connection, StreamInitiation initiation) throws XMPPException {
StreamInitiation response = createInitiationAccept(initiation,
getNamespaces());
// establish collector to await response
PacketCollector collector = connection
.createPacketCollector(getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()));
connection.sendPacket(response);
Packet streamMethodInitiation = collector
.nextResult(SmackConfiguration.getPacketReplyTimeout());
collector.cancel();
if (streamMethodInitiation == null) {
throw new XMPPException("No response from file transfer initiator");
}
return streamMethodInitiation;
}
/** /**
* This method handles the file stream download negotiation process. The * Returns the packet filter that will return the initiation packet for the appropriate stream
* appropriate stream negotiator's initiate incoming stream is called after * initiation.
* an appropriate file transfer method is selected. The manager will respond *
* to the initatior with the selected means of transfer, then it will handle * @param from The initiatior of the file transfer.
* any negotation specific to the particular transfer method. This method * @param streamID The stream ID related to the transfer.
* returns the InputStream, ready to transfer the file. * @return The <b><i>PacketFilter</b></i> that will return the packet relatable to the stream
* * initiation.
* @param initiation */
* The initation that triggered this download. public abstract PacketFilter getInitiationPacketFilter(String from, String streamID);
* @return After the negotation process is complete, the InputStream to
* write a file to is returned.
* @throws XMPPException
* If an error occurs during this process an XMPPException is
* thrown.
*/
public abstract InputStream initiateIncomingStream(
StreamInitiation initiation) throws XMPPException;
/**
* This method handles the file upload stream negotiation process. The
* particular stream negotiator is determined during the file transfer
* negotiation process. This method returns the OutputStream to transmit the
* file to the remote user.
*
* @param streamID
* The streamID that uniquely identifies the file transfer.
* @param initiator
* The fully-qualified JID of the initiator of the file transfer.
* @param target
* The fully-qualified JID of the target or reciever of the file
* transfer.
* @return The negotiated stream ready for data.
* @throws XMPPException
* If an error occurs during the negotiation process an
* exception will be thrown.
*/
public abstract OutputStream initiateOutgoingStream(String streamID,
String initiator, String target) throws XMPPException;
/** abstract InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException;
* Returns the XMPP namespace reserved for this particular type of file
* transfer. /**
* * This method handles the file stream download negotiation process. The
* @return Returns the XMPP namespace reserved for this particular type of * appropriate stream negotiator's initiate incoming stream is called after
* file transfer. * an appropriate file transfer method is selected. The manager will respond
*/ * to the initatior with the selected means of transfer, then it will handle
public abstract String getNamespace(); * any negotation specific to the particular transfer method. This method
* returns the InputStream, ready to transfer the file.
*
* @param initiation The initation that triggered this download.
* @return After the negotation process is complete, the InputStream to
* write a file to is returned.
* @throws XMPPException If an error occurs during this process an XMPPException is
* thrown.
*/
public abstract InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException;
/**
* This method handles the file upload stream negotiation process. The
* particular stream negotiator is determined during the file transfer
* negotiation process. This method returns the OutputStream to transmit the
* file to the remote user.
*
* @param streamID The streamID that uniquely identifies the file transfer.
* @param initiator The fully-qualified JID of the initiator of the file transfer.
* @param target The fully-qualified JID of the target or reciever of the file
* transfer.
* @return The negotiated stream ready for data.
* @throws XMPPException If an error occurs during the negotiation process an
* exception will be thrown.
*/
public abstract OutputStream createOutgoingStream(String streamID,
String initiator, String target) throws XMPPException;
/**
* Returns the XMPP namespace reserved for this particular type of file
* transfer.
*
* @return Returns the XMPP namespace reserved for this particular type of
* file transfer.
*/
public abstract String[] getNamespaces();
/**
* Cleanup any and all resources associated with this negotiator.
*/
public abstract void cleanup();
} }