Switched from volatile collection to copy on write array. Fixes concurrency bugs and leaking resources, but may have performance ramifications.

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7159 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
Matt Tucker 2007-02-16 00:12:17 +00:00 committed by matt
parent 4cfbf00e48
commit 7c847dad6a
3 changed files with 41 additions and 175 deletions

View File

@ -33,6 +33,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Listens for XML traffic from the XMPP server and parses it into packet objects.
@ -50,11 +51,10 @@ class PacketReader {
private XMPPConnection connection;
private XmlPullParser parser;
private boolean done;
private final VolatileMemberCollection<PacketCollector> collectors =
new VolatileMemberCollection<PacketCollector>(50);
protected final VolatileMemberCollection listeners = new VolatileMemberCollection(50);
private List<PacketCollector> collectors = new CopyOnWriteArrayList<PacketCollector>();
protected final List<ListenerWrapper> listeners = new CopyOnWriteArrayList<ListenerWrapper>();
protected final List<ConnectionListener> connectionListeners =
new ArrayList<ConnectionListener>();
new CopyOnWriteArrayList<ConnectionListener>();
private String connectionID = null;
private Semaphore connectionSemaphore;
@ -122,11 +122,8 @@ class PacketReader {
* @param packetFilter the packet filter to use.
*/
public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
ListenerWrapper wrapper = new ListenerWrapper(this, packetListener,
packetFilter);
synchronized (listeners) {
listeners.add(wrapper);
}
ListenerWrapper wrapper = new ListenerWrapper(this, packetListener, packetFilter);
listeners.add(wrapper);
}
/**
@ -135,7 +132,15 @@ class PacketReader {
* @param packetListener the packet listener to remove.
*/
public void removePacketListener(PacketListener packetListener) {
listeners.remove(packetListener);
// Find the index of the wrapper in the list of listeners. This operation will
// work because of a special equals() implementation in the ListenerWrapper class.
int index = listeners.indexOf(packetListener);
if (index == -1) {
return;
}
ListenerWrapper wrapper = listeners.remove(index);
// Cancel the wrapper since it's been removed.
wrapper.cancel();
}
/**
@ -180,19 +185,14 @@ class PacketReader {
public void shutdown() {
// Notify connection listeners of the connection closing if done hasn't already been set.
if (!done) {
List<ConnectionListener> listenersCopy;
synchronized (connectionListeners) {
// Make a copy since it's possible that a listener will be removed from the list
listenersCopy = new ArrayList<ConnectionListener>(connectionListeners);
for (ConnectionListener listener : listenersCopy) {
try {
listener.connectionClosed();
}
catch (Exception e) {
// Cath and print any exception so we can recover
// from a faulty listener and finish the shutdown process
e.printStackTrace();
}
for (ConnectionListener listener : connectionListeners) {
try {
listener.connectionClosed();
}
catch (Exception e) {
// Cath and print any exception so we can recover
// from a faulty listener and finish the shutdown process
e.printStackTrace();
}
}
}
@ -217,19 +217,14 @@ class PacketReader {
// Print the stack trace to help catch the problem
e.printStackTrace();
// Notify connection listeners of the error.
List<ConnectionListener> listenersCopy;
synchronized (connectionListeners) {
// Make a copy since it's possible that a listener will be removed from the list
listenersCopy = new ArrayList<ConnectionListener>(connectionListeners);
for (ConnectionListener listener : listenersCopy) {
try {
listener.connectionClosedOnError(e);
}
catch (Exception e2) {
// Cath and print any exception so we can recover
// from a faulty listener
e2.printStackTrace();
}
for (ConnectionListener listener : connectionListeners) {
try {
listener.connectionClosedOnError(e);
}
catch (Exception e2) {
// Cath and print any exception so we can recover
// from a faulty listener
e2.printStackTrace();
}
}
@ -290,9 +285,7 @@ class PacketReader {
private void processListeners(Thread thread) {
while (!done && thread == listenerThread) {
boolean processedPacket = false;
Iterator it = listeners.getIterator();
while (it.hasNext()) {
ListenerWrapper wrapper = (ListenerWrapper) it.next();
for (ListenerWrapper wrapper: listeners) {
processedPacket = processedPacket || wrapper.notifyListener();
}
if (!processedPacket) {
@ -454,9 +447,7 @@ class PacketReader {
}
// Loop through all collectors and notify the appropriate ones.
Iterator it = collectors.getIterator();
while (it.hasNext()) {
PacketCollector collector = (PacketCollector) it.next();
for (PacketCollector collector: collectors) {
collector.processPacket(packet);
}
@ -837,117 +828,6 @@ class PacketReader {
return bind;
}
/**
* When an object is added it the first attempt is to add it to a 'null' space and when it is
* removed it is not removed from the list but instead the position is nulled so as not to
* interfere with list iteration as the Collection memebres are thought to be extermely
* volatile. In other words, many are added and deleted and 'null' values are skipped by the
* returned iterator.
*/
static class VolatileMemberCollection<E> {
private final Object mutex = new Object();
private final ArrayList<E> collectors;
private int nullIndex = -1;
private int[] nullArray;
VolatileMemberCollection(int initialCapacity) {
collectors = new ArrayList<E>(initialCapacity);
nullArray = new int[initialCapacity];
}
public void add(E member) {
synchronized (mutex) {
if (nullIndex < 0) {
ensureCapacity();
collectors.add(member);
}
else {
collectors.set(nullArray[nullIndex--], member);
}
}
}
private void ensureCapacity() {
int current = nullArray.length;
if (collectors.size() + 1 >= current) {
int newCapacity = current * 2;
int oldData[] = nullArray;
collectors.ensureCapacity(newCapacity);
nullArray = new int[newCapacity];
System.arraycopy(oldData, 0, nullArray, 0, nullIndex + 1);
}
}
public void remove(E member) {
synchronized (mutex) {
for (int i = collectors.size()-1; i >= 0; i--) {
E element = collectors.get(i);
if (element != null && element.equals(member)) {
collectors.set(i, null);
nullArray[++nullIndex] = i;
return;
}
}
}
}
/**
* One thread should be using an iterator at a time.
*
* @return Iterator over PacketCollector.
*/
public Iterator getIterator() {
return new Iterator() {
private int index = 0;
private Object next;
private int size = collectors.size();
public void remove() {
}
public boolean hasNext() {
return next != null || grabNext() != null;
}
private Object grabNext() {
Object next;
while (index < size) {
next = collectors.get(index++);
if (next != null) {
this.next = next;
return next;
}
}
this.next = null;
return null;
}
public Object next() {
Object toReturn = (this.next != null ? this.next : grabNext());
this.next = null;
return toReturn;
}
};
}
/**
* Returns the number of elements in this collection.
*
* @return the number of elements in this collection
*/
public int size() {
int size = 0;
for (E element : collectors) {
if (element != null) {
size++;
}
}
return size;
}
}
/**
* A wrapper class to associate a packet collector with a listener.
*/
@ -970,6 +850,7 @@ class PacketReader {
if (object instanceof ListenerWrapper) {
return ((ListenerWrapper)object).packetListener.equals(this.packetListener);
}
// If the packet listener is equal to the wrapped packet listener, return true.
else if (object instanceof PacketListener) {
return object.equals(this.packetListener);
}

View File

@ -184,13 +184,8 @@ public class ReconnectionManager implements ConnectionListener {
protected void notifyReconnectionFailed(Exception exception) {
List<ConnectionListener> listenersCopy;
if (isReconnectionAllowed()) {
synchronized (connection.packetReader.connectionListeners) {
// Makes a copy since it's possible that a listener will be removed from the list
listenersCopy = new ArrayList<ConnectionListener>(
connection.packetReader.connectionListeners);
for (ConnectionListener listener : listenersCopy) {
listener.reconnectionFailed(exception);
}
for (ConnectionListener listener : connection.packetReader.connectionListeners) {
listener.reconnectionFailed(exception);
}
}
}
@ -201,15 +196,9 @@ public class ReconnectionManager implements ConnectionListener {
* @param seconds the number of seconds that a reconnection will be attempted in.
*/
protected void notifyAttemptToReconnectIn(int seconds) {
List<ConnectionListener> listenersCopy;
if (isReconnectionAllowed()) {
synchronized (connection.packetReader.connectionListeners) {
// Makes a copy since it's possible that a listener will be removed from the list
listenersCopy = new ArrayList<ConnectionListener>(
connection.packetReader.connectionListeners);
for (ConnectionListener listener : listenersCopy) {
listener.reconnectingIn(seconds);
}
for (ConnectionListener listener : connection.packetReader.connectionListeners) {
listener.reconnectingIn(seconds);
}
}
}

View File

@ -744,10 +744,8 @@ public class XMPPConnection {
if (connectionListener == null) {
return;
}
synchronized (packetReader.connectionListeners) {
if (!packetReader.connectionListeners.contains(connectionListener)) {
packetReader.connectionListeners.add(connectionListener);
}
if (!packetReader.connectionListeners.contains(connectionListener)) {
packetReader.connectionListeners.add(connectionListener);
}
}
@ -757,9 +755,7 @@ public class XMPPConnection {
* @param connectionListener a connection listener.
*/
public void removeConnectionListener(ConnectionListener connectionListener) {
synchronized (packetReader.connectionListeners) {
packetReader.connectionListeners.remove(connectionListener);
}
packetReader.connectionListeners.remove(connectionListener);
}
/**