package org.mercury_im.messenger.xmpp; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.chat2.Chat; import org.jivesoftware.smack.chat2.ChatManager; import org.jivesoftware.smack.chat2.IncomingChatMessageListener; import org.jivesoftware.smackx.sid.element.OriginIdElement; import org.jxmpp.jid.EntityBareJid; import org.jxmpp.jid.impl.JidCreate; import org.mercury_im.messenger.MessageCenter; import org.mercury_im.messenger.Messenger; import org.mercury_im.messenger.data.repository.AccountRepository; import org.mercury_im.messenger.data.repository.DirectChatRepository; import org.mercury_im.messenger.data.repository.MessageRepository; import org.mercury_im.messenger.data.repository.PeerRepository; import org.mercury_im.messenger.data.repository.Repositories; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.chat.DirectChat; import org.mercury_im.messenger.entity.message.IMessage; import org.mercury_im.messenger.entity.message.IMessageMetadata; import org.mercury_im.messenger.entity.message.Message; import org.mercury_im.messenger.listener.IncomingDirectMessageListener; import java.util.LinkedHashSet; import java.util.Set; import javax.inject.Inject; import io.reactivex.Completable; import io.reactivex.disposables.CompositeDisposable; public class XmppDirectMessageCenter implements MessageCenter, IncomingChatMessageListener { private final PeerRepository peerRepository; private final AccountRepository accountRepository; private final DirectChatRepository directChatRepository; private final MessageRepository messageRepository; private final Messenger messenger; private final Account account; private final CompositeDisposable disposable = new CompositeDisposable(); private Set messageListeners = new LinkedHashSet<>(); @Inject public XmppDirectMessageCenter(Account account, Messenger messenger, Repositories repositories) { this.messenger = messenger; this.account = account; this.peerRepository = repositories.getPeerRepository(); this.accountRepository = repositories.getAccountRepository(); this.directChatRepository = repositories.getDirectChatRepository(); this.messageRepository = repositories.getMessageRepository(); XMPPConnection connection = getMessenger().getConnection(account).getConnection(); ChatManager.getInstanceFor(connection).addIncomingListener(this); } @Override public Messenger getMessenger() { return messenger; } @Override public Completable sendMessage(Message message, DirectChat chat) { ChatManager chatManager = getChatManager(chat); EntityBareJid accountAddress = JidCreate.entityBareFromOrThrowUnchecked(chat.getAccount().getAddress()); EntityBareJid peerAddress = JidCreate.entityBareFromOrThrowUnchecked(chat.getPeer().getAddress()); org.jivesoftware.smack.packet.Message smackMessage = new org.jivesoftware.smack.packet.Message(); smackMessage.setFrom(accountAddress); smackMessage.setTo(peerAddress); smackMessage.setType(org.jivesoftware.smack.packet.Message.Type.chat); String originId = OriginIdElement.addOriginId(smackMessage).getId(); String legacyStanzaId = smackMessage.getStanzaId(); IMessageMetadata metadata = new IMessageMetadata(); metadata.setLegacyStanzaId(legacyStanzaId); metadata.setOriginId(originId); message.setMetadata(metadata); Chat smackChat = chatManager.chatWith(peerAddress); return messageRepository.insertMessage(chat, message) .ignoreElement() .andThen(Completable.fromAction(() -> smackChat.send(smackMessage))); } @Override public void addIncomingMessageListener(IncomingDirectMessageListener listener) { messageListeners.add(listener); } protected ChatManager getChatManager(DirectChat chat) { MercuryConnection mercuryConnection = getMessenger().getConnection(chat.getAccount()); return ChatManager.getInstanceFor(mercuryConnection.getConnection()); } @Override public void newIncomingMessage(EntityBareJid from, org.jivesoftware.smack.packet.Message message, Chat chat) { disposable.add(peerRepository // get peer .getOrCreatePeer(account, from.asEntityBareJidString()) // get chat .flatMap(peer -> directChatRepository.getOrCreateChatWithPeer(peer)) // notify listeners .subscribe(chatEntity -> { for (IncomingDirectMessageListener listener : messageListeners) { listener.onIncomingDirectMessage(account, chatEntity, new IMessage()); } })); } }