mirror of
https://github.com/vanitasvitae/Smack.git
synced 2024-11-01 01:35:59 +01:00
796 lines
28 KiB
Java
796 lines
28 KiB
Java
|
/**
|
||
|
* All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
* you may not use this file except in compliance with the License.
|
||
|
* You may obtain a copy of the License at
|
||
|
*
|
||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||
|
*
|
||
|
* Unless required by applicable law or agreed to in writing, software
|
||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
* See the License for the specific language governing permissions and
|
||
|
* limitations under the License.
|
||
|
*/
|
||
|
package org.jivesoftware.smackx.ibb;
|
||
|
|
||
|
import java.io.IOException;
|
||
|
import java.io.InputStream;
|
||
|
import java.io.OutputStream;
|
||
|
import java.net.SocketTimeoutException;
|
||
|
import java.util.concurrent.BlockingQueue;
|
||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||
|
import java.util.concurrent.TimeUnit;
|
||
|
|
||
|
import org.jivesoftware.smack.Connection;
|
||
|
import org.jivesoftware.smack.PacketListener;
|
||
|
import org.jivesoftware.smack.XMPPException;
|
||
|
import org.jivesoftware.smack.filter.AndFilter;
|
||
|
import org.jivesoftware.smack.filter.PacketFilter;
|
||
|
import org.jivesoftware.smack.filter.PacketTypeFilter;
|
||
|
import org.jivesoftware.smack.packet.IQ;
|
||
|
import org.jivesoftware.smack.packet.Message;
|
||
|
import org.jivesoftware.smack.packet.Packet;
|
||
|
import org.jivesoftware.smack.packet.PacketExtension;
|
||
|
import org.jivesoftware.smack.packet.XMPPError;
|
||
|
import org.jivesoftware.smack.util.StringUtils;
|
||
|
import org.jivesoftware.smackx.bytestreams.BytestreamSession;
|
||
|
import org.jivesoftware.smackx.ibb.packet.Close;
|
||
|
import org.jivesoftware.smackx.ibb.packet.Data;
|
||
|
import org.jivesoftware.smackx.ibb.packet.DataPacketExtension;
|
||
|
import org.jivesoftware.smackx.ibb.packet.Open;
|
||
|
import org.jivesoftware.smackx.packet.SyncPacketSend;
|
||
|
|
||
|
/**
|
||
|
* InBandBytestreamSession class represents an In-Band Bytestream session.
|
||
|
* <p>
|
||
|
* In-band bytestreams are bidirectional and this session encapsulates the streams for both
|
||
|
* directions.
|
||
|
* <p>
|
||
|
* Note that closing the In-Band Bytestream session will close both streams. If both streams are
|
||
|
* closed individually the session will be closed automatically once the second stream is closed.
|
||
|
* Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
|
||
|
* automatically if one of them is closed.
|
||
|
*
|
||
|
* @author Henning Staib
|
||
|
*/
|
||
|
public class InBandBytestreamSession implements BytestreamSession {
|
||
|
|
||
|
/* XMPP connection */
|
||
|
private final Connection connection;
|
||
|
|
||
|
/* the In-Band Bytestream open request for this session */
|
||
|
private final Open byteStreamRequest;
|
||
|
|
||
|
/*
|
||
|
* the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
|
||
|
*/
|
||
|
private IBBInputStream inputStream;
|
||
|
|
||
|
/*
|
||
|
* the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
|
||
|
*/
|
||
|
private IBBOutputStream outputStream;
|
||
|
|
||
|
/* JID of the remote peer */
|
||
|
private String remoteJID;
|
||
|
|
||
|
/* flag to close both streams if one of them is closed */
|
||
|
private boolean closeBothStreamsEnabled = false;
|
||
|
|
||
|
/* flag to indicate if session is closed */
|
||
|
private boolean isClosed = false;
|
||
|
|
||
|
/**
|
||
|
* Constructor.
|
||
|
*
|
||
|
* @param connection the XMPP connection
|
||
|
* @param byteStreamRequest the In-Band Bytestream open request for this session
|
||
|
* @param remoteJID JID of the remote peer
|
||
|
*/
|
||
|
protected InBandBytestreamSession(Connection connection, Open byteStreamRequest,
|
||
|
String remoteJID) {
|
||
|
this.connection = connection;
|
||
|
this.byteStreamRequest = byteStreamRequest;
|
||
|
this.remoteJID = remoteJID;
|
||
|
|
||
|
// initialize streams dependent to the uses stanza type
|
||
|
switch (byteStreamRequest.getStanza()) {
|
||
|
case IQ:
|
||
|
this.inputStream = new IQIBBInputStream();
|
||
|
this.outputStream = new IQIBBOutputStream();
|
||
|
break;
|
||
|
case MESSAGE:
|
||
|
this.inputStream = new MessageIBBInputStream();
|
||
|
this.outputStream = new MessageIBBOutputStream();
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
public InputStream getInputStream() {
|
||
|
return this.inputStream;
|
||
|
}
|
||
|
|
||
|
public OutputStream getOutputStream() {
|
||
|
return this.outputStream;
|
||
|
}
|
||
|
|
||
|
public int getReadTimeout() {
|
||
|
return this.inputStream.readTimeout;
|
||
|
}
|
||
|
|
||
|
public void setReadTimeout(int timeout) {
|
||
|
if (timeout < 0) {
|
||
|
throw new IllegalArgumentException("Timeout must be >= 0");
|
||
|
}
|
||
|
this.inputStream.readTimeout = timeout;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Returns whether both streams should be closed automatically if one of the streams is closed.
|
||
|
* Default is <code>false</code>.
|
||
|
*
|
||
|
* @return <code>true</code> if both streams will be closed if one of the streams is closed,
|
||
|
* <code>false</code> if both streams can be closed independently.
|
||
|
*/
|
||
|
public boolean isCloseBothStreamsEnabled() {
|
||
|
return closeBothStreamsEnabled;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Sets whether both streams should be closed automatically if one of the streams is closed.
|
||
|
* Default is <code>false</code>.
|
||
|
*
|
||
|
* @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
|
||
|
* the streams is closed, <code>false</code> if both streams should be closed
|
||
|
* independently
|
||
|
*/
|
||
|
public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
|
||
|
this.closeBothStreamsEnabled = closeBothStreamsEnabled;
|
||
|
}
|
||
|
|
||
|
public void close() throws IOException {
|
||
|
closeByLocal(true); // close input stream
|
||
|
closeByLocal(false); // close output stream
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* This method is invoked if a request to close the In-Band Bytestream has been received.
|
||
|
*
|
||
|
* @param closeRequest the close request from the remote peer
|
||
|
*/
|
||
|
protected void closeByPeer(Close closeRequest) {
|
||
|
|
||
|
/*
|
||
|
* close streams without flushing them, because stream is already considered closed on the
|
||
|
* remote peers side
|
||
|
*/
|
||
|
this.inputStream.closeInternal();
|
||
|
this.inputStream.cleanup();
|
||
|
this.outputStream.closeInternal(false);
|
||
|
|
||
|
// acknowledge close request
|
||
|
IQ confirmClose = IQ.createResultIQ(closeRequest);
|
||
|
this.connection.sendPacket(confirmClose);
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* This method is invoked if one of the streams has been closed locally, if an error occurred
|
||
|
* locally or if the whole session should be closed.
|
||
|
*
|
||
|
* @throws IOException if an error occurs while sending the close request
|
||
|
*/
|
||
|
protected synchronized void closeByLocal(boolean in) throws IOException {
|
||
|
if (this.isClosed) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (this.closeBothStreamsEnabled) {
|
||
|
this.inputStream.closeInternal();
|
||
|
this.outputStream.closeInternal(true);
|
||
|
}
|
||
|
else {
|
||
|
if (in) {
|
||
|
this.inputStream.closeInternal();
|
||
|
}
|
||
|
else {
|
||
|
// close stream but try to send any data left
|
||
|
this.outputStream.closeInternal(true);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (this.inputStream.isClosed && this.outputStream.isClosed) {
|
||
|
this.isClosed = true;
|
||
|
|
||
|
// send close request
|
||
|
Close close = new Close(this.byteStreamRequest.getSessionID());
|
||
|
close.setTo(this.remoteJID);
|
||
|
try {
|
||
|
SyncPacketSend.getReply(this.connection, close);
|
||
|
}
|
||
|
catch (XMPPException e) {
|
||
|
throw new IOException("Error while closing stream: " + e.getMessage());
|
||
|
}
|
||
|
|
||
|
this.inputStream.cleanup();
|
||
|
|
||
|
// remove session from manager
|
||
|
InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
|
||
|
* Subclasses of this input stream must provide a packet listener along with a packet filter to
|
||
|
* collect the In-Band Bytestream data packets.
|
||
|
*/
|
||
|
private abstract class IBBInputStream extends InputStream {
|
||
|
|
||
|
/* the data packet listener to fill the data queue */
|
||
|
private final PacketListener dataPacketListener;
|
||
|
|
||
|
/* queue containing received In-Band Bytestream data packets */
|
||
|
protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
|
||
|
|
||
|
/* buffer containing the data from one data packet */
|
||
|
private byte[] buffer;
|
||
|
|
||
|
/* pointer to the next byte to read from buffer */
|
||
|
private int bufferPointer = -1;
|
||
|
|
||
|
/* data packet sequence (range from 0 to 65535) */
|
||
|
private long seq = -1;
|
||
|
|
||
|
/* flag to indicate if input stream is closed */
|
||
|
private boolean isClosed = false;
|
||
|
|
||
|
/* flag to indicate if close method was invoked */
|
||
|
private boolean closeInvoked = false;
|
||
|
|
||
|
/* timeout for read operations */
|
||
|
private int readTimeout = 0;
|
||
|
|
||
|
/**
|
||
|
* Constructor.
|
||
|
*/
|
||
|
public IBBInputStream() {
|
||
|
// add data packet listener to connection
|
||
|
this.dataPacketListener = getDataPacketListener();
|
||
|
connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Returns the packet listener that processes In-Band Bytestream data packets.
|
||
|
*
|
||
|
* @return the data packet listener
|
||
|
*/
|
||
|
protected abstract PacketListener getDataPacketListener();
|
||
|
|
||
|
/**
|
||
|
* Returns the packet filter that accepts In-Band Bytestream data packets.
|
||
|
*
|
||
|
* @return the data packet filter
|
||
|
*/
|
||
|
protected abstract PacketFilter getDataPacketFilter();
|
||
|
|
||
|
public synchronized int read() throws IOException {
|
||
|
checkClosed();
|
||
|
|
||
|
// if nothing read yet or whole buffer has been read fill buffer
|
||
|
if (bufferPointer == -1 || bufferPointer >= buffer.length) {
|
||
|
// if no data available and stream was closed return -1
|
||
|
if (!loadBuffer()) {
|
||
|
return -1;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// return byte and increment buffer pointer
|
||
|
return (int) buffer[bufferPointer++];
|
||
|
}
|
||
|
|
||
|
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
||
|
if (b == null) {
|
||
|
throw new NullPointerException();
|
||
|
}
|
||
|
else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|
||
|
|| ((off + len) < 0)) {
|
||
|
throw new IndexOutOfBoundsException();
|
||
|
}
|
||
|
else if (len == 0) {
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
checkClosed();
|
||
|
|
||
|
// if nothing read yet or whole buffer has been read fill buffer
|
||
|
if (bufferPointer == -1 || bufferPointer >= buffer.length) {
|
||
|
// if no data available and stream was closed return -1
|
||
|
if (!loadBuffer()) {
|
||
|
return -1;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// if more bytes wanted than available return all available
|
||
|
int bytesAvailable = buffer.length - bufferPointer;
|
||
|
if (len > bytesAvailable) {
|
||
|
len = bytesAvailable;
|
||
|
}
|
||
|
|
||
|
System.arraycopy(buffer, bufferPointer, b, off, len);
|
||
|
bufferPointer += len;
|
||
|
return len;
|
||
|
}
|
||
|
|
||
|
public synchronized int read(byte[] b) throws IOException {
|
||
|
return read(b, 0, b.length);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* This method blocks until a data packet is received, the stream is closed or the current
|
||
|
* thread is interrupted.
|
||
|
*
|
||
|
* @return <code>true</code> if data was received, otherwise <code>false</code>
|
||
|
* @throws IOException if data packets are out of sequence
|
||
|
*/
|
||
|
private synchronized boolean loadBuffer() throws IOException {
|
||
|
|
||
|
// wait until data is available or stream is closed
|
||
|
DataPacketExtension data = null;
|
||
|
try {
|
||
|
if (this.readTimeout == 0) {
|
||
|
while (data == null) {
|
||
|
if (isClosed && this.dataQueue.isEmpty()) {
|
||
|
return false;
|
||
|
}
|
||
|
data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
|
||
|
}
|
||
|
}
|
||
|
else {
|
||
|
data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
|
||
|
if (data == null) {
|
||
|
throw new SocketTimeoutException();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
catch (InterruptedException e) {
|
||
|
// Restore the interrupted status
|
||
|
Thread.currentThread().interrupt();
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
// handle sequence overflow
|
||
|
if (this.seq == 65535) {
|
||
|
this.seq = -1;
|
||
|
}
|
||
|
|
||
|
// check if data packets sequence is successor of last seen sequence
|
||
|
long seq = data.getSeq();
|
||
|
if (seq - 1 != this.seq) {
|
||
|
// packets out of order; close stream/session
|
||
|
InBandBytestreamSession.this.close();
|
||
|
throw new IOException("Packets out of sequence");
|
||
|
}
|
||
|
else {
|
||
|
this.seq = seq;
|
||
|
}
|
||
|
|
||
|
// set buffer to decoded data
|
||
|
buffer = data.getDecodedData();
|
||
|
bufferPointer = 0;
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Checks if this stream is closed and throws an IOException if necessary
|
||
|
*
|
||
|
* @throws IOException if stream is closed and no data should be read anymore
|
||
|
*/
|
||
|
private void checkClosed() throws IOException {
|
||
|
/* throw no exception if there is data available, but not if close method was invoked */
|
||
|
if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
|
||
|
// clear data queue in case additional data was received after stream was closed
|
||
|
this.dataQueue.clear();
|
||
|
throw new IOException("Stream is closed");
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public boolean markSupported() {
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
public void close() throws IOException {
|
||
|
if (isClosed) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
this.closeInvoked = true;
|
||
|
|
||
|
InBandBytestreamSession.this.closeByLocal(true);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* This method sets the close flag and removes the data packet listener.
|
||
|
*/
|
||
|
private void closeInternal() {
|
||
|
if (isClosed) {
|
||
|
return;
|
||
|
}
|
||
|
isClosed = true;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Invoked if the session is closed.
|
||
|
*/
|
||
|
private void cleanup() {
|
||
|
connection.removePacketListener(this.dataPacketListener);
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
|
||
|
* data packets.
|
||
|
*/
|
||
|
private class IQIBBInputStream extends IBBInputStream {
|
||
|
|
||
|
protected PacketListener getDataPacketListener() {
|
||
|
return new PacketListener() {
|
||
|
|
||
|
private long lastSequence = -1;
|
||
|
|
||
|
public void processPacket(Packet packet) {
|
||
|
// get data packet extension
|
||
|
DataPacketExtension data = (DataPacketExtension) packet.getExtension(
|
||
|
DataPacketExtension.ELEMENT_NAME,
|
||
|
InBandBytestreamManager.NAMESPACE);
|
||
|
|
||
|
/*
|
||
|
* check if sequence was not used already (see XEP-0047 Section 2.2)
|
||
|
*/
|
||
|
if (data.getSeq() <= this.lastSequence) {
|
||
|
IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
|
||
|
XMPPError.Condition.unexpected_request));
|
||
|
connection.sendPacket(unexpectedRequest);
|
||
|
return;
|
||
|
|
||
|
}
|
||
|
|
||
|
// check if encoded data is valid (see XEP-0047 Section 2.2)
|
||
|
if (data.getDecodedData() == null) {
|
||
|
// data is invalid; respond with bad-request error
|
||
|
IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
|
||
|
XMPPError.Condition.bad_request));
|
||
|
connection.sendPacket(badRequest);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// data is valid; add to data queue
|
||
|
dataQueue.offer(data);
|
||
|
|
||
|
// confirm IQ
|
||
|
IQ confirmData = IQ.createResultIQ((IQ) packet);
|
||
|
connection.sendPacket(confirmData);
|
||
|
|
||
|
// set last seen sequence
|
||
|
this.lastSequence = data.getSeq();
|
||
|
if (this.lastSequence == 65535) {
|
||
|
this.lastSequence = -1;
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
};
|
||
|
}
|
||
|
|
||
|
protected PacketFilter getDataPacketFilter() {
|
||
|
/*
|
||
|
* filter all IQ stanzas having type 'SET' (represented by Data class), containing a
|
||
|
* data packet extension, matching session ID and recipient
|
||
|
*/
|
||
|
return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
|
||
|
* encapsulating the data packets.
|
||
|
*/
|
||
|
private class MessageIBBInputStream extends IBBInputStream {
|
||
|
|
||
|
protected PacketListener getDataPacketListener() {
|
||
|
return new PacketListener() {
|
||
|
|
||
|
public void processPacket(Packet packet) {
|
||
|
// get data packet extension
|
||
|
DataPacketExtension data = (DataPacketExtension) packet.getExtension(
|
||
|
DataPacketExtension.ELEMENT_NAME,
|
||
|
InBandBytestreamManager.NAMESPACE);
|
||
|
|
||
|
// check if encoded data is valid
|
||
|
if (data.getDecodedData() == null) {
|
||
|
/*
|
||
|
* TODO once a majority of XMPP server implementation support XEP-0079
|
||
|
* Advanced Message Processing the invalid message could be answered with an
|
||
|
* appropriate error. For now we just ignore the packet. Subsequent packets
|
||
|
* with an increased sequence will cause the input stream to close the
|
||
|
* stream/session.
|
||
|
*/
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// data is valid; add to data queue
|
||
|
dataQueue.offer(data);
|
||
|
|
||
|
// TODO confirm packet once XMPP servers support XEP-0079
|
||
|
}
|
||
|
|
||
|
};
|
||
|
}
|
||
|
|
||
|
@Override
|
||
|
protected PacketFilter getDataPacketFilter() {
|
||
|
/*
|
||
|
* filter all message stanzas containing a data packet extension, matching session ID
|
||
|
* and recipient
|
||
|
*/
|
||
|
return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* IBBDataPacketFilter class filters all packets from the remote peer of this session,
|
||
|
* containing an In-Band Bytestream data packet extension whose session ID matches this sessions
|
||
|
* ID.
|
||
|
*/
|
||
|
private class IBBDataPacketFilter implements PacketFilter {
|
||
|
|
||
|
public boolean accept(Packet packet) {
|
||
|
// sender equals remote peer
|
||
|
if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
// stanza contains data packet extension
|
||
|
PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
|
||
|
InBandBytestreamManager.NAMESPACE);
|
||
|
if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) {
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
// session ID equals this session ID
|
||
|
DataPacketExtension data = (DataPacketExtension) packetExtension;
|
||
|
if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
|
||
|
* Subclasses of this output stream must provide a method to send data over XMPP stream.
|
||
|
*/
|
||
|
private abstract class IBBOutputStream extends OutputStream {
|
||
|
|
||
|
/* buffer with the size of this sessions block size */
|
||
|
protected final byte[] buffer;
|
||
|
|
||
|
/* pointer to next byte to write to buffer */
|
||
|
protected int bufferPointer = 0;
|
||
|
|
||
|
/* data packet sequence (range from 0 to 65535) */
|
||
|
protected long seq = 0;
|
||
|
|
||
|
/* flag to indicate if output stream is closed */
|
||
|
protected boolean isClosed = false;
|
||
|
|
||
|
/**
|
||
|
* Constructor.
|
||
|
*/
|
||
|
public IBBOutputStream() {
|
||
|
this.buffer = new byte[byteStreamRequest.getBlockSize()];
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Writes the given data packet to the XMPP stream.
|
||
|
*
|
||
|
* @param data the data packet
|
||
|
* @throws IOException if an I/O error occurred while sending or if the stream is closed
|
||
|
*/
|
||
|
protected abstract void writeToXML(DataPacketExtension data) throws IOException;
|
||
|
|
||
|
public synchronized void write(int b) throws IOException {
|
||
|
if (this.isClosed) {
|
||
|
throw new IOException("Stream is closed");
|
||
|
}
|
||
|
|
||
|
// if buffer is full flush buffer
|
||
|
if (bufferPointer >= buffer.length) {
|
||
|
flushBuffer();
|
||
|
}
|
||
|
|
||
|
buffer[bufferPointer++] = (byte) b;
|
||
|
}
|
||
|
|
||
|
public synchronized void write(byte b[], int off, int len) throws IOException {
|
||
|
if (b == null) {
|
||
|
throw new NullPointerException();
|
||
|
}
|
||
|
else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|
||
|
|| ((off + len) < 0)) {
|
||
|
throw new IndexOutOfBoundsException();
|
||
|
}
|
||
|
else if (len == 0) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (this.isClosed) {
|
||
|
throw new IOException("Stream is closed");
|
||
|
}
|
||
|
|
||
|
// is data to send greater than buffer size
|
||
|
if (len >= buffer.length) {
|
||
|
|
||
|
// "byte" off the first chunk to write out
|
||
|
writeOut(b, off, buffer.length);
|
||
|
|
||
|
// recursively call this method with the lesser amount
|
||
|
write(b, off + buffer.length, len - buffer.length);
|
||
|
}
|
||
|
else {
|
||
|
writeOut(b, off, len);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public synchronized void write(byte[] b) throws IOException {
|
||
|
write(b, 0, b.length);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Fills the buffer with the given data and sends it over the XMPP stream if the buffers
|
||
|
* capacity has been reached. This method is only called from this class so it is assured
|
||
|
* that the amount of data to send is <= buffer capacity
|
||
|
*
|
||
|
* @param b the data
|
||
|
* @param off the data
|
||
|
* @param len the number of bytes to write
|
||
|
* @throws IOException if an I/O error occurred while sending or if the stream is closed
|
||
|
*/
|
||
|
private synchronized void writeOut(byte b[], int off, int len) throws IOException {
|
||
|
if (this.isClosed) {
|
||
|
throw new IOException("Stream is closed");
|
||
|
}
|
||
|
|
||
|
// set to 0 in case the next 'if' block is not executed
|
||
|
int available = 0;
|
||
|
|
||
|
// is data to send greater that buffer space left
|
||
|
if (len > buffer.length - bufferPointer) {
|
||
|
// fill buffer to capacity and send it
|
||
|
available = buffer.length - bufferPointer;
|
||
|
System.arraycopy(b, off, buffer, bufferPointer, available);
|
||
|
bufferPointer += available;
|
||
|
flushBuffer();
|
||
|
}
|
||
|
|
||
|
// copy the data left to buffer
|
||
|
System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
|
||
|
bufferPointer += len - available;
|
||
|
}
|
||
|
|
||
|
public synchronized void flush() throws IOException {
|
||
|
if (this.isClosed) {
|
||
|
throw new IOException("Stream is closed");
|
||
|
}
|
||
|
flushBuffer();
|
||
|
}
|
||
|
|
||
|
private synchronized void flushBuffer() throws IOException {
|
||
|
|
||
|
// do nothing if no data to send available
|
||
|
if (bufferPointer == 0) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// create data packet
|
||
|
String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
|
||
|
DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
|
||
|
this.seq, enc);
|
||
|
|
||
|
// write to XMPP stream
|
||
|
writeToXML(data);
|
||
|
|
||
|
// reset buffer pointer
|
||
|
bufferPointer = 0;
|
||
|
|
||
|
// increment sequence, considering sequence overflow
|
||
|
this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
|
||
|
|
||
|
}
|
||
|
|
||
|
public void close() throws IOException {
|
||
|
if (isClosed) {
|
||
|
return;
|
||
|
}
|
||
|
InBandBytestreamSession.this.closeByLocal(false);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Sets the close flag and optionally flushes the stream.
|
||
|
*
|
||
|
* @param flush if <code>true</code> flushes the stream
|
||
|
*/
|
||
|
protected void closeInternal(boolean flush) {
|
||
|
if (this.isClosed) {
|
||
|
return;
|
||
|
}
|
||
|
this.isClosed = true;
|
||
|
|
||
|
try {
|
||
|
if (flush) {
|
||
|
flushBuffer();
|
||
|
}
|
||
|
}
|
||
|
catch (IOException e) {
|
||
|
/*
|
||
|
* ignore, because writeToXML() will not throw an exception if stream is already
|
||
|
* closed
|
||
|
*/
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
|
||
|
* the data packets.
|
||
|
*/
|
||
|
private class IQIBBOutputStream extends IBBOutputStream {
|
||
|
|
||
|
@Override
|
||
|
protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
|
||
|
// create IQ stanza containing data packet
|
||
|
IQ iq = new Data(data);
|
||
|
iq.setTo(remoteJID);
|
||
|
|
||
|
try {
|
||
|
SyncPacketSend.getReply(connection, iq);
|
||
|
}
|
||
|
catch (XMPPException e) {
|
||
|
// close session unless it is already closed
|
||
|
if (!this.isClosed) {
|
||
|
InBandBytestreamSession.this.close();
|
||
|
throw new IOException("Error while sending Data: " + e.getMessage());
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
|
||
|
* encapsulating the data packets.
|
||
|
*/
|
||
|
private class MessageIBBOutputStream extends IBBOutputStream {
|
||
|
|
||
|
@Override
|
||
|
protected synchronized void writeToXML(DataPacketExtension data) {
|
||
|
// create message stanza containing data packet
|
||
|
Message message = new Message(remoteJID);
|
||
|
message.addExtension(data);
|
||
|
|
||
|
connection.sendPacket(message);
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|