Mercury-IM/core/src/main/java/org/mercury_im/messenger/core/stores/PlainMessageStore.java

136 lines
6.0 KiB
Java
Raw Normal View History

2019-08-25 17:54:03 +02:00
package org.mercury_im.messenger.core.stores;
import org.jivesoftware.smack.chat2.Chat;
import org.jivesoftware.smack.chat2.ChatManager;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smackx.carbons.CarbonManager;
import org.jivesoftware.smackx.carbons.packet.CarbonExtension;
2019-09-08 04:47:59 +02:00
import org.jivesoftware.smackx.delay.packet.DelayInformation;
import org.jivesoftware.smackx.mam.MamManager;
2019-08-25 17:54:03 +02:00
import org.jxmpp.jid.EntityBareJid;
import org.mercury_im.messenger.core.connection.MercuryConnection;
import org.mercury_im.messenger.persistence.model.MessageModel;
import org.mercury_im.messenger.persistence.repository.MessageRepository;
2019-09-08 04:47:59 +02:00
import java.util.ArrayList;
2019-08-25 17:54:03 +02:00
import java.util.Date;
2019-09-08 04:47:59 +02:00
import java.util.List;
2019-08-25 17:54:03 +02:00
import java.util.logging.Level;
import java.util.logging.Logger;
import io.reactivex.disposables.CompositeDisposable;
2019-09-08 04:47:59 +02:00
import io.reactivex.functions.Consumer;
2019-08-25 17:54:03 +02:00
import io.reactivex.schedulers.Schedulers;
public class PlainMessageStore {
private static final Logger LOGGER = Logger.getLogger(PlainMessageStore.class.getName());
private static final CompositeDisposable disposable = new CompositeDisposable();
private final MessageRepository messageRepository;
public PlainMessageStore(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}
public void newIncomingMessage(long accountId, EntityBareJid from, Message message, Chat chat) {
if (message.getBody() == null) {
return;
}
MessageModel messageModel = messageRepository.newMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(chat.getXmppAddressOfChatPartner());
messageModel.setTo(message.getTo().asEntityBareJidIfPossible());
messageModel.setIncoming(true);
messageModel.setBody(message.getBody());
messageModel.setSendDate(new Date());
disposable.add(
messageRepository.insertMessage(messageModel)
.subscribeOn(Schedulers.io())
.subscribe(messageId ->
LOGGER.log(Level.INFO, "Inserted incoming message " + messageId)));
}
public void newOutgoingMessage(long accountId, EntityBareJid to, Message message, Chat chat) {
MessageModel messageModel = messageRepository.newMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(message.getFrom() != null ? message.getFrom().asEntityBareJidIfPossible() : null);
messageModel.setTo(chat.getXmppAddressOfChatPartner());
messageModel.setIncoming(false);
messageModel.setBody(message.getBody());
messageModel.setSendDate(new Date());
disposable.add(
messageRepository.insertMessage(messageModel)
.subscribeOn(Schedulers.io())
.subscribe(messageId ->
LOGGER.log(Level.INFO, "Inserted outgoing message " + messageId)));
}
public void onCarbonCopyReceived(long accountId, CarbonExtension.Direction direction, Message carbonCopy, Message wrappingMessage) {
if (carbonCopy.getBody() == null) {
return;
}
MessageModel messageModel = messageRepository.newMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(carbonCopy.getFrom() != null ? carbonCopy.getFrom().asEntityBareJidIfPossible() : null);
messageModel.setTo(carbonCopy.getTo() != null ? carbonCopy.getTo().asEntityBareJidIfPossible() : null);
messageModel.setIncoming(direction == CarbonExtension.Direction.received);
messageModel.setBody(carbonCopy.getBody());
messageModel.setSendDate(new Date());
disposable.add(
messageRepository.insertMessage(messageModel)
.subscribeOn(Schedulers.io())
.subscribe(messageId ->
LOGGER.log(Level.INFO, "Inserted carbon message " + messageId)));
}
public void registerForMercuryConnection(MercuryConnection connection) {
ChatManager chatManager = ChatManager.getInstanceFor(connection.getConnection());
CarbonManager carbonManager = CarbonManager.getInstanceFor(connection.getConnection());
// Add account ID to
chatManager.addIncomingListener((from, message, chat) ->
PlainMessageStore.this.newIncomingMessage(
connection.getAccountId(), from, message, chat));
chatManager.addOutgoingListener((to, message, chat) ->
PlainMessageStore.this.newOutgoingMessage(
connection.getAccountId(), to, message, chat));
carbonManager.addCarbonCopyReceivedListener((direction, carbonCopy, wrappingMessage) ->
PlainMessageStore.this.onCarbonCopyReceived(
connection.getAccountId(), direction, carbonCopy, wrappingMessage));
}
public void dispose() {
disposable.clear();
}
2019-09-08 04:47:59 +02:00
public void onMamResult(long accountId, EntityBareJid peerJid, MamManager.MamQuery query) {
List<MessageModel> messageModels = new ArrayList<>();
for (Message message : query.getMessages()) {
Date date = new Date();
DelayInformation delay = DelayInformation.from(message);
if (delay != null) {
date = delay.getStamp();
}
MessageModel messageModel = messageRepository.newMessageModel();
messageModel.setAccountId(accountId);
messageModel.setBody(message.getBody());
messageModel.setFrom(message.getFrom().asEntityBareJidOrThrow());
messageModel.setTo(message.getTo().asEntityBareJidOrThrow());
messageModel.setIncoming(peerJid.equals(message.getFrom().asEntityBareJidOrThrow()));
messageModel.setSendDate(date);
messageModels.add(messageModel);
}
disposable.add(
messageRepository.insertMessages(messageModels)
.subscribeOn(Schedulers.io())
.subscribe());
}
2019-08-25 17:54:03 +02:00
}