1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2024-06-26 13:24:49 +02:00
Smack/tcp/src/main/java/org/jivesoftware/smack/PacketWriter.java
Florian Schmaus 978f692eb0 Use XmlStringBuilder in most toXML() bodies
Also change StringUtils.escapeForXML() and Packet.toXML() to return
CharSequence instead of String. XmlStringBuilder now has 'optX' methods.

Remove XmlUtils in favor of XmlStringBuilder
2014-03-22 16:59:15 +01:00

222 lines
7 KiB
Java

/**
*
* Copyright 2003-2007 Jive Software.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.smack;
import org.jivesoftware.smack.packet.Packet;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Writes packets to an XMPP server. Packets are queued by {@link #sendPacket(Packet)} and
 * written to the connection's {@link Writer} by a dedicated daemon thread. Packet
 * interceptors can be registered to dynamically modify packets before they're actually
 * sent. Packet listeners can be registered to listen for all outgoing packets.
 * <p>
 * The queue object doubles as the monitor used to wake the writer thread: producers and
 * {@link #shutdown()} call {@code notifyAll()} on it, and the writer thread waits on it
 * while the queue is empty.
 *
 * @see XMPPConnection#addPacketInterceptor
 * @see XMPPConnection#addPacketSendingListener
 *
 * @author Matt Tucker
 */
class PacketWriter {

    private static final Logger LOGGER = Logger.getLogger(PacketWriter.class.getName());

    /** Thread that drains {@link #queue}; replaced on every {@link #init()}. */
    private Thread writerThread;

    /** Destination of the serialized packets; taken from the connection or set via {@link #setWriter(Writer)}. */
    private Writer writer;

    private final TCPConnection connection;

    /** Bounded (500 entries), fair queue of packets waiting to be written. */
    private final BlockingQueue<Packet> queue;

    /** Set to {@code true} once the writer must stop accepting and writing packets. */
    volatile boolean done;

    /**
     * Creates a new packet writer with the specified connection.
     *
     * @param connection the connection.
     */
    protected PacketWriter(TCPConnection connection) {
        this.queue = new ArrayBlockingQueue<Packet>(500, true);
        this.connection = connection;
        init();
    }

    /**
     * Initializes the writer in order to be used. It is called at the first connection and also
     * is invoked if the connection is disconnected by an error.
     * <p>
     * Picks up the connection's current writer, clears the {@code done} flag and creates (but
     * does not start) a fresh daemon writer thread.
     */
    protected void init() {
        this.writer = connection.writer;
        done = false;

        writerThread = new Thread() {
            @Override
            public void run() {
                writePackets(this);
            }
        };
        writerThread.setName("Smack Packet Writer (" + connection.connectionCounterValue + ")");
        writerThread.setDaemon(true);
    }

    /**
     * Sends the specified packet to the server. The packet is only queued here; the writer
     * thread performs the actual write. Packets are silently dropped once the writer is done.
     * <p>
     * This call blocks while the queue is full. If the calling thread is interrupted while
     * blocked, the packet is not queued, the failure is logged and the thread's interrupt
     * status is restored.
     *
     * @param packet the packet to send.
     */
    public void sendPacket(Packet packet) {
        if (!done) {
            try {
                queue.put(packet);
            }
            catch (InterruptedException ie) {
                LOGGER.log(Level.SEVERE, "Failed to queue packet to send to server: " + packet.toString(), ie);
                // Restore the interrupt status so that the caller can still observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
            // Wake up the writer thread in case it is waiting for a packet.
            synchronized (queue) {
                queue.notifyAll();
            }
        }
    }

    /**
     * Starts the packet writer thread and opens a connection to the server. The
     * packet writer will continue writing packets until {@link #shutdown} or an
     * error occurs.
     */
    public void startup() {
        writerThread.start();
    }

    /**
     * Replaces the writer the packets are written to.
     *
     * @param writer the new writer.
     */
    void setWriter(Writer writer) {
        this.writer = writer;
    }

    /**
     * Shuts down the packet writer. Once this method has been called, no further
     * packets will be written to the server.
     */
    public void shutdown() {
        done = true;
        // Wake up the writer thread so that it notices the 'done' flag.
        synchronized (queue) {
            queue.notifyAll();
        }
    }

    /**
     * Returns the next available packet from the queue for writing, blocking until a packet
     * is available or the writer is done.
     *
     * @return the next packet for writing, or {@code null} if the writer is done.
     */
    private Packet nextPacket() {
        Packet packet = null;
        // The emptiness check and the wait must happen under the same monitor that
        // sendPacket() and shutdown() notify on. Otherwise a notification issued between
        // poll() and wait() would be lost and this thread could sleep although a packet
        // is queued or the writer has been shut down.
        synchronized (queue) {
            // Wait until there's a packet or we're done.
            while (!done && (packet = queue.poll()) == null) {
                try {
                    queue.wait();
                }
                catch (InterruptedException ie) {
                    // Deliberately ignored: termination of the writer thread is signalled
                    // through the 'done' flag, which is re-checked by the loop condition.
                }
            }
        }
        return packet;
    }

    /**
     * Body of the writer thread: opens the stream, writes queued packets until the writer is
     * done or the thread has been replaced, then flushes what is left in the queue, sends the
     * closing stream element and closes the writer.
     *
     * @param thisThread the thread executing this method; the write loop ends as soon as
     *        {@code writerThread} no longer refers to it.
     */
    private void writePackets(Thread thisThread) {
        try {
            // Open the stream.
            openStream();

            // Write out packets from the queue.
            while (!done && (writerThread == thisThread)) {
                Packet packet = nextPacket();
                if (packet != null) {
                    writer.write(packet.toXML().toString());
                    // Only flush when there is nothing else pending, so that bursts of
                    // packets are written together.
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                }
            }

            // Flush out the rest of the queue. If the queue is extremely large, it's possible
            // we won't have time to entirely flush it before the socket is forced closed
            // by the shutdown process.
            try {
                // poll() instead of isEmpty()/remove() so that a concurrently emptied queue
                // cannot cause a NoSuchElementException.
                Packet pending;
                while ((pending = queue.poll()) != null) {
                    writer.write(pending.toXML().toString());
                }
                writer.flush();
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, "Error flushing queue during shutdown, ignore and continue", e);
            }

            // Delete the queue contents (hopefully nothing is left).
            queue.clear();

            // Close the stream.
            try {
                writer.write("</stream:stream>");
                writer.flush();
            }
            catch (Exception e) {
                // Best effort: the stream is being torn down anyway.
            }
            finally {
                try {
                    writer.close();
                }
                catch (Exception e) {
                    // Best effort: nothing sensible can be done if closing fails.
                }
            }
        }
        catch (IOException ioe) {
            // The exception can be ignored if the connection is 'done'
            // or if it was caused because the socket got closed.
            if (!(done || connection.isSocketClosed())) {
                done = true;
                // packetReader could be set to null by a concurrent disconnect() call.
                // Therefore prevent NullPointerExceptions by checking packetReader.
                if (connection.packetReader != null) {
                    connection.notifyConnectionError(ioe);
                }
            }
        }
    }

    /**
     * Sends to the server a new stream element. This operation may be requested several times
     * so we need to encapsulate the logic in one place. This message will be sent while doing
     * TLS, SASL and resource binding.
     * <p>
     * NOTE(review): the service name is written into the {@code to} attribute as-is, without
     * XML escaping - confirm that it is validated before it reaches this point.
     *
     * @throws IOException If an error occurs while sending the stanza to the server.
     */
    void openStream() throws IOException {
        StringBuilder stream = new StringBuilder();
        stream.append("<stream:stream");
        stream.append(" to=\"").append(connection.getServiceName()).append("\"");
        stream.append(" xmlns=\"jabber:client\"");
        stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
        stream.append(" version=\"1.0\">");
        writer.write(stream.toString());
        writer.flush();
    }
}