From 7c847dad6a61e2b32622b23cfee134f931d0133f Mon Sep 17 00:00:00 2001 From: Matt Tucker Date: Fri, 16 Feb 2007 00:12:17 +0000 Subject: [PATCH] Switched from volatile collection to copy on write array. Fixes concurrency bugs and leaking resources, but may have performance ramifications. git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7159 b35dd754-fafc-0310-a699-88a17e54d16e --- .../org/jivesoftware/smack/PacketReader.java | 187 ++++-------------- .../smack/ReconnectionManager.java | 19 +- .../jivesoftware/smack/XMPPConnection.java | 10 +- 3 files changed, 41 insertions(+), 175 deletions(-) diff --git a/source/org/jivesoftware/smack/PacketReader.java b/source/org/jivesoftware/smack/PacketReader.java index eb4d07101..fd1f35545 100644 --- a/source/org/jivesoftware/smack/PacketReader.java +++ b/source/org/jivesoftware/smack/PacketReader.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.CopyOnWriteArrayList; /** * Listens for XML traffic from the XMPP server and parses it into packet objects. @@ -50,11 +51,10 @@ class PacketReader { private XMPPConnection connection; private XmlPullParser parser; private boolean done; - private final VolatileMemberCollection collectors = - new VolatileMemberCollection(50); - protected final VolatileMemberCollection listeners = new VolatileMemberCollection(50); + private List collectors = new CopyOnWriteArrayList(); + protected final List listeners = new CopyOnWriteArrayList(); protected final List connectionListeners = - new ArrayList(); + new CopyOnWriteArrayList(); private String connectionID = null; private Semaphore connectionSemaphore; @@ -122,11 +122,8 @@ class PacketReader { * @param packetFilter the packet filter to use. */ public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) { - ListenerWrapper wrapper = new ListenerWrapper(this, packetListener, - packetFilter); - synchronized (listeners) { - listeners.add(wrapper); - } + ListenerWrapper wrapper = new ListenerWrapper(this, packetListener, packetFilter); + listeners.add(wrapper); } /** @@ -135,7 +132,15 @@ class PacketReader { * @param packetListener the packet listener to remove. */ public void removePacketListener(PacketListener packetListener) { - listeners.remove(packetListener); + // Find the index of the wrapper in the list of listeners. This operation will + // work because of a special equals() implementation in the ListenerWrapper class. + int index = listeners.indexOf(packetListener); + if (index == -1) { + return; + } + ListenerWrapper wrapper = listeners.remove(index); + // Cancel the wrapper since it's been removed. + wrapper.cancel(); } /** @@ -180,19 +185,14 @@ class PacketReader { public void shutdown() { // Notify connection listeners of the connection closing if done hasn't already been set. if (!done) { - List listenersCopy; - synchronized (connectionListeners) { - // Make a copy since it's possible that a listener will be removed from the list - listenersCopy = new ArrayList(connectionListeners); - for (ConnectionListener listener : listenersCopy) { - try { - listener.connectionClosed(); - } - catch (Exception e) { - // Cath and print any exception so we can recover - // from a faulty listener and finish the shutdown process - e.printStackTrace(); - } + for (ConnectionListener listener : connectionListeners) { + try { + listener.connectionClosed(); + } + catch (Exception e) { + // Cath and print any exception so we can recover + // from a faulty listener and finish the shutdown process + e.printStackTrace(); } } } @@ -217,19 +217,14 @@ class PacketReader { // Print the stack trace to help catch the problem e.printStackTrace(); // Notify connection listeners of the error. - List listenersCopy; - synchronized (connectionListeners) { - // Make a copy since it's possible that a listener will be removed from the list - listenersCopy = new ArrayList(connectionListeners); - for (ConnectionListener listener : listenersCopy) { - try { - listener.connectionClosedOnError(e); - } - catch (Exception e2) { - // Cath and print any exception so we can recover - // from a faulty listener - e2.printStackTrace(); - } + for (ConnectionListener listener : connectionListeners) { + try { + listener.connectionClosedOnError(e); + } + catch (Exception e2) { + // Cath and print any exception so we can recover + // from a faulty listener + e2.printStackTrace(); } } @@ -290,9 +285,7 @@ class PacketReader { private void processListeners(Thread thread) { while (!done && thread == listenerThread) { boolean processedPacket = false; - Iterator it = listeners.getIterator(); - while (it.hasNext()) { - ListenerWrapper wrapper = (ListenerWrapper) it.next(); + for (ListenerWrapper wrapper: listeners) { processedPacket = processedPacket || wrapper.notifyListener(); } if (!processedPacket) { @@ -454,9 +447,7 @@ class PacketReader { } // Loop through all collectors and notify the appropriate ones. - Iterator it = collectors.getIterator(); - while (it.hasNext()) { - PacketCollector collector = (PacketCollector) it.next(); + for (PacketCollector collector: collectors) { collector.processPacket(packet); } @@ -837,117 +828,6 @@ class PacketReader { return bind; } - /** - * When an object is added it the first attempt is to add it to a 'null' space and when it is - * removed it is not removed from the list but instead the position is nulled so as not to - * interfere with list iteration as the Collection memebres are thought to be extermely - * volatile. In other words, many are added and deleted and 'null' values are skipped by the - * returned iterator. - */ - static class VolatileMemberCollection { - - private final Object mutex = new Object(); - private final ArrayList collectors; - private int nullIndex = -1; - private int[] nullArray; - - VolatileMemberCollection(int initialCapacity) { - collectors = new ArrayList(initialCapacity); - nullArray = new int[initialCapacity]; - } - - public void add(E member) { - synchronized (mutex) { - if (nullIndex < 0) { - ensureCapacity(); - collectors.add(member); - } - else { - collectors.set(nullArray[nullIndex--], member); - } - } - } - - private void ensureCapacity() { - int current = nullArray.length; - if (collectors.size() + 1 >= current) { - int newCapacity = current * 2; - int oldData[] = nullArray; - - collectors.ensureCapacity(newCapacity); - nullArray = new int[newCapacity]; - System.arraycopy(oldData, 0, nullArray, 0, nullIndex + 1); - } - } - - public void remove(E member) { - synchronized (mutex) { - for (int i = collectors.size()-1; i >= 0; i--) { - E element = collectors.get(i); - if (element != null && element.equals(member)) { - collectors.set(i, null); - nullArray[++nullIndex] = i; - return; - } - } - } - } - - /** - * One thread should be using an iterator at a time. - * - * @return Iterator over PacketCollector. - */ - public Iterator getIterator() { - return new Iterator() { - private int index = 0; - private Object next; - private int size = collectors.size(); - - public void remove() { - } - - public boolean hasNext() { - return next != null || grabNext() != null; - } - - private Object grabNext() { - Object next; - while (index < size) { - next = collectors.get(index++); - if (next != null) { - this.next = next; - return next; - } - } - this.next = null; - return null; - } - - public Object next() { - Object toReturn = (this.next != null ? this.next : grabNext()); - this.next = null; - return toReturn; - } - }; - } - - /** - * Returns the number of elements in this collection. - * - * @return the number of elements in this collection - */ - public int size() { - int size = 0; - for (E element : collectors) { - if (element != null) { - size++; - } - } - return size; - } - } - /** * A wrapper class to associate a packet collector with a listener. */ @@ -970,6 +850,7 @@ class PacketReader { if (object instanceof ListenerWrapper) { return ((ListenerWrapper)object).packetListener.equals(this.packetListener); } + // If the packet listener is equal to the wrapped packet listener, return true. else if (object instanceof PacketListener) { return object.equals(this.packetListener); } diff --git a/source/org/jivesoftware/smack/ReconnectionManager.java b/source/org/jivesoftware/smack/ReconnectionManager.java index b43d72a01..f4c0544e3 100644 --- a/source/org/jivesoftware/smack/ReconnectionManager.java +++ b/source/org/jivesoftware/smack/ReconnectionManager.java @@ -184,13 +184,8 @@ public class ReconnectionManager implements ConnectionListener { protected void notifyReconnectionFailed(Exception exception) { List listenersCopy; if (isReconnectionAllowed()) { - synchronized (connection.packetReader.connectionListeners) { - // Makes a copy since it's possible that a listener will be removed from the list - listenersCopy = new ArrayList( - connection.packetReader.connectionListeners); - for (ConnectionListener listener : listenersCopy) { - listener.reconnectionFailed(exception); - } + for (ConnectionListener listener : connection.packetReader.connectionListeners) { + listener.reconnectionFailed(exception); } } } @@ -201,15 +196,9 @@ public class ReconnectionManager implements ConnectionListener { * @param seconds the number of seconds that a reconnection will be attempted in. */ protected void notifyAttemptToReconnectIn(int seconds) { - List listenersCopy; if (isReconnectionAllowed()) { - synchronized (connection.packetReader.connectionListeners) { - // Makes a copy since it's possible that a listener will be removed from the list - listenersCopy = new ArrayList( - connection.packetReader.connectionListeners); - for (ConnectionListener listener : listenersCopy) { - listener.reconnectingIn(seconds); - } + for (ConnectionListener listener : connection.packetReader.connectionListeners) { + listener.reconnectingIn(seconds); } } } diff --git a/source/org/jivesoftware/smack/XMPPConnection.java b/source/org/jivesoftware/smack/XMPPConnection.java index 1bbf02815..5221e3c62 100644 --- a/source/org/jivesoftware/smack/XMPPConnection.java +++ b/source/org/jivesoftware/smack/XMPPConnection.java @@ -744,10 +744,8 @@ public class XMPPConnection { if (connectionListener == null) { return; } - synchronized (packetReader.connectionListeners) { - if (!packetReader.connectionListeners.contains(connectionListener)) { - packetReader.connectionListeners.add(connectionListener); - } + if (!packetReader.connectionListeners.contains(connectionListener)) { + packetReader.connectionListeners.add(connectionListener); } } @@ -757,9 +755,7 @@ public class XMPPConnection { * @param connectionListener a connection listener. */ public void removeConnectionListener(ConnectionListener connectionListener) { - synchronized (packetReader.connectionListeners) { - packetReader.connectionListeners.remove(connectionListener); - } + packetReader.connectionListeners.remove(connectionListener); } /**