package org.mercury_im.messenger.core.stores; import org.jivesoftware.smack.chat2.Chat; import org.jivesoftware.smack.chat2.ChatManager; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smackx.carbons.CarbonManager; import org.jivesoftware.smackx.carbons.packet.CarbonExtension; import org.jivesoftware.smackx.delay.packet.DelayInformation; import org.jivesoftware.smackx.mam.MamManager; import org.jxmpp.jid.EntityBareJid; import org.mercury_im.messenger.core.NotificationManager; import org.mercury_im.messenger.core.connection.MercuryConnection; import org.mercury_im.messenger.persistence.model.MessageModel; import org.mercury_im.messenger.persistence.repository.MessageRepository; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.functions.Consumer; import io.reactivex.schedulers.Schedulers; public class PlainMessageStore { private static final Logger LOGGER = Logger.getLogger(PlainMessageStore.class.getName()); private static final CompositeDisposable disposable = new CompositeDisposable(); private final MessageRepository messageRepository; private final NotificationManager notificationManager; public PlainMessageStore(MessageRepository messageRepository, NotificationManager notificationManager) { this.messageRepository = messageRepository; this.notificationManager = notificationManager; } public void newIncomingMessage(long accountId, EntityBareJid from, Message message, Chat chat) { if (message.getBody() == null) { return; } org.mercury_im.messenger.persistence.pojo.Chat chatPojo = new org.mercury_im.messenger.persistence.pojo.Chat(); chatPojo.jid = from; chatPojo.accountId = accountId; chatPojo.contactName = null; notificationManager.chatMessageReceived(chatPojo, null, message.getBody()); MessageModel messageModel = messageRepository.newMessageModel(); messageModel.setAccountId(accountId); messageModel.setFrom(chat.getXmppAddressOfChatPartner()); messageModel.setTo(message.getTo().asEntityBareJidIfPossible()); messageModel.setIncoming(true); messageModel.setBody(message.getBody()); messageModel.setSendDate(new Date()); disposable.add( messageRepository.insertMessage(messageModel) .subscribeOn(Schedulers.io()) .subscribe(messageId -> LOGGER.log(Level.INFO, "Inserted incoming message " + messageId))); } public void newOutgoingMessage(long accountId, EntityBareJid to, Message message, Chat chat) { MessageModel messageModel = messageRepository.newMessageModel(); messageModel.setAccountId(accountId); messageModel.setFrom(message.getFrom() != null ? message.getFrom().asEntityBareJidIfPossible() : null); messageModel.setTo(chat.getXmppAddressOfChatPartner()); messageModel.setIncoming(false); messageModel.setBody(message.getBody()); messageModel.setSendDate(new Date()); disposable.add( messageRepository.insertMessage(messageModel) .subscribeOn(Schedulers.io()) .subscribe(messageId -> LOGGER.log(Level.INFO, "Inserted outgoing message " + messageId))); } public void onCarbonCopyReceived(long accountId, CarbonExtension.Direction direction, Message carbonCopy, Message wrappingMessage) { if (carbonCopy.getBody() == null) { return; } MessageModel messageModel = messageRepository.newMessageModel(); messageModel.setAccountId(accountId); messageModel.setFrom(carbonCopy.getFrom() != null ? carbonCopy.getFrom().asEntityBareJidIfPossible() : null); messageModel.setTo(carbonCopy.getTo() != null ? carbonCopy.getTo().asEntityBareJidIfPossible() : null); messageModel.setIncoming(direction == CarbonExtension.Direction.received); messageModel.setBody(carbonCopy.getBody()); messageModel.setSendDate(new Date()); disposable.add( messageRepository.insertMessage(messageModel) .subscribeOn(Schedulers.io()) .subscribe(messageId -> LOGGER.log(Level.INFO, "Inserted carbon message " + messageId))); } public void registerForMercuryConnection(MercuryConnection connection) { ChatManager chatManager = ChatManager.getInstanceFor(connection.getConnection()); CarbonManager carbonManager = CarbonManager.getInstanceFor(connection.getConnection()); // Add account ID to chatManager.addIncomingListener((from, message, chat) -> PlainMessageStore.this.newIncomingMessage( connection.getAccountId(), from, message, chat)); chatManager.addOutgoingListener((to, message, chat) -> PlainMessageStore.this.newOutgoingMessage( connection.getAccountId(), to, message, chat)); carbonManager.addCarbonCopyReceivedListener((direction, carbonCopy, wrappingMessage) -> PlainMessageStore.this.onCarbonCopyReceived( connection.getAccountId(), direction, carbonCopy, wrappingMessage)); } public void dispose() { disposable.clear(); } public void onMamResult(long accountId, EntityBareJid peerJid, MamManager.MamQuery query) { List messageModels = new ArrayList<>(); for (Message message : query.getMessages()) { Date date = new Date(); DelayInformation delay = DelayInformation.from(message); if (delay != null) { date = delay.getStamp(); } MessageModel messageModel = messageRepository.newMessageModel(); messageModel.setAccountId(accountId); messageModel.setBody(message.getBody()); messageModel.setFrom(message.getFrom().asEntityBareJidOrThrow()); messageModel.setTo(message.getTo().asEntityBareJidOrThrow()); messageModel.setIncoming(peerJid.equals(message.getFrom().asEntityBareJidOrThrow())); messageModel.setSendDate(date); messageModels.add(messageModel); } disposable.add( messageRepository.insertMessages(messageModels) .subscribeOn(Schedulers.io()) .subscribe()); } }