New logic for delivering messages without a thread ID to a Chat object. This improves compatibility with clients that don't support thread ID. Also some misc formatting updates.

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@2779 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
Matt Tucker 2005-09-05 20:00:45 +00:00 committed by matt
parent afd7e6f2d6
commit 06b7a0eacc
6 changed files with 148 additions and 93 deletions

View File

@ -24,15 +24,16 @@ import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.filter.*;
import java.util.*;
import java.lang.ref.WeakReference;
/**
* A chat is a series of messages sent between two users. Each chat can have
* a unique thread ID, which is used to track which messages are part of a particular
* conversation.<p>
*
* In some situations, it is better to have all messages from the other user delivered
* to a Chat rather than just the messages that have a particular thread ID. To
* enable this behavior, call {@link #setFilteredOnThreadID(boolean)} with
* <tt>false</tt> as the parameter.
* A chat is a series of messages sent between two users. Each chat has a unique
* thread ID, which is used to track which messages are part of a particular
* conversation. Some messages are sent without a thread ID, and some clients
* don't send thread IDs at all. Therefore, if a message without a thread ID
* arrives it is routed to the most recently created Chat with the message
* sender.
*
* @see XMPPConnection#createChat(String)
* @author Matt Tucker
@ -43,13 +44,7 @@ public class Chat {
* A prefix helps to make sure that ID's are unique across mutliple instances.
*/
private static String prefix = StringUtils.randomString(5);
/**
* True if only messages that have a matching threadID will be delivered to a Chat. When
* false, any message from the other participant will be delivered to a Chat.
*/
private static boolean filteredOnThreadID = true;
/**
* Keeps track of the current increment, which is appended to the prefix to
* forum a unique ID.
@ -71,6 +66,7 @@ public class Chat {
private String participant;
private PacketFilter messageFilter;
private PacketCollector messageCollector;
private Set listeners = new HashSet();
/**
* Creates a new chat with the specified user.
@ -81,10 +77,6 @@ public class Chat {
public Chat(XMPPConnection connection, String participant) {
// Automatically assign the next chat ID.
this(connection, participant, nextID());
// If not filtering on thread ID, force the thread ID for this Chat to be null.
if (!filteredOnThreadID) {
this.threadID = null;
}
}
/**
@ -99,42 +91,17 @@ public class Chat {
this.participant = participant;
this.threadID = threadID;
if (filteredOnThreadID) {
// Filter the messages whose thread equals Chat's id
messageFilter = new ThreadFilter(threadID);
}
else {
// Filter the messages of type "chat" and sender equals Chat's participant
messageFilter =
new OrFilter(
new AndFilter(
new MessageTypeFilter(Message.Type.CHAT),
new FromContainsFilter(participant)),
new ThreadFilter(threadID));
}
// Register with the map of chats so that messages with no thread ID
// set will be delivered to this Chat.
connection.chats.put(StringUtils.parseBareAddress(participant),
new WeakReference(this));
// Filter the messages whose thread equals Chat's id
messageFilter = new ThreadFilter(threadID);
messageCollector = connection.createPacketCollector(messageFilter);
}
/**
* Returns true if only messages that have a matching threadID will be delivered to Chat
* instances. When false, any message from the other participant will be delivered to Chat instances.
*
* @return true if messages delivered to Chat instances are filtered on thread ID.
*/
public static boolean isFilteredOnThreadID() {
return filteredOnThreadID;
}
/**
* Sets whether only messages that have a matching threadID will be delivered to Chat instances.
* When false, any message from the other participant will be delivered to a Chat instances.
*
* @param value true if messages delivered to Chat instances are filtered on thread ID.
*/
public static void setFilteredOnThreadID(boolean value) {
filteredOnThreadID = value;
}
/**
* Returns the thread id associated with this chat, which corresponds to the
* <tt>thread</tt> field of XMPP messages. This method may return <tt>null</tt>
@ -252,6 +219,41 @@ public class Chat {
*/
public void addMessageListener(PacketListener listener) {
connection.addPacketListener(listener, messageFilter);
// Keep track of the listener so that we can manually deliver extra
// messages to it later if needed.
synchronized (listeners) {
listeners.add(new WeakReference(listener));
}
}
/**
* Delivers a message directly to this chat, which will add the message
* to the collector and deliver it to all listeners registered with the
* Chat. This is used by the XMPPConnection class to deliver messages
* without a thread ID.
*
* @param message the message.
*/
void deliver(Message message) {
// Because the collector and listeners are expecting a thread ID with
// a specific value, set the thread ID on the message even though it
// probably never had one.
message.setThread(threadID);
messageCollector.processPacket(message);
synchronized (listeners) {
for (Iterator i=listeners.iterator(); i.hasNext(); ) {
WeakReference listenerRef = (WeakReference)i.next();
PacketListener listener;
if ((listener = (PacketListener)listenerRef.get()) != null) {
listener.processPacket(message);
}
// If the reference was cleared, remove it from the set.
else {
i.remove();
}
}
}
}
public void finalize() throws Throwable {
@ -261,6 +263,8 @@ public class Chat {
messageCollector.cancel();
}
}
catch (Exception e) {}
catch (Exception e) {
// Ignore.
}
}
}

View File

@ -76,10 +76,7 @@ public class PacketCollector {
*/
public void cancel() {
// If the packet collector has already been cancelled, do nothing.
if (cancelled) {
return;
}
else {
if (!cancelled) {
cancelled = true;
// Remove object from collectors list by setting the value in the
// list at the correct index to null. The collector thread will
@ -130,7 +127,9 @@ public class PacketCollector {
try {
wait();
}
catch (InterruptedException ie) { }
catch (InterruptedException ie) {
// Ignore.
}
}
return (Packet)resultQueue.removeLast();
}
@ -149,7 +148,9 @@ public class PacketCollector {
try {
wait(timeout);
}
catch (InterruptedException ie) { }
catch (InterruptedException ie) {
// Ignore.
}
}
// If still no result, return null.
if (resultQueue.isEmpty()) {

View File

@ -165,7 +165,9 @@ class PacketReader {
}
}
}
catch (InterruptedException ie) { }
catch (InterruptedException ie) {
// Ignore.
}
if (connectionID == null) {
throw new XMPPException("Connection failed. No response from server.");
}
@ -229,7 +231,6 @@ class PacketReader {
* Process listeners.
*/
private void processListeners() {
boolean processedPacket = false;
while (!done) {
synchronized (listeners) {
if (listeners.size() > 0) {
@ -240,7 +241,7 @@ class PacketReader {
}
}
}
processedPacket = false;
boolean processedPacket = false;
int size = listeners.size();
for (int i=0; i<size; i++) {
ListenerWrapper wrapper = (ListenerWrapper)listeners.get(i);
@ -250,9 +251,14 @@ class PacketReader {
}
if (!processedPacket) {
try {
Thread.sleep(100);
// Wait until more packets are ready to be processed.
synchronized (listenerThread) {
listenerThread.wait();
}
}
catch (InterruptedException ie) {
// Ignore.
}
catch (InterruptedException ie) { }
}
}
}
@ -376,6 +382,11 @@ class PacketReader {
collector.processPacket(packet);
}
}
// Notify the listener thread that packets are waiting.
synchronized (listenerThread) {
listenerThread.notifyAll();
}
}
private void parseFeatures(XmlPullParser parser) throws Exception {
@ -584,9 +595,8 @@ class PacketReader {
RosterPacket.ItemType type = RosterPacket.ItemType.fromString(subscription);
item.setItemType(type);
}
if (parser.getName().equals("group")) {
String groupName = parser.nextText();
item.addGroupName(groupName);
if (parser.getName().equals("group") && item!= null) {
item.addGroupName(parser.nextText());
}
}
else if (eventType == XmlPullParser.END_TAG) {

View File

@ -172,8 +172,8 @@ public class SASLAuthentication implements UserAuthentication {
// Send the packet
connection.sendPacket(bindResource);
// Wait up to a certain number of seconds for a response from the server.
Bind response = (Bind) collector
.nextResult(SmackConfiguration.getPacketReplyTimeout());
Bind response = (Bind)collector.nextResult(
SmackConfiguration.getPacketReplyTimeout());
collector.cancel();
if (response == null) {
throw new XMPPException("No response from the server.");

View File

@ -23,6 +23,7 @@ package org.jivesoftware.smack;
import org.jivesoftware.smack.debugger.SmackDebugger;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketIDFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.*;
import org.jivesoftware.smack.util.StringUtils;
@ -31,10 +32,13 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import java.io.*;
import java.lang.reflect.Constructor;
import java.lang.ref.WeakReference;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
/**
* Creates a connection to a XMPP server. A simple use of this API might
@ -77,6 +81,7 @@ public class XMPPConnection {
DEBUG_ENABLED = Boolean.getBoolean("smack.debugEnabled");
}
catch (Exception e) {
// Ignore.
}
// Ensure the SmackConfiguration class is loaded by calling a method in it.
SmackConfiguration.getVersion();
@ -91,6 +96,7 @@ public class XMPPConnection {
String host;
int port;
Socket socket;
/**
* Hostname of the XMPP server. Usually servers use the same service name as the name
* of the server. However, there are some servers like google where host would be
@ -115,6 +121,14 @@ public class XMPPConnection {
Writer writer;
Reader reader;
/**
* A map between JIDs and the most recently created Chat object with that JID.
* Reference to the Chat is stored via a WeakReference so that the map
* does not interfere with garbage collection. The map of chats must be stored
* with each connection.
*/
Map chats = new HashMap();
/**
* Creates a new connection to the specified XMPP server. The default port of 5222 will
* be used. The IP address of the server is assumed to match the service name.
@ -487,7 +501,9 @@ public class XMPPConnection {
}
}
}
catch (InterruptedException ie) { }
catch (InterruptedException ie) {
// Ignore.
}
}
return roster;
}
@ -587,24 +603,26 @@ public class XMPPConnection {
Thread.sleep(150);
}
catch (Exception e) {
// Ignore.
}
// Close down the readers and writers.
if (reader != null)
{
try { reader.close(); } catch (Throwable ignore) { }
reader = null;
}
if (writer != null)
{
try { writer.close(); } catch (Throwable ignore) { }
writer = null;
}
// Close down the readers and writers.
if (reader != null)
{
try { reader.close(); } catch (Throwable ignore) { /* ignore */ }
reader = null;
}
if (writer != null)
{
try { writer.close(); } catch (Throwable ignore) { /* ignore */ }
writer = null;
}
try {
socket.close();
}
catch (Exception e) {
// Ignore.
}
authenticated = false;
connected = false;
@ -770,30 +788,55 @@ public class XMPPConnection {
// Notify that a new connection has been established
connectionEstablished(this);
// Add a listener for all message packets so that we can deliver errant
// messages to the best Chat instance available.
addPacketListener(new PacketListener() {
public void processPacket(Packet packet) {
Message message = (Message)packet;
// Ignore any messages with a thread ID, as they will likely
// already be associated with a Chat. This will miss messages
// with new thread ID values, but we can only assume that a
// listener is registered to deal with this case.
if (message.getThread() == null) {
WeakReference chatRef = (WeakReference)chats.get(
StringUtils.parseBareAddress(message.getFrom()));
if (chatRef != null) {
// Do some extra clean-up if the reference was cleared.
Chat chat;
if ((chat = (Chat)chatRef.get()) == null) {
chats.remove(message.getFrom());
}
else {
chat.deliver(message);
}
}
}
}
}, new PacketTypeFilter(Message.class));
}
catch (XMPPException ex)
{
catch (XMPPException ex) {
// An exception occurred in setting up the connection. Make sure we shut down the
// readers and writers and close the socket.
if (packetWriter != null) {
try { packetWriter.shutdown(); } catch (Throwable ignore) { }
try { packetWriter.shutdown(); } catch (Throwable ignore) { /* ignore */ }
packetWriter = null;
}
if (packetReader != null) {
try { packetReader.shutdown(); } catch (Throwable ignore) { }
try { packetReader.shutdown(); } catch (Throwable ignore) { /* ignore */ }
packetReader = null;
}
if (reader != null) {
try { reader.close(); } catch (Throwable ignore) { }
try { reader.close(); } catch (Throwable ignore) { /* ignore */ }
reader = null;
}
if (writer != null) {
try { writer.close(); } catch (Throwable ignore) { }
try { writer.close(); } catch (Throwable ignore) { /* ignore */}
writer = null;
}
if (socket != null) {
try { socket.close(); } catch (Exception e) { }
try { socket.close(); } catch (Exception e) { /* ignore */ }
socket = null;
}
authenticated = false;
@ -921,7 +964,8 @@ public class XMPPConnection {
try {
writer.write("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"/>");
writer.flush();
} catch (IOException e) {
}
catch (IOException e) {
packetReader.notifyConnectionError(e);
}
}

View File

@ -31,8 +31,6 @@ public class PresencePriorityTest extends SmackTestCase {
* Connection(1) has logged from two different places with different presence priorities.
*/
public void testMessageToHighestPriority() {
boolean wasFiltering = Chat.isFilteredOnThreadID();
Chat.setFilteredOnThreadID(false);
XMPPConnection conn = null;
try {
// User_1 will log in again using another resource
@ -118,8 +116,6 @@ public class PresencePriorityTest extends SmackTestCase {
fail(e.getMessage());
}
finally {
// Restore the previous filtering value so we don't affect other test cases
Chat.setFilteredOnThreadID(wasFiltering);
if (conn != null) {
conn.close();
}