package org.mercury_im.messenger.core.xmpp; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.chat2.Chat; import org.jivesoftware.smack.chat2.ChatManager; import org.jivesoftware.smack.chat2.IncomingChatMessageListener; import org.jivesoftware.smack.packet.MessageBuilder; import org.jivesoftware.smackx.sid.element.OriginIdElement; import org.jxmpp.jid.EntityBareJid; import org.jxmpp.jid.impl.JidCreate; import org.mercury_im.messenger.core.MessageCenter; import org.mercury_im.messenger.core.Messenger; import org.mercury_im.messenger.core.data.repository.AccountRepository; import org.mercury_im.messenger.core.data.repository.DirectChatRepository; import org.mercury_im.messenger.core.data.repository.MessageRepository; import org.mercury_im.messenger.core.data.repository.PeerRepository; import org.mercury_im.messenger.core.data.repository.Repositories; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.chat.DirectChat; import org.mercury_im.messenger.entity.message.Message; import org.mercury_im.messenger.core.listener.IncomingDirectMessageListener; import java.util.LinkedHashSet; import java.util.Set; import javax.inject.Inject; import io.reactivex.Completable; import io.reactivex.disposables.CompositeDisposable; public class XmppDirectMessageCenter implements MessageCenter, IncomingChatMessageListener { private final PeerRepository peerRepository; private final AccountRepository accountRepository; private final DirectChatRepository directChatRepository; private final MessageRepository messageRepository; private final Messenger messenger; private final Account account; private final CompositeDisposable disposable = new CompositeDisposable(); private Set messageListeners = new LinkedHashSet<>(); @Inject public XmppDirectMessageCenter(Account account, Messenger messenger, Repositories repositories) { this.messenger = messenger; this.account = account; this.peerRepository = repositories.getPeerRepository(); this.accountRepository = repositories.getAccountRepository(); this.directChatRepository = repositories.getDirectChatRepository(); this.messageRepository = repositories.getMessageRepository(); XMPPConnection connection = getMessenger().getConnectionManager().getConnection(account).getConnection(); ChatManager.getInstanceFor(connection).addIncomingListener(this); } @Override public Messenger getMessenger() { return messenger; } @Override public Completable sendMessage(Message message, DirectChat chat) { ChatManager chatManager = getChatManager(chat); EntityBareJid accountAddress = JidCreate.entityBareFromOrThrowUnchecked(chat.getAccount().getAddress()); EntityBareJid peerAddress = JidCreate.entityBareFromOrThrowUnchecked(chat.getPeer().getAddress()); MessageBuilder messageBuilder = MessageBuilder.buildMessage() .from(accountAddress) .to(peerAddress) .ofType(org.jivesoftware.smack.packet.Message.Type.chat); String originId = OriginIdElement.addTo(messageBuilder).getId(); String legacyStanzaId = messageBuilder.getStanzaId(); message.setLegacyStanzaId(legacyStanzaId); message.setOriginId(originId); Chat smackChat = chatManager.chatWith(peerAddress); return messageRepository.insertMessage(chat, message) .ignoreElement() .andThen(Completable.fromAction(() -> smackChat.send(messageBuilder.build()))); } @Override public void addIncomingMessageListener(IncomingDirectMessageListener listener) { messageListeners.add(listener); } protected ChatManager getChatManager(DirectChat chat) { MercuryConnection mercuryConnection = getMessenger().getConnectionManager().getConnection(chat.getAccount()); return ChatManager.getInstanceFor(mercuryConnection.getConnection()); } @Override public void newIncomingMessage(EntityBareJid from, org.jivesoftware.smack.packet.Message message, Chat chat) { disposable.add(peerRepository // get peer .getOrCreatePeer(account, from) // get chat .flatMap(directChatRepository::getOrCreateChatWithPeer) // notify listeners .subscribe(chatEntity -> { for (IncomingDirectMessageListener listener : messageListeners) { listener.onIncomingDirectMessage(account, chatEntity, new Message()); } })); } }