Start work on dedicated message center

This commit is contained in:
Paul Schaub 2020-07-18 21:38:13 +02:00
parent 3b66968ad5
commit 9ea08b11da
Signed by: vanitasvitae
GPG Key ID: 62BEE9264BF17311
18 changed files with 259 additions and 179 deletions

View File

@ -0,0 +1,37 @@
package org.mercury_im.messenger.data.converter;
import org.jxmpp.jid.EntityFullJid;
import org.jxmpp.jid.impl.JidCreate;
import javax.annotation.Nullable;
import io.requery.Converter;
public class EntityFullJidConverter implements Converter<EntityFullJid, String> {
@Override
public Class<EntityFullJid> getMappedType() {
return EntityFullJid.class;
}
@Override
public Class<String> getPersistedType() {
return String.class;
}
@Nullable
@Override
public Integer getPersistedSize() {
return null;
}
@Override
public String convertToPersisted(EntityFullJid value) {
return value == null ? null : value.toString();
}
@Override
public EntityFullJid convertToMapped(Class<? extends EntityFullJid> type, @Nullable String value) {
return value == null ? null : JidCreate.entityFullFromOrThrowUnchecked(value);
}
}

View File

@ -0,0 +1,44 @@
package org.mercury_im.messenger.data.converter;
import org.jxmpp.jid.EntityFullJid;
import org.jxmpp.jid.EntityJid;
import org.jxmpp.jid.impl.JidCreate;
import javax.annotation.Nullable;
import io.requery.Converter;
public class EntityJidConverter implements Converter<EntityJid, String> {
@Override
public Class<EntityJid> getMappedType() {
return EntityJid.class;
}
@Override
public Class<String> getPersistedType() {
return String.class;
}
@Nullable
@Override
public Integer getPersistedSize() {
return null;
}
@Override
public String convertToPersisted(EntityJid value) {
return value.toString();
}
@Override
public EntityJid convertToMapped(Class<? extends EntityJid> type, @Nullable String value) {
switch (type.getName()) {
case "EntityFullJid":
return JidCreate.entityFullFromOrThrowUnchecked(value);
case "EntityBareJid":
return JidCreate.entityBareFromOrThrowUnchecked(value);
default:
throw new IllegalArgumentException("Unknown jid type encountered: " + type.getName());
}
}
}

View File

@ -1,6 +1,9 @@
package org.mercury_im.messenger.data.model;
import org.jxmpp.jid.EntityFullJid;
import org.mercury_im.messenger.data.converter.EntityFullJidConverter;
import org.mercury_im.messenger.data.converter.MessageDirectionConverter;
import org.mercury_im.messenger.entity.Encryption;
import org.mercury_im.messenger.entity.message.MessageDirection;
import org.pgpainless.key.OpenPgpV4Fingerprint;
@ -27,10 +30,12 @@ public abstract class AbstractMessageModel implements Persistable {
UUID chatId;
@Column(nullable = false)
String sender;
@Convert(EntityFullJidConverter.class)
EntityFullJid sender;
@Column(nullable = false)
String recipient;
@Convert(EntityFullJidConverter.class)
EntityFullJid recipient;
@Column(name = "\"timestamp\"", nullable = false)
Date timestamp;
@ -57,6 +62,10 @@ public abstract class AbstractMessageModel implements Persistable {
@Column
String stanzaIdBy;
@Column
Encryption encryption;
@Column
OpenPgpV4Fingerprint senderOXFingerprint;
}

View File

@ -54,29 +54,11 @@ public class RxMessageRepository
}
@Override
public Single<Message> insertMessage(DirectChat chat, Message message) {
return directChatDao.get(chat.getId()).maybe()
.switchIfEmpty(directChatDao.insert(directChatMapping.toModel(chat)))
.map(chatModel -> {
MessageModel messageModel = messageMapping.toModel(message);
messageModel.setChatId(chat.getId());
return messageModel;
})
.flatMap(messageModel -> data().upsert(messageModel))
.map(messageModel -> messageMapping.toEntity(messageModel, message));
}
@Override
public Single<Message> insertMessage(GroupChat chat, Message message) {
return groupChatDao.get(chat.getId()).maybe()
.switchIfEmpty(groupChatDao.insert(groupChatMapping.toModel(chat)))
.map(chatModel -> {
MessageModel messageModel = messageMapping.toModel(message);
messageModel.setChatId(chat.getId());
return messageModel;
})
.flatMap(data()::upsert)
.map(messageModel -> messageMapping.toEntity(messageModel, message));
public Single<Message> insertMessage(Message message) {
return Single.just(message)
.map(messageMapping::toModel)
.flatMap(dao::insert)
.map(messageMapping::toEntity);
}
@Override
@ -142,15 +124,7 @@ public class RxMessageRepository
}
@Override
public Single<Message> upsertMessage(DirectChat chat, Message message) {
return Single.just(message)
.map(messageMapping::toModel)
.flatMap(data()::upsert)
.map(messageMapping::toEntity);
}
@Override
public Single<Message> upsertMessage(GroupChat chat, Message message) {
public Single<Message> upsertMessage(Message message) {
return Single.just(message)
.map(messageMapping::toModel)
.flatMap(data()::upsert)

View File

@ -1,16 +0,0 @@
package org.mercury_im.messenger.core;
import org.mercury_im.messenger.core.listener.IncomingDirectMessageListener;
import org.mercury_im.messenger.entity.chat.Chat;
import org.mercury_im.messenger.entity.message.Message;
import io.reactivex.Completable;
public interface MessageCenter<C extends Chat> {
Messenger getMessenger();
Completable sendMessage(Message message, C chat);
void addIncomingMessageListener(IncomingDirectMessageListener listener);
}

View File

@ -0,0 +1,27 @@
package org.mercury_im.messenger.core.connection;
import org.mercury_im.messenger.entity.chat.Chat;
import org.mercury_im.messenger.entity.message.Message;
import org.mercury_im.messenger.entity.message.MessageDeliveryState;
import org.mercury_im.messenger.entity.message.MessageDirection;
import java.util.UUID;
public class MercuryMessageComposer implements MessageCenter.Composer {
@Override
public Message createChatMessage(Chat chat, String body) {
Message message = new Message();
message.setBody(body);
message.setSender(chat.getAccount().getJid());
message.setRecipient(chat.getAddress());
message.setDeliveryState(MessageDeliveryState.pending_delivery);
message.setDirection(MessageDirection.outgoing);
message.setRead(false);
UUID messageId = UUID.randomUUID();
message.setId(messageId);
message.setOriginId(messageId.toString());
return message;
}
}

View File

@ -0,0 +1,42 @@
package org.mercury_im.messenger.core.connection
import org.mercury_im.messenger.entity.message.Message as MercuryMessage;
import org.jivesoftware.smack.packet.Message as SmackMessage;
import java.util.UUID
import io.reactivex.Completable
import io.reactivex.Single
import org.mercury_im.messenger.core.util.AppendCompletableToSingle
import org.mercury_im.messenger.entity.chat.Chat
class MessageCenter(private val composer: Composer,
private val persister: Persister,
private val encryptor: Encryptor,
private val sender: Sender) {
fun sendSingleTextMessage(chat: Chat, body: String): Single<UUID> {
val messageEntity = composer.createChatMessage(chat, body)
val messageId = persister.persistPendingMessage(messageEntity)
val smackMessage = encryptor.encrypt(messageEntity)
val sendCompletable = sender.send(smackMessage);
return messageId.compose(AppendCompletableToSingle(sendCompletable));
}
interface Composer {
fun createChatMessage(chat: Chat, body: String): MercuryMessage
}
interface Persister {
fun persistPendingMessage(message: MercuryMessage): Single<UUID>
}
interface Encryptor {
fun encrypt(message: MercuryMessage): SmackMessage
}
interface Sender {
fun send(message: SmackMessage): Completable
}
}

View File

@ -0,0 +1,22 @@
package org.mercury_im.messenger.core.connection;
import org.mercury_im.messenger.core.data.repository.MessageRepository;
import org.mercury_im.messenger.entity.message.Message;
import java.util.UUID;
import io.reactivex.Single;
public class MessagePersister implements MessageCenter.Persister {
private final MessageRepository messageRepository;
public MessagePersister(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}
@Override
public Single<UUID> persistPendingMessage(Message message) {
return null;
}
}

View File

@ -1,115 +0,0 @@
package org.mercury_im.messenger.core.connection;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.chat2.Chat;
import org.jivesoftware.smack.chat2.ChatManager;
import org.jivesoftware.smack.chat2.IncomingChatMessageListener;
import org.jivesoftware.smack.packet.MessageBuilder;
import org.jivesoftware.smackx.sid.element.OriginIdElement;
import org.jxmpp.jid.EntityBareJid;
import org.jxmpp.jid.impl.JidCreate;
import org.mercury_im.messenger.core.MessageCenter;
import org.mercury_im.messenger.core.Messenger;
import org.mercury_im.messenger.core.data.repository.AccountRepository;
import org.mercury_im.messenger.core.data.repository.DirectChatRepository;
import org.mercury_im.messenger.core.data.repository.MessageRepository;
import org.mercury_im.messenger.core.data.repository.PeerRepository;
import org.mercury_im.messenger.core.listener.IncomingDirectMessageListener;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.entity.chat.DirectChat;
import org.mercury_im.messenger.entity.message.Message;
import java.util.LinkedHashSet;
import java.util.Set;
import javax.inject.Inject;
import io.reactivex.Completable;
import io.reactivex.disposables.CompositeDisposable;
public class XmppDirectMessageCenter
implements MessageCenter<DirectChat>, IncomingChatMessageListener {
private final PeerRepository peerRepository;
private final AccountRepository accountRepository;
private final DirectChatRepository directChatRepository;
private final MessageRepository messageRepository;
private final Messenger messenger;
private final Account account;
private final CompositeDisposable disposable = new CompositeDisposable();
private Set<IncomingDirectMessageListener> messageListeners = new LinkedHashSet<>();
@Inject
public XmppDirectMessageCenter(Account account, Messenger messenger, PeerRepository peerRepository,
AccountRepository accountRepository, DirectChatRepository directChatRepository,
MessageRepository messageRepository) {
this.messenger = messenger;
this.account = account;
this.peerRepository = peerRepository;
this.accountRepository = accountRepository;
this.directChatRepository = directChatRepository;
this.messageRepository = messageRepository;
XMPPConnection connection = getMessenger().getConnectionManager().getConnection(account).getConnection();
ChatManager.getInstanceFor(connection).addIncomingListener(this);
}
@Override
public Messenger getMessenger() {
return messenger;
}
@Override
public Completable sendMessage(Message message, DirectChat chat) {
ChatManager chatManager = getChatManager(chat);
EntityBareJid accountAddress =
JidCreate.entityBareFromOrThrowUnchecked(chat.getAccount().getAddress());
EntityBareJid peerAddress =
JidCreate.entityBareFromOrThrowUnchecked(chat.getPeer().getAddress());
MessageBuilder messageBuilder = MessageBuilder.buildMessage()
.from(accountAddress)
.to(peerAddress)
.ofType(org.jivesoftware.smack.packet.Message.Type.chat);
String originId = OriginIdElement.addTo(messageBuilder).getId();
String legacyStanzaId = messageBuilder.getStanzaId();
message.setLegacyStanzaId(legacyStanzaId);
message.setOriginId(originId);
Chat smackChat = chatManager.chatWith(peerAddress);
return messageRepository.insertMessage(chat, message)
.ignoreElement()
.andThen(Completable.fromAction(() -> smackChat.send(messageBuilder.build())));
}
@Override
public void addIncomingMessageListener(IncomingDirectMessageListener listener) {
messageListeners.add(listener);
}
protected ChatManager getChatManager(DirectChat chat) {
MercuryConnection mercuryConnection = getMessenger().getConnectionManager().getConnection(chat.getAccount());
return ChatManager.getInstanceFor(mercuryConnection.getConnection());
}
@Override
public void newIncomingMessage(EntityBareJid from, org.jivesoftware.smack.packet.Message message, Chat chat) {
disposable.add(peerRepository
// get peer
.getOrCreatePeer(account, from)
// get chat
.flatMap(directChatRepository::getOrCreateChatWithPeer)
// notify listeners
.subscribe(chatEntity -> {
for (IncomingDirectMessageListener listener : messageListeners) {
listener.onIncomingDirectMessage(account, chatEntity, new Message());
}
}));
}
}

View File

@ -12,9 +12,7 @@ import io.reactivex.Single;
public interface MessageRepository {
Single<Message> insertMessage(DirectChat chat, Message message);
Single<Message> insertMessage(GroupChat chat, Message message);
Single<Message> insertMessage(Message message);
Observable<List<Message>> observeMessages(DirectChat chat);
@ -26,9 +24,7 @@ public interface MessageRepository {
Observable<List<Message>> findMessagesWithBody(GroupChat chat, String body);
Single<Message> upsertMessage(DirectChat chat, Message message);
Single<Message> upsertMessage(GroupChat chat, Message message);
Single<Message> upsertMessage(Message message);
Single<Message> updateMessage(Message message);

View File

@ -97,7 +97,11 @@ public class MercuryMessageStore implements IncomingChatMessageListener, Outgoin
private Disposable writeMessageToStore(EntityBareJid peer, Message message) {
return peerRepository.getOrCreatePeer(account, peer)
.flatMap(directChatRepository::getOrCreateChatWithPeer)
.flatMap(chat -> messageRepository.insertMessage(chat, message))
.map(chat -> {
message.setChatId(chat.getId());
return chat;
})
.flatMap(chat -> messageRepository.insertMessage(message))
.subscribeOn(schedulers.getIoScheduler())
.subscribe(m -> LOGGER.log(Level.INFO, "Message written"), e -> LOGGER.log(Level.SEVERE, "Error: ", e));
}

View File

@ -0,0 +1,20 @@
package org.mercury_im.messenger.core.util;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.SingleTransformer;
public class AppendCompletableToSingle<T> implements SingleTransformer<T, T> {
private final Completable completable;
public AppendCompletableToSingle(Completable completable) {
this.completable = completable;
}
@Override
public SingleSource<T> apply(Single<T> upstream) {
return upstream.flatMap(result -> completable.toSingle(() -> result));
}
}

View File

@ -12,6 +12,9 @@ import lombok.Data;
*/
@Data
public class Account {
static String EMPTY_ROSTER_VERSION = "";
UUID id;
String address;
String password;
@ -22,7 +25,7 @@ public class Account {
public Account() {
this.id = UUID.randomUUID();
this.rosterVersion = "";
this.rosterVersion = EMPTY_ROSTER_VERSION;
}
public EntityBareJid getJid() {

View File

@ -0,0 +1,9 @@
package org.mercury_im.messenger.entity;
public enum Encryption {
plain,
ox_sign,
ox_crypt,
ox_signcrypt,
;
}

View File

@ -20,4 +20,6 @@ public abstract class Chat {
public Chat() {
this.id = UUID.randomUUID();
}
public abstract String getAddress();
}

View File

@ -16,4 +16,9 @@ public class DirectChat extends Chat {
public DirectChat() {
super();
}
@Override
public String getAddress() {
return getPeer().getAddress();
}
}

View File

@ -16,4 +16,9 @@ public class GroupChat extends Chat {
Set<Peer> participants;
String roomAddress;
String roomName;
}
@Override
public String getAddress() {
return roomAddress;
}
}

View File

@ -1,5 +1,8 @@
package org.mercury_im.messenger.entity.message;
import org.jxmpp.jid.EntityFullJid;
import org.mercury_im.messenger.entity.Encryption;
import java.util.Date;
import java.util.List;
import java.util.UUID;
@ -9,17 +12,26 @@ import lombok.Data;
@Data
public class Message {
UUID id;
String sender;
String recipient;
UUID chatId;
EntityFullJid sender;
EntityFullJid recipient;
String body;
Date timestamp;
MessageDirection direction;
String body;
MessageDeliveryState deliveryState;
// <message id="blabla"/>
String legacyStanzaId;
String originId;
// XEP-0359: Unique and Stable Stanza IDs
String stanzaId;
String originId;
// Serialized message
String xml;
boolean encrypted;
Encryption encryption;
boolean received;
boolean read;