diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketReader.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketReader.java
deleted file mode 100644
index 039f6ec7b..000000000
--- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketReader.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/**
- *
- * Copyright 2003-2007 Jive Software.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.jivesoftware.smack.tcp;
-
-import java.io.IOException;
-
-import org.jivesoftware.smack.packet.Packet;
-import org.jivesoftware.smack.parsing.ParsingExceptionCallback;
-import org.jivesoftware.smack.parsing.UnparsablePacket;
-import org.jivesoftware.smack.sasl.SASLMechanism.Challenge;
-import org.jivesoftware.smack.sasl.SASLMechanism.SASLFailure;
-import org.jivesoftware.smack.sasl.SASLMechanism.Success;
-import org.jivesoftware.smack.util.PacketParserUtils;
-import org.jivesoftware.smack.ConnectionConfiguration;
-import org.jivesoftware.smack.SmackException;
-import org.jivesoftware.smack.SmackException.NoResponseException;
-import org.jivesoftware.smack.SmackException.SecurityRequiredException;
-import org.jivesoftware.smack.XMPPException.StreamErrorException;
-import org.xmlpull.v1.XmlPullParserFactory;
-import org.xmlpull.v1.XmlPullParser;
-import org.xmlpull.v1.XmlPullParserException;
-
-/**
- * Listens for XML traffic from the XMPP server and parses it into packet objects.
- * The packet reader also invokes all packet listeners and collectors.
- *
- * @see XMPPConnection#createPacketCollector
- * @see XMPPConnection#addPacketListener
- * @author Matt Tucker
- */
-class PacketReader {
-
- private Thread readerThread;
-
- private XMPPTCPConnection connection;
- private XmlPullParser parser;
-
- /**
- * Set to true if the last features stanza from the server has been parsed. A XMPP connection
- * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature
- * stanza is send by the server. This is set to true once the last feature stanza has been
- * parsed.
- */
- private volatile boolean lastFeaturesParsed;
-
- volatile boolean done;
-
- protected PacketReader(final XMPPTCPConnection connection) throws SmackException {
- this.connection = connection;
- this.init();
- }
-
- /**
- * Initializes the reader in order to be used. The reader is initialized during the
- * first connection and when reconnecting due to an abruptly disconnection.
- *
- * @throws SmackException if the parser could not be reset.
- */
- protected void init() throws SmackException {
- done = false;
- lastFeaturesParsed = false;
-
- readerThread = new Thread() {
- public void run() {
- parsePackets(this);
- }
- };
- readerThread.setName("Smack Packet Reader (" + connection.getConnectionCounter() + ")");
- readerThread.setDaemon(true);
-
- resetParser();
- }
-
- /**
- * Starts the packet reader thread and returns once a connection to the server
- * has been established or if the server's features could not be parsed within
- * the connection's PacketReplyTimeout.
- *
- * @throws NoResponseException if the server fails to send an opening stream back
- * within packetReplyTimeout.
- * @throws IOException
- */
- synchronized public void startup() throws NoResponseException, IOException {
- readerThread.start();
-
- try {
- // Wait until either:
- // - the servers last features stanza has been parsed
- // - an exception is thrown while parsing
- // - the timeout occurs
- wait(connection.getPacketReplyTimeout());
- }
- catch (InterruptedException ie) {
- // Ignore.
- }
- if (!lastFeaturesParsed) {
- connection.throwConnectionExceptionOrNoResponse();
- }
- }
-
- /**
- * Shuts the packet reader down. This method simply sets the 'done' flag to true.
- */
- public void shutdown() {
- done = true;
- }
-
- /**
- * Resets the parser using the latest connection's reader. Reseting the parser is necessary
- * when the plain connection has been secured or when a new opening stream element is going
- * to be sent by the server.
- *
- * @throws SmackException if the parser could not be reset.
- */
- private void resetParser() throws SmackException {
- try {
- parser = XmlPullParserFactory.newInstance().newPullParser();
- parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
- parser.setInput(connection.getReader());
- }
- catch (XmlPullParserException e) {
- throw new SmackException(e);
- }
- }
-
- /**
- * Parse top-level packets in order to process them further.
- *
- * @param thread the thread that is being used by the reader to parse incoming packets.
- */
- private void parsePackets(Thread thread) {
- try {
- int eventType = parser.getEventType();
- do {
- if (eventType == XmlPullParser.START_TAG) {
- int parserDepth = parser.getDepth();
- String name = parser.getName();
- ParsingExceptionCallback callback = connection.getParsingExceptionCallback();
- Packet packet;
- try {
- packet = PacketParserUtils.parseStanza(parser, connection);
- } catch (Exception e) {
- String content = PacketParserUtils.parseContentDepth(parser, parserDepth);
- UnparsablePacket message = new UnparsablePacket(content, e);
- if (callback != null) {
- callback.handleUnparsablePacket(message);
- }
- continue;
- }
- if (packet != null) {
- connection.processPacket(packet);
- }
- // We found an opening stream. Record information about it, then notify
- // the connectionID lock so that the packet reader startup can finish.
- else if (name.equals("stream")) {
- // Ensure the correct jabber:client namespace is being used.
- if ("jabber:client".equals(parser.getNamespace(null))) {
- // Get the connection id.
- for (int i=0; i queue = new ArrayBlockingQueueWithShutdown(QUEUE_SIZE, true);
-
- private Thread writerThread;
- private Writer writer;
-
- volatile boolean done;
-
- AtomicBoolean shutdownDone = new AtomicBoolean(false);
-
- /**
- * Creates a new packet writer with the specified connection.
- *
- * @param connection the connection.
- */
- protected PacketWriter(XMPPTCPConnection connection) {
- this.connection = connection;
- init();
- }
-
- /**
- * Initializes the writer in order to be used. It is called at the first connection and also
- * is invoked if the connection is disconnected by an error.
- */
- protected void init() {
- writer = connection.getWriter();
- done = false;
- shutdownDone.set(false);
-
- queue.start();
- writerThread = new Thread() {
- public void run() {
- writePackets(this);
- }
- };
- writerThread.setName("Smack Packet Writer (" + connection.getConnectionCounter() + ")");
- writerThread.setDaemon(true);
- }
-
- /**
- * Sends the specified packet to the server.
- *
- * @param packet the packet to send.
- * @throws NotConnectedException
- */
- public void sendPacket(Packet packet) throws NotConnectedException {
- if (done) {
- throw new NotConnectedException();
- }
-
- try {
- queue.put(packet);
- }
- catch (InterruptedException ie) {
- throw new NotConnectedException();
- }
- }
-
- /**
- * Starts the packet writer thread and opens a connection to the server. The
- * packet writer will continue writing packets until {@link #shutdown} or an
- * error occurs.
- */
- public void startup() {
- writerThread.start();
- }
-
- void setWriter(Writer writer) {
- this.writer = writer;
- }
-
- /**
- * Shuts down the packet writer. Once this method has been called, no further
- * packets will be written to the server.
- */
- public void shutdown() {
- done = true;
- queue.shutdown();
- synchronized(shutdownDone) {
- if (!shutdownDone.get()) {
- try {
- shutdownDone.wait(connection.getPacketReplyTimeout());
- }
- catch (InterruptedException e) {
- LOGGER.log(Level.WARNING, "shutdown", e);
- }
- }
- }
- }
-
- /**
- * Returns the next available packet from the queue for writing.
- *
- * @return the next packet for writing.
- */
- private Packet nextPacket() {
- if (done) {
- return null;
- }
-
- Packet packet = null;
- try {
- packet = queue.take();
- }
- catch (InterruptedException e) {
- // Do nothing
- }
- return packet;
- }
-
- private void writePackets(Thread thisThread) {
- try {
- // Open the stream.
- openStream();
- // Write out packets from the queue.
- while (!done && (writerThread == thisThread)) {
- Packet packet = nextPacket();
- if (packet != null) {
- writer.write(packet.toXML().toString());
-
- if (queue.isEmpty()) {
- writer.flush();
- }
- }
- }
- // Flush out the rest of the queue. If the queue is extremely large, it's possible
- // we won't have time to entirely flush it before the socket is forced closed
- // by the shutdown process.
- try {
- while (!queue.isEmpty()) {
- Packet packet = queue.remove();
- writer.write(packet.toXML().toString());
- }
- writer.flush();
- }
- catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception flushing queue during shutdown, ignore and continue", e);
- }
-
- // Delete the queue contents (hopefully nothing is left).
- queue.clear();
-
- // Close the stream.
- try {
- writer.write("");
- writer.flush();
- }
- catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
-
- }
- finally {
- try {
- writer.close();
- }
- catch (Exception e) {
- // Do nothing
- }
- }
-
- shutdownDone.set(true);
- synchronized(shutdownDone) {
- shutdownDone.notify();
- }
- }
- catch (IOException ioe) {
- // The exception can be ignored if the the connection is 'done'
- // or if the it was caused because the socket got closed
- if (!(done || connection.isSocketClosed())) {
- shutdown();
- connection.notifyConnectionError(ioe);
- }
- }
- }
-
- /**
- * Sends to the server a new stream element. This operation may be requested several times
- * so we need to encapsulate the logic in one place. This message will be sent while doing
- * TLS, SASL and resource binding.
- *
- * @throws IOException If an error occurs while sending the stanza to the server.
- */
- void openStream() throws IOException {
- StringBuilder stream = new StringBuilder();
- stream.append("");
- writer.write(stream.toString());
- writer.flush();
- }
-
-}
diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
index a5ec7eac7..181331239 100644
--- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
+++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
@@ -20,21 +20,31 @@ import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.ConnectionListener;
-import org.jivesoftware.smack.SASLAuthentication;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.ConnectionException;
+import org.jivesoftware.smack.SmackException.SecurityRequiredException;
+import org.jivesoftware.smack.XMPPException.StreamErrorException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.compression.XMPPInputOutputStream;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.parsing.ParsingExceptionCallback;
+import org.jivesoftware.smack.parsing.UnparsablePacket;
+import org.jivesoftware.smack.sasl.SASLMechanism.Challenge;
+import org.jivesoftware.smack.sasl.SASLMechanism.SASLFailure;
+import org.jivesoftware.smack.sasl.SASLMechanism.Success;
+import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
+import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.dns.HostAddress;
+import org.xmlpull.v1.XmlPullParser;
+import org.xmlpull.v1.XmlPullParserException;
+import org.xmlpull.v1.XmlPullParserFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
@@ -54,7 +64,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.lang.reflect.Constructor;
@@ -67,6 +76,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -84,11 +94,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
/**
* The socket which is used for this connection.
*/
- Socket socket;
+ private Socket socket;
- String connectionID = null;
+ private String connectionID = null;
private String user = null;
private boolean connected = false;
+
// socketClosed is used concurrent
// by XMPPTCPConnection, PacketReader, PacketWriter
private volatile boolean socketClosed = false;
@@ -98,8 +109,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback();
- PacketWriter packetWriter;
- PacketReader packetReader;
+ private PacketWriter packetWriter;
+ private PacketReader packetReader;
/**
* Collection of available stream compression methods offered by the server.
@@ -193,6 +204,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
config.setCallbackHandler(callbackHandler);
}
+ @Override
public String getConnectionID() {
if (!isConnected()) {
return null;
@@ -200,6 +212,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
return connectionID;
}
+ @Override
public String getUser() {
if (!isAuthenticated()) {
return null;
@@ -335,22 +348,26 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
callConnectionAuthenticatedListener();
}
+ @Override
public boolean isConnected() {
return connected;
}
+ @Override
public boolean isSecureConnection() {
- return isUsingTLS();
+ return usingTLS;
}
public boolean isSocketClosed() {
return socketClosed;
}
+ @Override
public boolean isAuthenticated() {
return authenticated;
}
+ @Override
public boolean isAnonymous() {
return anonymous;
}
@@ -454,8 +471,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
try {
if (isFirstInitialization) {
- packetWriter = new PacketWriter(this);
- packetReader = new PacketReader(this);
+ packetWriter = new PacketWriter();
+ packetReader = new PacketReader();
// If debugging is enabled, we should start the thread that will listen for
// all packets and then log them.
@@ -534,16 +551,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* TLS code below
**********************************************/
- /**
- * Returns true if the connection to the server has successfully negotiated TLS. Once TLS
- * has been negotiatied the connection has been secured.
- *
- * @return true if the connection to the server has successfully negotiated TLS.
- */
- public boolean isUsingTLS() {
- return usingTLS;
- }
-
/**
* Notification message saying that the server supports TLS so confirm the server that we
* want to secure the connection.
@@ -551,7 +558,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* @param required true when the server indicates that TLS is required.
* @throws IOException if an exception occurs.
*/
- void startTLSReceived(boolean required) throws IOException {
+ private void startTLSReceived(boolean required) throws IOException {
if (required && config.getSecurityMode() ==
ConnectionConfiguration.SecurityMode.disabled) {
notifyConnectionError(new IllegalStateException(
@@ -574,7 +581,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*
* @throws Exception if an exception occurs.
*/
- void proceedTLSReceived() throws Exception {
+ private void proceedTLSReceived() throws Exception {
SSLContext context = this.config.getCustomSSLContext();
KeyStore ks = null;
KeyManager[] kms = null;
@@ -679,7 +686,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*
* @param methods compression methods offered by the server.
*/
- void setAvailableCompressionMethods(Collection methods) {
+ private void setAvailableCompressionMethods(Collection methods) {
compressionMethods = methods;
}
@@ -700,6 +707,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
return null;
}
+ @Override
public boolean isUsingCompression() {
return compressionHandler != null && serverAckdCompression;
}
@@ -759,7 +767,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*
* @throws IOException if there is an exception starting stream compression.
*/
- void startStreamCompression() throws IOException {
+ private void startStreamCompression() throws IOException {
serverAckdCompression = true;
// Initialize the reader and writer with the new secured version
initReaderAndWriter();
@@ -776,7 +784,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* Notifies the XMPP connection that stream compression negotiation is done so that the
* connection process can proceed.
*/
- void streamCompressionNegotiationDone() {
+ private void streamCompressionNegotiationDone() {
synchronized (compressionLock) {
compressionLock.notify();
}
@@ -824,7 +832,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*
* @param e the exception that causes the connection close event.
*/
- synchronized void notifyConnectionError(Exception e) {
+ private synchronized void notifyConnectionError(Exception e) {
// Listeners were already notified of the exception, return right here.
if ((packetReader == null || packetReader.done) &&
(packetWriter == null || packetWriter.done)) return;
@@ -836,61 +844,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
callConnectionClosedOnErrorListener(e);
}
- @Override
- protected void processPacket(Packet packet) {
- super.processPacket(packet);
- }
-
- @Override
- protected Reader getReader() {
- return super.getReader();
- }
-
- @Override
- protected Writer getWriter() {
- return super.getWriter();
- }
-
- @Override
- protected void throwConnectionExceptionOrNoResponse() throws IOException, NoResponseException {
- super.throwConnectionExceptionOrNoResponse();
- }
-
- @Override
- protected void setServiceName(String serviceName) {
- super.setServiceName(serviceName);
- }
-
- @Override
- protected void serverRequiresBinding() {
- super.serverRequiresBinding();
- }
-
- @Override
- protected void setServiceCapsNode(String node) {
- super.setServiceCapsNode(node);
- }
-
- @Override
- protected void serverSupportsSession() {
- super.serverSupportsSession();
- }
-
- @Override
- protected void setRosterVersioningSupported() {
- super.setRosterVersioningSupported();
- }
-
- @Override
- protected void serverSupportsAccountCreation() {
- super.serverSupportsAccountCreation();
- }
-
- @Override
- protected SASLAuthentication getSASLAuthentication() {
- return super.getSASLAuthentication();
- }
-
/**
* Sends a notification indicating that the connection was reconnected successfully.
*/
@@ -907,4 +860,514 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
}
}
}
+
+ protected class PacketReader {
+
+ private Thread readerThread;
+
+ private XmlPullParser parser;
+
+ /**
+ * Set to true if the last features stanza from the server has been parsed. A XMPP connection
+ * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature
+ * stanza is send by the server. This is set to true once the last feature stanza has been
+ * parsed.
+ */
+ private volatile boolean lastFeaturesParsed;
+
+ private volatile boolean done;
+
+ PacketReader() throws SmackException {
+ this.init();
+ }
+
+ /**
+ * Initializes the reader in order to be used. The reader is initialized during the
+ * first connection and when reconnecting due to an abruptly disconnection.
+ *
+ * @throws SmackException if the parser could not be reset.
+ */
+ void init() throws SmackException {
+ done = false;
+ lastFeaturesParsed = false;
+
+ readerThread = new Thread() {
+ public void run() {
+ parsePackets(this);
+ }
+ };
+ readerThread.setName("Smack Packet Reader (" + getConnectionCounter() + ")");
+ readerThread.setDaemon(true);
+
+ resetParser();
+ }
+
+ /**
+ * Starts the packet reader thread and returns once a connection to the server
+ * has been established or if the server's features could not be parsed within
+ * the connection's PacketReplyTimeout.
+ *
+ * @throws NoResponseException if the server fails to send an opening stream back
+ * within packetReplyTimeout.
+ * @throws IOException
+ */
+ synchronized void startup() throws NoResponseException, IOException {
+ readerThread.start();
+
+ try {
+ // Wait until either:
+ // - the servers last features stanza has been parsed
+ // - an exception is thrown while parsing
+ // - the timeout occurs
+ wait(getPacketReplyTimeout());
+ }
+ catch (InterruptedException ie) {
+ // Ignore.
+ }
+ if (!lastFeaturesParsed) {
+ throwConnectionExceptionOrNoResponse();
+ }
+ }
+
+ /**
+ * Shuts the packet reader down. This method simply sets the 'done' flag to true.
+ */
+ void shutdown() {
+ done = true;
+ }
+
+ /**
+ * Resets the parser using the latest connection's reader. Reseting the parser is necessary
+ * when the plain connection has been secured or when a new opening stream element is going
+ * to be sent by the server.
+ *
+ * @throws SmackException if the parser could not be reset.
+ */
+ private void resetParser() throws SmackException {
+ try {
+ parser = XmlPullParserFactory.newInstance().newPullParser();
+ parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
+ parser.setInput(getReader());
+ }
+ catch (XmlPullParserException e) {
+ throw new SmackException(e);
+ }
+ }
+
+ /**
+ * Parse top-level packets in order to process them further.
+ *
+ * @param thread the thread that is being used by the reader to parse incoming packets.
+ */
+ private void parsePackets(Thread thread) {
+ try {
+ int eventType = parser.getEventType();
+ do {
+ if (eventType == XmlPullParser.START_TAG) {
+ int parserDepth = parser.getDepth();
+ String name = parser.getName();
+ ParsingExceptionCallback callback = getParsingExceptionCallback();
+ Packet packet;
+ try {
+ packet = PacketParserUtils.parseStanza(parser, XMPPTCPConnection.this);
+ } catch (Exception e) {
+ String content = PacketParserUtils.parseContentDepth(parser, parserDepth);
+ UnparsablePacket message = new UnparsablePacket(content, e);
+ if (callback != null) {
+ callback.handleUnparsablePacket(message);
+ }
+ continue;
+ }
+ if (packet != null) {
+ processPacket(packet);
+ }
+ // We found an opening stream. Record information about it, then notify
+ // the connectionID lock so that the packet reader startup can finish.
+ else if (name.equals("stream")) {
+ // Ensure the correct jabber:client namespace is being used.
+ if ("jabber:client".equals(parser.getNamespace(null))) {
+ // Get the connection id.
+ for (int i=0; i queue = new ArrayBlockingQueueWithShutdown(QUEUE_SIZE, true);
+
+ private Thread writerThread;
+ private Writer writer;
+
+ private volatile boolean done;
+
+ protected AtomicBoolean shutdownDone = new AtomicBoolean(false);
+
+ /**
+ * Creates a new packet writer with the specified connection.
+ */
+ PacketWriter() {
+ init();
+ }
+
+ /**
+ * Initializes the writer in order to be used. It is called at the first connection and also
+ * is invoked if the connection is disconnected by an error.
+ */
+ void init() {
+ writer = getWriter();
+ done = false;
+ shutdownDone.set(false);
+
+ queue.start();
+ writerThread = new Thread() {
+ public void run() {
+ writePackets(this);
+ }
+ };
+ writerThread.setName("Smack Packet Writer (" + getConnectionCounter() + ")");
+ writerThread.setDaemon(true);
+ }
+
+ /**
+ * Sends the specified packet to the server.
+ *
+ * @param packet the packet to send.
+ * @throws NotConnectedException
+ */
+ public void sendPacket(Packet packet) throws NotConnectedException {
+ if (done) {
+ throw new NotConnectedException();
+ }
+
+ try {
+ queue.put(packet);
+ }
+ catch (InterruptedException ie) {
+ throw new NotConnectedException();
+ }
+ }
+
+ /**
+ * Starts the packet writer thread and opens a connection to the server. The
+ * packet writer will continue writing packets until {@link #shutdown} or an
+ * error occurs.
+ */
+ void startup() {
+ writerThread.start();
+ }
+
+ void setWriter(Writer writer) {
+ this.writer = writer;
+ }
+
+ /**
+ * Shuts down the packet writer. Once this method has been called, no further
+ * packets will be written to the server.
+ */
+ void shutdown() {
+ done = true;
+ queue.shutdown();
+ synchronized(shutdownDone) {
+ if (!shutdownDone.get()) {
+ try {
+ shutdownDone.wait(getPacketReplyTimeout());
+ }
+ catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "shutdown", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the next available packet from the queue for writing.
+ *
+ * @return the next packet for writing.
+ */
+ private Packet nextPacket() {
+ if (done) {
+ return null;
+ }
+
+ Packet packet = null;
+ try {
+ packet = queue.take();
+ }
+ catch (InterruptedException e) {
+ // Do nothing
+ }
+ return packet;
+ }
+
+ private void writePackets(Thread thisThread) {
+ try {
+ // Open the stream.
+ openStream();
+ // Write out packets from the queue.
+ while (!done && (writerThread == thisThread)) {
+ Packet packet = nextPacket();
+ if (packet != null) {
+ writer.write(packet.toXML().toString());
+
+ if (queue.isEmpty()) {
+ writer.flush();
+ }
+ }
+ }
+ // Flush out the rest of the queue. If the queue is extremely large, it's possible
+ // we won't have time to entirely flush it before the socket is forced closed
+ // by the shutdown process.
+ try {
+ while (!queue.isEmpty()) {
+ Packet packet = queue.remove();
+ writer.write(packet.toXML().toString());
+ }
+ writer.flush();
+ }
+ catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception flushing queue during shutdown, ignore and continue", e);
+ }
+
+ // Delete the queue contents (hopefully nothing is left).
+ queue.clear();
+
+ // Close the stream.
+ try {
+ writer.write("");
+ writer.flush();
+ }
+ catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
+
+ }
+ finally {
+ try {
+ writer.close();
+ }
+ catch (Exception e) {
+ // Do nothing
+ }
+ }
+
+ shutdownDone.set(true);
+ synchronized(shutdownDone) {
+ shutdownDone.notify();
+ }
+ }
+ catch (IOException ioe) {
+ // The exception can be ignored if the the connection is 'done'
+ // or if the it was caused because the socket got closed
+ if (!(done || isSocketClosed())) {
+ shutdown();
+ notifyConnectionError(ioe);
+ }
+ }
+ }
+
+ /**
+ * Sends to the server a new stream element. This operation may be requested several times
+ * so we need to encapsulate the logic in one place. This message will be sent while doing
+ * TLS, SASL and resource binding.
+ *
+ * @throws IOException If an error occurs while sending the stanza to the server.
+ */
+ void openStream() throws IOException {
+ StringBuilder stream = new StringBuilder();
+ stream.append("");
+ writer.write(stream.toString());
+ writer.flush();
+ }
+ }
}
diff --git a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java
index f344c590d..8cc428f61 100644
--- a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java
+++ b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CyclicBarrier;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.tcp.XMPPTCPConnection.PacketWriter;
import org.junit.Test;
import static org.junit.Assert.fail;
@@ -44,11 +45,11 @@ public class PacketWriterTest {
@Test
public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException {
XMPPTCPConnection connection = new XMPPTCPConnection("foobar.com");
- final PacketWriter pw = new PacketWriter(connection);
+ final PacketWriter pw = connection.new PacketWriter();
pw.setWriter(new BlockingStringWriter());
pw.startup();
- for (int i = 0; i < PacketWriter.QUEUE_SIZE; i++) {
+ for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) {
pw.sendPacket(new Message());
}