1
0
Fork 0
mirror of https://codeberg.org/Mercury-IM/Smack synced 2024-11-27 00:32:07 +01:00

Optimized concurrency in packet writer, better job of cleanup on disconnect.

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7183 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
Matt Tucker 2007-02-19 05:48:57 +00:00 committed by matt
parent 319dcd408f
commit 3e4a1ed5b5
3 changed files with 62 additions and 89 deletions

View file

@ -50,7 +50,8 @@ class PacketReader {
private XmlPullParser parser; private XmlPullParser parser;
private boolean done; private boolean done;
private Collection<PacketCollector> collectors = new ConcurrentLinkedQueue<PacketCollector>(); private Collection<PacketCollector> collectors = new ConcurrentLinkedQueue<PacketCollector>();
protected final Map<PacketListener, ListenerWrapper> listeners = new ConcurrentHashMap<PacketListener, ListenerWrapper>(); protected final Map<PacketListener, ListenerWrapper> listeners =
new ConcurrentHashMap<PacketListener, ListenerWrapper>();
protected final Collection<ConnectionListener> connectionListeners = protected final Collection<ConnectionListener> connectionListeners =
new CopyOnWriteArrayList<ConnectionListener>(); new CopyOnWriteArrayList<ConnectionListener>();
@ -197,6 +198,15 @@ class PacketReader {
} }
} }
/**
* Cleans up all resources used by the packet reader.
*/
void cleanup() {
connectionListeners.clear();
listeners.clear();
collectors.clear();
}
/** /**
* Sends out a notification that there was an error with the connection * Sends out a notification that there was an error with the connection
* and closes the connection. * and closes the connection.
@ -824,7 +834,7 @@ class PacketReader {
/** /**
* A wrapper class to associate a packet collector with a listener. * A wrapper class to associate a packet collector with a listener.
*/ */
protected static class ListenerWrapper { private static class ListenerWrapper {
private PacketListener packetListener; private PacketListener packetListener;
private PacketCollector packetCollector; private PacketCollector packetCollector;

View file

@ -25,10 +25,10 @@ import org.jivesoftware.smack.packet.Packet;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* Writes packets to a XMPP server. Packets are sent using a dedicated thread. Packet * Writes packets to a XMPP server. Packets are sent using a dedicated thread. Packet
@ -43,11 +43,11 @@ class PacketWriter {
private Thread keepAliveThread; private Thread keepAliveThread;
private Writer writer; private Writer writer;
private XMPPConnection connection; private XMPPConnection connection;
final private Queue<Packet> queue; private final Queue<Packet> queue;
private boolean done; private boolean done;
final protected List<ListenerWrapper> listeners = new ArrayList<ListenerWrapper>(); private final Map<PacketListener, ListenerWrapper> listeners =
private boolean listenersDeleted; new ConcurrentHashMap<PacketListener, ListenerWrapper>();
/** /**
* Timestamp when the last stanza was sent to the server. This information is used * Timestamp when the last stanza was sent to the server. This information is used
@ -56,15 +56,12 @@ class PacketWriter {
private long lastActive = System.currentTimeMillis(); private long lastActive = System.currentTimeMillis();
/** /**
* List of PacketInterceptor that will be notified when a new packet is about to be * List of PacketInterceptors that will be notified when a new packet is about to be
* sent to the server. These interceptors may modify the packet before it is being * sent to the server. These interceptors may modify the packet before it is being
* actually sent to the server. * actually sent to the server.
*/ */
final private List interceptors = new ArrayList(); private final Map<PacketInterceptor, InterceptorWrapper> interceptors =
/** new ConcurrentHashMap<PacketInterceptor, InterceptorWrapper>();
* Flag that indicates if an interceptor was deleted. This is an optimization flag.
*/
private boolean interceptorDeleted = false;
/** /**
* Creates a new packet writer with the specified connection. * Creates a new packet writer with the specified connection.
@ -83,8 +80,6 @@ class PacketWriter {
*/ */
protected void init() { protected void init() {
this.writer = connection.writer; this.writer = connection.writer;
listenersDeleted = false;
interceptorDeleted = false;
done = false; done = false;
writerThread = new Thread() { writerThread = new Thread() {
@ -130,9 +125,7 @@ class PacketWriter {
* @param packetFilter the packet filter to use. * @param packetFilter the packet filter to use.
*/ */
public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) { public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
synchronized (listeners) { listeners.put(packetListener, new ListenerWrapper(packetListener, packetFilter));
listeners.add(new ListenerWrapper(packetListener, packetFilter));
}
} }
/** /**
@ -141,17 +134,7 @@ class PacketWriter {
* @param packetListener the packet listener to remove. * @param packetListener the packet listener to remove.
*/ */
public void removePacketListener(PacketListener packetListener) { public void removePacketListener(PacketListener packetListener) {
synchronized (listeners) { listeners.remove(packetListener);
for (int i=0; i<listeners.size(); i++) {
ListenerWrapper wrapper = listeners.get(i);
if (wrapper != null && wrapper.packetListener.equals(packetListener)) {
listeners.set(i, null);
// Set the flag to indicate that the listener list needs
// to be cleaned up.
listenersDeleted = true;
}
}
}
} }
/** /**
@ -160,10 +143,8 @@ class PacketWriter {
* @return the count of packet listeners. * @return the count of packet listeners.
*/ */
public int getPacketListenerCount() { public int getPacketListenerCount() {
synchronized (listeners) {
return listeners.size(); return listeners.size();
} }
}
/** /**
* Registers a packet interceptor with this writer. The interceptor will be * Registers a packet interceptor with this writer. The interceptor will be
@ -175,9 +156,7 @@ class PacketWriter {
* @param packetFilter the packet filter to use. * @param packetFilter the packet filter to use.
*/ */
public void addPacketInterceptor(PacketInterceptor packetInterceptor, PacketFilter packetFilter) { public void addPacketInterceptor(PacketInterceptor packetInterceptor, PacketFilter packetFilter) {
synchronized (interceptors) { interceptors.put(packetInterceptor, new InterceptorWrapper(packetInterceptor, packetFilter));
interceptors.add(new InterceptorWrapper(packetInterceptor, packetFilter));
}
} }
/** /**
@ -186,17 +165,7 @@ class PacketWriter {
* @param packetInterceptor the packet interceptor to remove. * @param packetInterceptor the packet interceptor to remove.
*/ */
public void removePacketInterceptor(PacketInterceptor packetInterceptor) { public void removePacketInterceptor(PacketInterceptor packetInterceptor) {
synchronized (interceptors) { interceptors.remove(packetInterceptor);
for (int i=0; i<interceptors.size(); i++) {
InterceptorWrapper wrapper = (InterceptorWrapper)interceptors.get(i);
if (wrapper != null && wrapper.packetInterceptor.equals(packetInterceptor)) {
interceptors.set(i, null);
// Set the flag to indicate that the interceptor list needs
// to be cleaned up.
interceptorDeleted = true;
}
}
}
} }
/** /**
@ -242,6 +211,14 @@ class PacketWriter {
} }
} }
/**
* Cleans up all resources used by the packet writer.
*/
void cleanup() {
interceptors.clear();
listeners.clear();
}
/** /**
* Returns the next available packet from the queue for writing. * Returns the next available packet from the queue for writing.
* *
@ -279,7 +256,9 @@ class PacketWriter {
} }
} }
} }
// Flush out the rest of the queue. // Flush out the rest of the queue. If the queue is extremely large, it's possible
// we won't have time to entirely flush it before the socket is forced closed
// by the shutdown process.
try { try {
synchronized (writer) { synchronized (writer) {
while (!queue.isEmpty()) { while (!queue.isEmpty()) {
@ -293,6 +272,9 @@ class PacketWriter {
e.printStackTrace(); e.printStackTrace();
} }
// Delete the queue contents (hopefully nothing is left).
queue.clear();
// Close the stream. // Close the stream.
try { try {
writer.write("</stream:stream>"); writer.write("</stream:stream>");
@ -320,30 +302,15 @@ class PacketWriter {
/** /**
* Process listeners. * Process listeners.
*
* @param packet the packet to process.
*/ */
private void processListeners(Packet packet) { private void processListeners(Packet packet) {
// Clean up null entries in the listeners list if the flag is set. List
// removes are done seperately so that the main notification process doesn't
// need to synchronize on the list.
synchronized (listeners) {
if (listenersDeleted) {
for (int i=listeners.size()-1; i>=0; i--) {
if (listeners.get(i) == null) {
listeners.remove(i);
}
}
listenersDeleted = false;
}
}
// Notify the listeners of the new sent packet // Notify the listeners of the new sent packet
int size = listeners.size(); for (ListenerWrapper listenerWrapper : listeners.values()) {
for (int i=0; i<size; i++) {
ListenerWrapper listenerWrapper = listeners.get(i);
if (listenerWrapper != null) {
listenerWrapper.notifyListener(packet); listenerWrapper.notifyListener(packet);
} }
} }
}
/** /**
* Process interceptors. Interceptors may modify the packet that is about to be sent. * Process interceptors. Interceptors may modify the packet that is about to be sent.
@ -355,29 +322,11 @@ class PacketWriter {
*/ */
private void processInterceptors(Packet packet) { private void processInterceptors(Packet packet) {
if (packet != null) { if (packet != null) {
// Clean up null entries in the interceptors list if the flag is set. List for (InterceptorWrapper interceptorWrapper : interceptors.values()) {
// removes are done seperately so that the main notification process doesn't
// need to synchronize on the list.
synchronized (interceptors) {
if (interceptorDeleted) {
for (int i=interceptors.size()-1; i>=0; i--) {
if (interceptors.get(i) == null) {
interceptors.remove(i);
}
}
interceptorDeleted = false;
}
}
// Notify the interceptors of the new packet to be sent
int size = interceptors.size();
for (int i=0; i<size; i++) {
InterceptorWrapper interceptorWrapper = (InterceptorWrapper)interceptors.get(i);
if (interceptorWrapper != null) {
interceptorWrapper.notifyListener(packet); interceptorWrapper.notifyListener(packet);
} }
} }
} }
}
/** /**
* Sends to the server a new stream element. This operation may be requested several times * Sends to the server a new stream element. This operation may be requested several times
@ -400,7 +349,7 @@ class PacketWriter {
/** /**
* A wrapper class to associate a packet filter with a listener. * A wrapper class to associate a packet filter with a listener.
*/ */
protected static class ListenerWrapper { private static class ListenerWrapper {
private PacketListener packetListener; private PacketListener packetListener;
private PacketFilter packetFilter; private PacketFilter packetFilter;

View file

@ -601,7 +601,13 @@ public class XMPPConnection {
/** /**
* Closes the connection by setting presence to unavailable then closing the stream to * Closes the connection by setting presence to unavailable then closing the stream to
* the XMPP server. The XMPPConnection can still be used for connecting to the server * the XMPP server. The XMPPConnection can still be used for connecting to the server
* again. * again.<p>
*
* This method cleans up all resources used by the connection. Therefore, the roster,
* listeners and other stateful objects cannot be re-used by simply calling connect()
* on this connection again. This is unlike the behavior during unexpected disconnects
* (and subsequent connections). In that case, all state is preserved to allow for
* more seamless error recovery.
*/ */
public void disconnect() { public void disconnect() {
disconnect(new Presence(Presence.Type.unavailable)); disconnect(new Presence(Presence.Type.unavailable));
@ -613,7 +619,13 @@ public class XMPPConnection {
* again. A custom unavilable presence is useful for communicating offline presence * again. A custom unavilable presence is useful for communicating offline presence
* information such as "On vacation". Typically, just the status text of the presence * information such as "On vacation". Typically, just the status text of the presence
* packet is set with online information, but most XMPP servers will deliver the full * packet is set with online information, but most XMPP servers will deliver the full
* presence packet with whatever data is set. * presence packet with whatever data is set.<p>
*
* This method cleans up all resources used by the connection. Therefore, the roster,
* listeners and other stateful objects cannot be re-used by simply calling connect()
* on this connection again. This is unlike the behavior during unexpected disconnects
* (and subsequent connections). In that case, all state is preserved to allow for
* more seamless error recovery.
* *
* @param unavailablePresence the presence packet to send during shutdown. * @param unavailablePresence the presence packet to send during shutdown.
*/ */
@ -624,7 +636,9 @@ public class XMPPConnection {
this.wasAuthenticated = false; this.wasAuthenticated = false;
packetWriter.cleanup();
packetWriter = null; packetWriter = null;
packetReader.cleanup();
packetReader = null; packetReader = null;
} }