diff --git a/domain/src/main/java/org/mercury_im/messenger/Messenger.java b/domain/src/main/java/org/mercury_im/messenger/Messenger.java index 427a156..a3cb79d 100644 --- a/domain/src/main/java/org/mercury_im/messenger/Messenger.java +++ b/domain/src/main/java/org/mercury_im/messenger/Messenger.java @@ -1,15 +1,31 @@ package org.mercury_im.messenger; +import org.mercury_im.messenger.data.repository.AccountRepository; +import org.mercury_im.messenger.data.repository.PeerRepository; +import org.mercury_im.messenger.entity.contact.Peer; import org.mercury_im.messenger.transport.connection.ConnectionMethod; import org.mercury_im.messenger.entity.Account; import java.util.HashMap; import java.util.Map; +import javax.inject.Inject; + public class Messenger { private final Map connections = new HashMap<>(); + @Inject + AccountRepository accountRepository; + + @Inject + PeerRepository peerRepository; + + @Inject + public Messenger() { + + } + public void addConnection(ConnectionMethod connection) { connections.put(connection.getAccount().getId(), connection); } diff --git a/domain/src/main/java/org/mercury_im/messenger/data/repository/DirectChatRepository.java b/domain/src/main/java/org/mercury_im/messenger/data/repository/DirectChatRepository.java index 5b04a74..8dbaaf2 100644 --- a/domain/src/main/java/org/mercury_im/messenger/data/repository/DirectChatRepository.java +++ b/domain/src/main/java/org/mercury_im/messenger/data/repository/DirectChatRepository.java @@ -1,9 +1,9 @@ package org.mercury_im.messenger.data.repository; +import org.mercury_im.messenger.data.util.Optional; import org.mercury_im.messenger.entity.chat.DirectChat; import org.mercury_im.messenger.entity.contact.Peer; import org.mercury_im.messenger.entity.message.Message; -import org.mercury_im.messenger.data.util.Optional; import java.util.List; @@ -20,6 +20,8 @@ public interface DirectChatRepository { Maybe getDirectChat(long chatId); + Single getOrCreateChatWithPeer(Peer peer); + Observable> observeDirectChatByPeer(Peer peer); Maybe getDirectChatByPeer(Peer peer); diff --git a/xmpp/src/main/java/org/mercury_im/messenger/domain/xmpp/XmppDirectMessageCenter.java b/xmpp/src/main/java/org/mercury_im/messenger/domain/xmpp/XmppDirectMessageCenter.java index abaa683..4ff920c 100644 --- a/xmpp/src/main/java/org/mercury_im/messenger/domain/xmpp/XmppDirectMessageCenter.java +++ b/xmpp/src/main/java/org/mercury_im/messenger/domain/xmpp/XmppDirectMessageCenter.java @@ -8,8 +8,12 @@ import org.jivesoftware.smackx.sid.element.OriginIdElement; import org.jxmpp.jid.EntityBareJid; import org.jxmpp.jid.impl.JidCreate; import org.mercury_im.messenger.Messenger; +import org.mercury_im.messenger.data.repository.AccountRepository; +import org.mercury_im.messenger.data.repository.DirectChatRepository; +import org.mercury_im.messenger.data.repository.PeerRepository; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.contact.Peer; +import org.mercury_im.messenger.entity.message.IMessage; import org.mercury_im.messenger.transport.listener.IncomingDirectMessageListener; import org.mercury_im.messenger.entity.message.Message; import org.mercury_im.messenger.MessageCenter; @@ -22,23 +26,41 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; +import javax.inject.Inject; + import io.reactivex.Completable; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.disposables.Disposable; import sun.reflect.generics.tree.Tree; public class XmppDirectMessageCenter - implements MessageCenter { + implements MessageCenter, IncomingChatMessageListener { + + @Inject + PeerRepository peerRepository; + + @Inject + AccountRepository accountRepository; + + @Inject + DirectChatRepository directChatRepository; private final Messenger messenger; private final Account account; + private final CompositeDisposable disposable = new CompositeDisposable(); + private Set messageListeners = new LinkedHashSet<>(); + @Inject public XmppDirectMessageCenter(Account account, Messenger messenger) { this.messenger = messenger; this.account = account; + XMPPConnection connection = ((XmppTcpConnectionMethod) getMessenger() .getConnection(account)).getConnection(); + ChatManager.getInstanceFor(connection).addIncomingListener(this); } @Override @@ -77,4 +99,19 @@ public class XmppDirectMessageCenter XmppTcpConnectionMethod connectionMethod = (XmppTcpConnectionMethod) getMessenger().getConnection(chat.getAccount()); return ChatManager.getInstanceFor(connectionMethod.getConnection()); } + + @Override + public void newIncomingMessage(EntityBareJid from, org.jivesoftware.smack.packet.Message message, Chat chat) { + disposable.add(peerRepository + // get peer + .getOrCreatePeer(account, from.asEntityBareJidString()) + // get chat + .flatMap(peer -> directChatRepository.getOrCreateChatWithPeer(peer)) + // notify listeners + .subscribe(chatEntity -> { + for (IncomingDirectMessageListener listener : messageListeners) { + listener.onIncomingDirectMessage(account, chatEntity, new IMessage()); + } + })); + } }