package org.mercury_im.messenger.core.stores; import org.jivesoftware.smack.chat2.Chat; import org.jivesoftware.smack.chat2.ChatManager; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smackx.carbons.CarbonManager; import org.jivesoftware.smackx.carbons.packet.CarbonExtension; import org.jivesoftware.smackx.delay.DelayInformationManager; import org.jivesoftware.smackx.delay.packet.DelayInformation; import org.jivesoftware.smackx.mam.MamManager; import org.jivesoftware.smackx.sid.element.OriginIdElement; import org.jivesoftware.smackx.sid.element.StanzaIdElement; import org.jxmpp.jid.EntityBareJid; import org.jxmpp.jid.impl.JidCreate; import org.mercury_im.messenger.core.NotificationManager; import org.mercury_im.messenger.core.connection.MercuryConnection; import org.mercury_im.messenger.xmpp.model.ChatModel; import org.mercury_im.messenger.xmpp.model.ContactModel; import org.mercury_im.messenger.xmpp.model.EntityModel; import org.mercury_im.messenger.xmpp.model.LastChatMessageRelation; import org.mercury_im.messenger.xmpp.model.MessageModel; import org.mercury_im.messenger.xmpp.repository.ChatRepository; import org.mercury_im.messenger.xmpp.repository.MessageRepository; import org.mercury_im.messenger.xmpp.repository.RosterRepository; import org.mercury_im.messenger.xmpp.util.ChatAndPossiblyContact; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.Completable; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.schedulers.Schedulers; public class PlainMessageStore { private static final Logger LOGGER = Logger.getLogger(PlainMessageStore.class.getName()); private static final CompositeDisposable disposable = new CompositeDisposable(); private final RosterRepository rosterRepository; private final ChatRepository chatRepository; private final MessageRepository messageRepository; private final NotificationManager notificationManager; public PlainMessageStore(RosterRepository rosterRepository, ChatRepository chatRepository, MessageRepository messageRepository, NotificationManager notificationManager) { this.rosterRepository = rosterRepository; this.chatRepository = chatRepository; this.messageRepository = messageRepository; this.notificationManager = notificationManager; } public void handleIncomingMessage(long accountId, EntityBareJid from, Message message, Chat chat) { if (message.getBody() == null) { return; } Completable.fromAction(() -> { EntityModel entityModel = rosterRepository.getOrCreateEntity(accountId, from) .blockingGet(); ContactModel contactModel = rosterRepository.getContact(accountId, entityModel.getJid()).blockingFirst().firstOrNull(); ChatModel chatModel = chatRepository.getChatWith(entityModel).blockingFirst().firstOr(() -> { ChatModel freshChatModel = new ChatModel(); freshChatModel.setPeer(entityModel); freshChatModel.setDisplayed(true); return freshChatModel; }); chatModel = chatRepository.upsert(chatModel).blockingGet(); MessageModel messageModel = setCommonMessageAttributes(message, chatModel); messageModel.setSender(from); messageModel.setIncoming(true); final ChatModel fChatModel = chatModel; disposable.add(messageRepository.insert(messageModel) .subscribe(insertedMessageModel -> { if (message.getBody() != null) { notificationManager.chatMessageReceived(new ChatAndPossiblyContact(fChatModel, contactModel), message.getBody()); } LastChatMessageRelation lastMessage = new LastChatMessageRelation(); lastMessage.setChat(fChatModel); lastMessage.setMessage(insertedMessageModel); })); }).subscribeOn(Schedulers.io()) .subscribe(); } public void handleOutgoingMessage(long accountId, EntityBareJid to, Message message, Chat chat) { MessageModel model = setCommonMessageAttributes(message, null); EntityModel entityModel = rosterRepository.getOrCreateEntity(accountId, to).blockingGet(); model.setIncoming(false); model.setTimestamp(new Date()); model.setSender(entityModel.getAccount().getJid()); model.setRecipient(to); ChatModel chatModel = chatRepository.getChatWith(entityModel).blockingFirst().firstOr(() -> { ChatModel freshChatModel = new ChatModel(); freshChatModel.setPeer(entityModel); freshChatModel.setDisplayed(true); return freshChatModel; }); model.setChat(chatModel); disposable.add(messageRepository.upsert(model) .subscribe(messageId -> LOGGER.log(Level.INFO, "Inserted outgoing message " + messageId))); } public void handleCarbonCopy(long accountId, CarbonExtension.Direction direction, Message carbonCopy, Message wrappingMessage) { if (carbonCopy.getBody() == null) { return; } MessageModel messageModel = new MessageModel(); messageModel.setSender(carbonCopy.getFrom() != null ? carbonCopy.getFrom().asEntityBareJidIfPossible() : null); messageModel.setRecipient(carbonCopy.getTo() != null ? carbonCopy.getTo().asEntityBareJidIfPossible() : null); messageModel.setIncoming(direction == CarbonExtension.Direction.received); messageModel.setBody(carbonCopy.getBody()); messageModel.setTimestamp(new Date()); disposable.add(messageRepository.upsert(messageModel) .subscribe(messageId -> LOGGER.log(Level.INFO, "Inserted carbon message " + messageId))); } public void registerForMercuryConnection(MercuryConnection connection) { ChatManager chatManager = ChatManager.getInstanceFor(connection.getConnection()); CarbonManager carbonManager = CarbonManager.getInstanceFor(connection.getConnection()); // Add account ID to chatManager.addIncomingListener((from, message, chat) -> PlainMessageStore.this.handleIncomingMessage( connection.getAccountId(), from, message, chat)); chatManager.addOutgoingListener((to, message, chat) -> PlainMessageStore.this.handleOutgoingMessage( connection.getAccountId(), to, message, chat)); carbonManager.addCarbonCopyReceivedListener((direction, carbonCopy, wrappingMessage) -> PlainMessageStore.this.handleCarbonCopy( connection.getAccountId(), direction, carbonCopy, wrappingMessage)); } public void dispose() { disposable.clear(); } public void handleMamResult(long accountId, EntityBareJid peerJid, MamManager.MamQuery query) { List messageModels = new ArrayList<>(); for (Message message : query.getMessages()) { Date date = new Date(); DelayInformation delay = DelayInformation.from(message); if (delay != null) { date = delay.getStamp(); } MessageModel messageModel = new MessageModel(); messageModel.setBody(message.getBody()); messageModel.setSender(message.getFrom().asEntityBareJidOrThrow()); messageModel.setRecipient(message.getTo().asEntityBareJidOrThrow()); messageModel.setIncoming(peerJid.equals(message.getFrom().asEntityBareJidOrThrow())); messageModel.setTimestamp(date); messageModels.add(messageModel); } disposable.add(messageRepository.upsert(messageModels).subscribe()); } private MessageModel incomingMessageToModel(Message message, ChatModel chat) { MessageModel model = setCommonMessageAttributes(message, chat); model.setIncoming(true); return model; } private MessageModel setCommonMessageAttributes(Message message, ChatModel chat) { MessageModel model = new MessageModel(); model.setBody(message.getBody()); Date timestamp = DelayInformationManager.getDelayTimestamp(message); model.setTimestamp(timestamp == null ? new Date() : timestamp); model.setThread(message.getThread()); model.setLegacyId(message.getStanzaId()); model.setChat(chat); model.setRecipient(message.getTo().asEntityBareJidOrThrow()); model.setSender(message.getFrom() != null ? message.getFrom().asEntityBareJidIfPossible() : null); OriginIdElement originId = OriginIdElement.getOriginId(message); model.setOriginId(originId != null ? originId.getId() : null); StanzaIdElement stanzaId = StanzaIdElement.getStanzaId(message); model.setStanzaId(stanzaId != null ? stanzaId.getId() : null); model.setStanzaIdBy(stanzaId != null ? JidCreate.entityBareFromOrThrowUnchecked(stanzaId.getBy()) : null); return model; } }