From ca4fbcf2b0b4d99fe2c2327555c770725608f51a Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Mon, 26 May 2014 22:02:04 +0200 Subject: [PATCH] Make PacketReader and PacketWriter nested classes of XMPPTCPConnection. --- .../jivesoftware/smack/tcp/PacketReader.java | 363 ---------- .../jivesoftware/smack/tcp/PacketWriter.java | 238 ------- .../smack/tcp/XMPPTCPConnection.java | 623 +++++++++++++++--- .../smack/tcp/PacketWriterTest.java | 5 +- 4 files changed, 546 insertions(+), 683 deletions(-) delete mode 100644 smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketReader.java delete mode 100644 smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketWriter.java diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketReader.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketReader.java deleted file mode 100644 index 039f6ec7b..000000000 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/PacketReader.java +++ /dev/null @@ -1,363 +0,0 @@ -/** - * - * Copyright 2003-2007 Jive Software. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.jivesoftware.smack.tcp; - -import java.io.IOException; - -import org.jivesoftware.smack.packet.Packet; -import org.jivesoftware.smack.parsing.ParsingExceptionCallback; -import org.jivesoftware.smack.parsing.UnparsablePacket; -import org.jivesoftware.smack.sasl.SASLMechanism.Challenge; -import org.jivesoftware.smack.sasl.SASLMechanism.SASLFailure; -import org.jivesoftware.smack.sasl.SASLMechanism.Success; -import org.jivesoftware.smack.util.PacketParserUtils; -import org.jivesoftware.smack.ConnectionConfiguration; -import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.SmackException.NoResponseException; -import org.jivesoftware.smack.SmackException.SecurityRequiredException; -import org.jivesoftware.smack.XMPPException.StreamErrorException; -import org.xmlpull.v1.XmlPullParserFactory; -import org.xmlpull.v1.XmlPullParser; -import org.xmlpull.v1.XmlPullParserException; - -/** - * Listens for XML traffic from the XMPP server and parses it into packet objects. - * The packet reader also invokes all packet listeners and collectors.

- * - * @see XMPPConnection#createPacketCollector - * @see XMPPConnection#addPacketListener - * @author Matt Tucker - */ -class PacketReader { - - private Thread readerThread; - - private XMPPTCPConnection connection; - private XmlPullParser parser; - - /** - * Set to true if the last features stanza from the server has been parsed. A XMPP connection - * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature - * stanza is send by the server. This is set to true once the last feature stanza has been - * parsed. - */ - private volatile boolean lastFeaturesParsed; - - volatile boolean done; - - protected PacketReader(final XMPPTCPConnection connection) throws SmackException { - this.connection = connection; - this.init(); - } - - /** - * Initializes the reader in order to be used. The reader is initialized during the - * first connection and when reconnecting due to an abruptly disconnection. - * - * @throws SmackException if the parser could not be reset. - */ - protected void init() throws SmackException { - done = false; - lastFeaturesParsed = false; - - readerThread = new Thread() { - public void run() { - parsePackets(this); - } - }; - readerThread.setName("Smack Packet Reader (" + connection.getConnectionCounter() + ")"); - readerThread.setDaemon(true); - - resetParser(); - } - - /** - * Starts the packet reader thread and returns once a connection to the server - * has been established or if the server's features could not be parsed within - * the connection's PacketReplyTimeout. - * - * @throws NoResponseException if the server fails to send an opening stream back - * within packetReplyTimeout. - * @throws IOException - */ - synchronized public void startup() throws NoResponseException, IOException { - readerThread.start(); - - try { - // Wait until either: - // - the servers last features stanza has been parsed - // - an exception is thrown while parsing - // - the timeout occurs - wait(connection.getPacketReplyTimeout()); - } - catch (InterruptedException ie) { - // Ignore. - } - if (!lastFeaturesParsed) { - connection.throwConnectionExceptionOrNoResponse(); - } - } - - /** - * Shuts the packet reader down. This method simply sets the 'done' flag to true. - */ - public void shutdown() { - done = true; - } - - /** - * Resets the parser using the latest connection's reader. Reseting the parser is necessary - * when the plain connection has been secured or when a new opening stream element is going - * to be sent by the server. - * - * @throws SmackException if the parser could not be reset. - */ - private void resetParser() throws SmackException { - try { - parser = XmlPullParserFactory.newInstance().newPullParser(); - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - parser.setInput(connection.getReader()); - } - catch (XmlPullParserException e) { - throw new SmackException(e); - } - } - - /** - * Parse top-level packets in order to process them further. - * - * @param thread the thread that is being used by the reader to parse incoming packets. - */ - private void parsePackets(Thread thread) { - try { - int eventType = parser.getEventType(); - do { - if (eventType == XmlPullParser.START_TAG) { - int parserDepth = parser.getDepth(); - String name = parser.getName(); - ParsingExceptionCallback callback = connection.getParsingExceptionCallback(); - Packet packet; - try { - packet = PacketParserUtils.parseStanza(parser, connection); - } catch (Exception e) { - String content = PacketParserUtils.parseContentDepth(parser, parserDepth); - UnparsablePacket message = new UnparsablePacket(content, e); - if (callback != null) { - callback.handleUnparsablePacket(message); - } - continue; - } - if (packet != null) { - connection.processPacket(packet); - } - // We found an opening stream. Record information about it, then notify - // the connectionID lock so that the packet reader startup can finish. - else if (name.equals("stream")) { - // Ensure the correct jabber:client namespace is being used. - if ("jabber:client".equals(parser.getNamespace(null))) { - // Get the connection id. - for (int i=0; i queue = new ArrayBlockingQueueWithShutdown(QUEUE_SIZE, true); - - private Thread writerThread; - private Writer writer; - - volatile boolean done; - - AtomicBoolean shutdownDone = new AtomicBoolean(false); - - /** - * Creates a new packet writer with the specified connection. - * - * @param connection the connection. - */ - protected PacketWriter(XMPPTCPConnection connection) { - this.connection = connection; - init(); - } - - /** - * Initializes the writer in order to be used. It is called at the first connection and also - * is invoked if the connection is disconnected by an error. - */ - protected void init() { - writer = connection.getWriter(); - done = false; - shutdownDone.set(false); - - queue.start(); - writerThread = new Thread() { - public void run() { - writePackets(this); - } - }; - writerThread.setName("Smack Packet Writer (" + connection.getConnectionCounter() + ")"); - writerThread.setDaemon(true); - } - - /** - * Sends the specified packet to the server. - * - * @param packet the packet to send. - * @throws NotConnectedException - */ - public void sendPacket(Packet packet) throws NotConnectedException { - if (done) { - throw new NotConnectedException(); - } - - try { - queue.put(packet); - } - catch (InterruptedException ie) { - throw new NotConnectedException(); - } - } - - /** - * Starts the packet writer thread and opens a connection to the server. The - * packet writer will continue writing packets until {@link #shutdown} or an - * error occurs. - */ - public void startup() { - writerThread.start(); - } - - void setWriter(Writer writer) { - this.writer = writer; - } - - /** - * Shuts down the packet writer. Once this method has been called, no further - * packets will be written to the server. - */ - public void shutdown() { - done = true; - queue.shutdown(); - synchronized(shutdownDone) { - if (!shutdownDone.get()) { - try { - shutdownDone.wait(connection.getPacketReplyTimeout()); - } - catch (InterruptedException e) { - LOGGER.log(Level.WARNING, "shutdown", e); - } - } - } - } - - /** - * Returns the next available packet from the queue for writing. - * - * @return the next packet for writing. - */ - private Packet nextPacket() { - if (done) { - return null; - } - - Packet packet = null; - try { - packet = queue.take(); - } - catch (InterruptedException e) { - // Do nothing - } - return packet; - } - - private void writePackets(Thread thisThread) { - try { - // Open the stream. - openStream(); - // Write out packets from the queue. - while (!done && (writerThread == thisThread)) { - Packet packet = nextPacket(); - if (packet != null) { - writer.write(packet.toXML().toString()); - - if (queue.isEmpty()) { - writer.flush(); - } - } - } - // Flush out the rest of the queue. If the queue is extremely large, it's possible - // we won't have time to entirely flush it before the socket is forced closed - // by the shutdown process. - try { - while (!queue.isEmpty()) { - Packet packet = queue.remove(); - writer.write(packet.toXML().toString()); - } - writer.flush(); - } - catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception flushing queue during shutdown, ignore and continue", e); - } - - // Delete the queue contents (hopefully nothing is left). - queue.clear(); - - // Close the stream. - try { - writer.write(""); - writer.flush(); - } - catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); - - } - finally { - try { - writer.close(); - } - catch (Exception e) { - // Do nothing - } - } - - shutdownDone.set(true); - synchronized(shutdownDone) { - shutdownDone.notify(); - } - } - catch (IOException ioe) { - // The exception can be ignored if the the connection is 'done' - // or if the it was caused because the socket got closed - if (!(done || connection.isSocketClosed())) { - shutdown(); - connection.notifyConnectionError(ioe); - } - } - } - - /** - * Sends to the server a new stream element. This operation may be requested several times - * so we need to encapsulate the logic in one place. This message will be sent while doing - * TLS, SASL and resource binding. - * - * @throws IOException If an error occurs while sending the stanza to the server. - */ - void openStream() throws IOException { - StringBuilder stream = new StringBuilder(); - stream.append(""); - writer.write(stream.toString()); - writer.flush(); - } - -} diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java index a5ec7eac7..181331239 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java @@ -20,21 +20,31 @@ import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.ConnectionConfiguration; import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.ConnectionListener; -import org.jivesoftware.smack.SASLAuthentication; import org.jivesoftware.smack.SmackConfiguration; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; import org.jivesoftware.smack.SmackException.NoResponseException; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.SmackException.ConnectionException; +import org.jivesoftware.smack.SmackException.SecurityRequiredException; +import org.jivesoftware.smack.XMPPException.StreamErrorException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.compression.XMPPInputOutputStream; import org.jivesoftware.smack.packet.Packet; import org.jivesoftware.smack.packet.Presence; import org.jivesoftware.smack.parsing.ParsingExceptionCallback; +import org.jivesoftware.smack.parsing.UnparsablePacket; +import org.jivesoftware.smack.sasl.SASLMechanism.Challenge; +import org.jivesoftware.smack.sasl.SASLMechanism.SASLFailure; +import org.jivesoftware.smack.sasl.SASLMechanism.Success; +import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; +import org.jivesoftware.smack.util.PacketParserUtils; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.dns.HostAddress; +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; +import org.xmlpull.v1.XmlPullParserFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -54,7 +64,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.io.Reader; import java.io.UnsupportedEncodingException; import java.io.Writer; import java.lang.reflect.Constructor; @@ -67,6 +76,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -84,11 +94,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { /** * The socket which is used for this connection. */ - Socket socket; + private Socket socket; - String connectionID = null; + private String connectionID = null; private String user = null; private boolean connected = false; + // socketClosed is used concurrent // by XMPPTCPConnection, PacketReader, PacketWriter private volatile boolean socketClosed = false; @@ -98,8 +109,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); - PacketWriter packetWriter; - PacketReader packetReader; + private PacketWriter packetWriter; + private PacketReader packetReader; /** * Collection of available stream compression methods offered by the server. @@ -193,6 +204,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { config.setCallbackHandler(callbackHandler); } + @Override public String getConnectionID() { if (!isConnected()) { return null; @@ -200,6 +212,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { return connectionID; } + @Override public String getUser() { if (!isAuthenticated()) { return null; @@ -335,22 +348,26 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { callConnectionAuthenticatedListener(); } + @Override public boolean isConnected() { return connected; } + @Override public boolean isSecureConnection() { - return isUsingTLS(); + return usingTLS; } public boolean isSocketClosed() { return socketClosed; } + @Override public boolean isAuthenticated() { return authenticated; } + @Override public boolean isAnonymous() { return anonymous; } @@ -454,8 +471,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { try { if (isFirstInitialization) { - packetWriter = new PacketWriter(this); - packetReader = new PacketReader(this); + packetWriter = new PacketWriter(); + packetReader = new PacketReader(); // If debugging is enabled, we should start the thread that will listen for // all packets and then log them. @@ -534,16 +551,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * TLS code below **********************************************/ - /** - * Returns true if the connection to the server has successfully negotiated TLS. Once TLS - * has been negotiatied the connection has been secured. - * - * @return true if the connection to the server has successfully negotiated TLS. - */ - public boolean isUsingTLS() { - return usingTLS; - } - /** * Notification message saying that the server supports TLS so confirm the server that we * want to secure the connection. @@ -551,7 +558,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * @param required true when the server indicates that TLS is required. * @throws IOException if an exception occurs. */ - void startTLSReceived(boolean required) throws IOException { + private void startTLSReceived(boolean required) throws IOException { if (required && config.getSecurityMode() == ConnectionConfiguration.SecurityMode.disabled) { notifyConnectionError(new IllegalStateException( @@ -574,7 +581,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @throws Exception if an exception occurs. */ - void proceedTLSReceived() throws Exception { + private void proceedTLSReceived() throws Exception { SSLContext context = this.config.getCustomSSLContext(); KeyStore ks = null; KeyManager[] kms = null; @@ -679,7 +686,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @param methods compression methods offered by the server. */ - void setAvailableCompressionMethods(Collection methods) { + private void setAvailableCompressionMethods(Collection methods) { compressionMethods = methods; } @@ -700,6 +707,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { return null; } + @Override public boolean isUsingCompression() { return compressionHandler != null && serverAckdCompression; } @@ -759,7 +767,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @throws IOException if there is an exception starting stream compression. */ - void startStreamCompression() throws IOException { + private void startStreamCompression() throws IOException { serverAckdCompression = true; // Initialize the reader and writer with the new secured version initReaderAndWriter(); @@ -776,7 +784,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * Notifies the XMPP connection that stream compression negotiation is done so that the * connection process can proceed. */ - void streamCompressionNegotiationDone() { + private void streamCompressionNegotiationDone() { synchronized (compressionLock) { compressionLock.notify(); } @@ -824,7 +832,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { * * @param e the exception that causes the connection close event. */ - synchronized void notifyConnectionError(Exception e) { + private synchronized void notifyConnectionError(Exception e) { // Listeners were already notified of the exception, return right here. if ((packetReader == null || packetReader.done) && (packetWriter == null || packetWriter.done)) return; @@ -836,61 +844,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { callConnectionClosedOnErrorListener(e); } - @Override - protected void processPacket(Packet packet) { - super.processPacket(packet); - } - - @Override - protected Reader getReader() { - return super.getReader(); - } - - @Override - protected Writer getWriter() { - return super.getWriter(); - } - - @Override - protected void throwConnectionExceptionOrNoResponse() throws IOException, NoResponseException { - super.throwConnectionExceptionOrNoResponse(); - } - - @Override - protected void setServiceName(String serviceName) { - super.setServiceName(serviceName); - } - - @Override - protected void serverRequiresBinding() { - super.serverRequiresBinding(); - } - - @Override - protected void setServiceCapsNode(String node) { - super.setServiceCapsNode(node); - } - - @Override - protected void serverSupportsSession() { - super.serverSupportsSession(); - } - - @Override - protected void setRosterVersioningSupported() { - super.setRosterVersioningSupported(); - } - - @Override - protected void serverSupportsAccountCreation() { - super.serverSupportsAccountCreation(); - } - - @Override - protected SASLAuthentication getSASLAuthentication() { - return super.getSASLAuthentication(); - } - /** * Sends a notification indicating that the connection was reconnected successfully. */ @@ -907,4 +860,514 @@ public class XMPPTCPConnection extends AbstractXMPPConnection { } } } + + protected class PacketReader { + + private Thread readerThread; + + private XmlPullParser parser; + + /** + * Set to true if the last features stanza from the server has been parsed. A XMPP connection + * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature + * stanza is send by the server. This is set to true once the last feature stanza has been + * parsed. + */ + private volatile boolean lastFeaturesParsed; + + private volatile boolean done; + + PacketReader() throws SmackException { + this.init(); + } + + /** + * Initializes the reader in order to be used. The reader is initialized during the + * first connection and when reconnecting due to an abruptly disconnection. + * + * @throws SmackException if the parser could not be reset. + */ + void init() throws SmackException { + done = false; + lastFeaturesParsed = false; + + readerThread = new Thread() { + public void run() { + parsePackets(this); + } + }; + readerThread.setName("Smack Packet Reader (" + getConnectionCounter() + ")"); + readerThread.setDaemon(true); + + resetParser(); + } + + /** + * Starts the packet reader thread and returns once a connection to the server + * has been established or if the server's features could not be parsed within + * the connection's PacketReplyTimeout. + * + * @throws NoResponseException if the server fails to send an opening stream back + * within packetReplyTimeout. + * @throws IOException + */ + synchronized void startup() throws NoResponseException, IOException { + readerThread.start(); + + try { + // Wait until either: + // - the servers last features stanza has been parsed + // - an exception is thrown while parsing + // - the timeout occurs + wait(getPacketReplyTimeout()); + } + catch (InterruptedException ie) { + // Ignore. + } + if (!lastFeaturesParsed) { + throwConnectionExceptionOrNoResponse(); + } + } + + /** + * Shuts the packet reader down. This method simply sets the 'done' flag to true. + */ + void shutdown() { + done = true; + } + + /** + * Resets the parser using the latest connection's reader. Reseting the parser is necessary + * when the plain connection has been secured or when a new opening stream element is going + * to be sent by the server. + * + * @throws SmackException if the parser could not be reset. + */ + private void resetParser() throws SmackException { + try { + parser = XmlPullParserFactory.newInstance().newPullParser(); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + parser.setInput(getReader()); + } + catch (XmlPullParserException e) { + throw new SmackException(e); + } + } + + /** + * Parse top-level packets in order to process them further. + * + * @param thread the thread that is being used by the reader to parse incoming packets. + */ + private void parsePackets(Thread thread) { + try { + int eventType = parser.getEventType(); + do { + if (eventType == XmlPullParser.START_TAG) { + int parserDepth = parser.getDepth(); + String name = parser.getName(); + ParsingExceptionCallback callback = getParsingExceptionCallback(); + Packet packet; + try { + packet = PacketParserUtils.parseStanza(parser, XMPPTCPConnection.this); + } catch (Exception e) { + String content = PacketParserUtils.parseContentDepth(parser, parserDepth); + UnparsablePacket message = new UnparsablePacket(content, e); + if (callback != null) { + callback.handleUnparsablePacket(message); + } + continue; + } + if (packet != null) { + processPacket(packet); + } + // We found an opening stream. Record information about it, then notify + // the connectionID lock so that the packet reader startup can finish. + else if (name.equals("stream")) { + // Ensure the correct jabber:client namespace is being used. + if ("jabber:client".equals(parser.getNamespace(null))) { + // Get the connection id. + for (int i=0; i queue = new ArrayBlockingQueueWithShutdown(QUEUE_SIZE, true); + + private Thread writerThread; + private Writer writer; + + private volatile boolean done; + + protected AtomicBoolean shutdownDone = new AtomicBoolean(false); + + /** + * Creates a new packet writer with the specified connection. + */ + PacketWriter() { + init(); + } + + /** + * Initializes the writer in order to be used. It is called at the first connection and also + * is invoked if the connection is disconnected by an error. + */ + void init() { + writer = getWriter(); + done = false; + shutdownDone.set(false); + + queue.start(); + writerThread = new Thread() { + public void run() { + writePackets(this); + } + }; + writerThread.setName("Smack Packet Writer (" + getConnectionCounter() + ")"); + writerThread.setDaemon(true); + } + + /** + * Sends the specified packet to the server. + * + * @param packet the packet to send. + * @throws NotConnectedException + */ + public void sendPacket(Packet packet) throws NotConnectedException { + if (done) { + throw new NotConnectedException(); + } + + try { + queue.put(packet); + } + catch (InterruptedException ie) { + throw new NotConnectedException(); + } + } + + /** + * Starts the packet writer thread and opens a connection to the server. The + * packet writer will continue writing packets until {@link #shutdown} or an + * error occurs. + */ + void startup() { + writerThread.start(); + } + + void setWriter(Writer writer) { + this.writer = writer; + } + + /** + * Shuts down the packet writer. Once this method has been called, no further + * packets will be written to the server. + */ + void shutdown() { + done = true; + queue.shutdown(); + synchronized(shutdownDone) { + if (!shutdownDone.get()) { + try { + shutdownDone.wait(getPacketReplyTimeout()); + } + catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "shutdown", e); + } + } + } + } + + /** + * Returns the next available packet from the queue for writing. + * + * @return the next packet for writing. + */ + private Packet nextPacket() { + if (done) { + return null; + } + + Packet packet = null; + try { + packet = queue.take(); + } + catch (InterruptedException e) { + // Do nothing + } + return packet; + } + + private void writePackets(Thread thisThread) { + try { + // Open the stream. + openStream(); + // Write out packets from the queue. + while (!done && (writerThread == thisThread)) { + Packet packet = nextPacket(); + if (packet != null) { + writer.write(packet.toXML().toString()); + + if (queue.isEmpty()) { + writer.flush(); + } + } + } + // Flush out the rest of the queue. If the queue is extremely large, it's possible + // we won't have time to entirely flush it before the socket is forced closed + // by the shutdown process. + try { + while (!queue.isEmpty()) { + Packet packet = queue.remove(); + writer.write(packet.toXML().toString()); + } + writer.flush(); + } + catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception flushing queue during shutdown, ignore and continue", e); + } + + // Delete the queue contents (hopefully nothing is left). + queue.clear(); + + // Close the stream. + try { + writer.write(""); + writer.flush(); + } + catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); + + } + finally { + try { + writer.close(); + } + catch (Exception e) { + // Do nothing + } + } + + shutdownDone.set(true); + synchronized(shutdownDone) { + shutdownDone.notify(); + } + } + catch (IOException ioe) { + // The exception can be ignored if the the connection is 'done' + // or if the it was caused because the socket got closed + if (!(done || isSocketClosed())) { + shutdown(); + notifyConnectionError(ioe); + } + } + } + + /** + * Sends to the server a new stream element. This operation may be requested several times + * so we need to encapsulate the logic in one place. This message will be sent while doing + * TLS, SASL and resource binding. + * + * @throws IOException If an error occurs while sending the stanza to the server. + */ + void openStream() throws IOException { + StringBuilder stream = new StringBuilder(); + stream.append(""); + writer.write(stream.toString()); + writer.flush(); + } + } } diff --git a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java index f344c590d..8cc428f61 100644 --- a/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java +++ b/smack-tcp/src/test/java/org/jivesoftware/smack/tcp/PacketWriterTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CyclicBarrier; import org.jivesoftware.smack.SmackException.NotConnectedException; import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.tcp.XMPPTCPConnection.PacketWriter; import org.junit.Test; import static org.junit.Assert.fail; @@ -44,11 +45,11 @@ public class PacketWriterTest { @Test public void shouldBlockAndUnblockTest() throws InterruptedException, BrokenBarrierException, NotConnectedException { XMPPTCPConnection connection = new XMPPTCPConnection("foobar.com"); - final PacketWriter pw = new PacketWriter(connection); + final PacketWriter pw = connection.new PacketWriter(); pw.setWriter(new BlockingStringWriter()); pw.startup(); - for (int i = 0; i < PacketWriter.QUEUE_SIZE; i++) { + for (int i = 0; i < XMPPTCPConnection.PacketWriter.QUEUE_SIZE; i++) { pw.sendPacket(new Message()); }