RXify message sending

This commit is contained in:
Paul Schaub 2019-08-25 17:54:03 +02:00
parent ab66770fd7
commit 2d6a9ef070
Signed by: vanitasvitae
GPG key ID: 62BEE9264BF17311
19 changed files with 158 additions and 266 deletions

View file

@ -7,13 +7,8 @@ import android.content.Context;
import android.content.Intent;
import android.os.Build;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackInitialization;
import org.jivesoftware.smack.im.SmackImInitializer;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.mercury_im.messenger.core.ConnectionCenter;
import org.mercury_im.messenger.core.MercuryConfiguration;
import org.mercury_im.messenger.core.centers.ConnectionCenter;
import org.mercury_im.messenger.core.connection.MercuryConfiguration;
import org.mercury_im.messenger.di.component.AppComponent;
import org.mercury_im.messenger.di.component.DaggerAppComponent;
import org.mercury_im.messenger.di.module.AppModule;

View file

@ -1,8 +1,8 @@
package org.mercury_im.messenger.di.component;
import org.mercury_im.messenger.MercuryImApplication;
import org.mercury_im.messenger.core.stores.PlainMessageStore;
import org.mercury_im.messenger.di.module.AppModule;
import org.mercury_im.messenger.handler.RoomPlainMessageHandler;
import org.mercury_im.messenger.persistence.room.RoomModule;
import org.mercury_im.messenger.service.XmppConnectionService;
import org.mercury_im.messenger.ui.MainActivity;
@ -69,5 +69,5 @@ public interface AppComponent {
// Connectors
void inject(RoomPlainMessageHandler messageHandler);
void inject(PlainMessageStore messageHandler);
}

View file

@ -1,83 +0,0 @@
package org.mercury_im.messenger.handler;
import android.util.Log;
import org.jivesoftware.smack.chat2.Chat;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smackx.carbons.packet.CarbonExtension;
import org.jxmpp.jid.EntityBareJid;
import org.mercury_im.messenger.MercuryImApplication;
import org.mercury_im.messenger.persistence.model.MessageModel;
import org.mercury_im.messenger.persistence.repository.MessageRepository;
import org.mercury_im.messenger.persistence.room.model.RoomMessageModel;
import org.mercury_im.messenger.core.PlainMessageHandler;
import java.util.Date;
import javax.inject.Inject;
import io.reactivex.Single;
import static org.mercury_im.messenger.MercuryImApplication.TAG;
public class RoomPlainMessageHandler implements PlainMessageHandler {
private final long accountId;
@Inject
MessageRepository messageRepository;
public RoomPlainMessageHandler(long accountId) {
this.accountId = accountId;
MercuryImApplication.getApplication().getAppComponent().inject(this);
}
@Override
public void newIncomingMessage(EntityBareJid from, Message message, Chat chat) {
if (message.getBody() == null) {
return;
}
MessageModel messageModel = new RoomMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(chat.getXmppAddressOfChatPartner());
messageModel.setTo(message.getTo().asEntityBareJidIfPossible());
messageModel.setIncoming(true);
messageModel.setBody(message.getBody());
messageModel.setSendDate(new Date());
Single<Long> messageId = messageRepository.insertMessage(messageModel);
Log.d(TAG, "Inserted incoming message " + messageId.blockingGet());
}
@Override
public void newOutgoingMessage(EntityBareJid to, Message message, Chat chat) {
MessageModel messageModel = new RoomMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(message.getFrom() != null ? message.getFrom().asEntityBareJidIfPossible() : null);
messageModel.setTo(chat.getXmppAddressOfChatPartner());
messageModel.setIncoming(false);
messageModel.setBody(message.getBody());
messageModel.setSendDate(new Date());
Single<Long> messageId = messageRepository.insertMessage(messageModel);
Log.d(TAG, "Inserted outgoing message " + messageId.blockingGet());
}
@Override
public void onCarbonCopyReceived(CarbonExtension.Direction direction, Message carbonCopy, Message wrappingMessage) {
Log.d(TAG, "onCarbonReceived:" + carbonCopy.toXML());
if (carbonCopy.getBody() == null) {
return;
}
MessageModel messageModel = new RoomMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(carbonCopy.getFrom() != null ? carbonCopy.getFrom().asEntityBareJidIfPossible() : null);
messageModel.setTo(carbonCopy.getTo() != null ? carbonCopy.getTo().asEntityBareJidIfPossible() : null);
messageModel.setIncoming(direction == CarbonExtension.Direction.received);
messageModel.setBody(carbonCopy.getBody());
messageModel.setSendDate(new Date());
Single<Long> messageId = messageRepository.insertMessage(messageModel);
Log.d(TAG, "Inserted carbon message " + messageId.blockingGet());
}
}

View file

@ -11,10 +11,13 @@ import androidx.lifecycle.ViewModelProviders;
import androidx.recyclerview.widget.LinearLayoutManager;
import androidx.recyclerview.widget.RecyclerView;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.chat2.ChatManager;
import org.jxmpp.jid.EntityBareJid;
import org.jxmpp.jid.impl.JidCreate;
import org.mercury_im.messenger.MercuryImApplication;
import org.mercury_im.messenger.R;
import org.mercury_im.messenger.core.centers.ConnectionCenter;
import org.mercury_im.messenger.persistence.model.AccountModel;
import org.mercury_im.messenger.persistence.model.ContactModel;
import org.mercury_im.messenger.persistence.model.MessageModel;
@ -22,10 +25,7 @@ import org.mercury_im.messenger.persistence.repository.AccountRepository;
import org.mercury_im.messenger.persistence.repository.MessageRepository;
import org.mercury_im.messenger.persistence.repository.RosterRepository;
import java.util.Date;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
@ -52,6 +52,9 @@ public class ChatActivity extends AppCompatActivity implements ChatInputFragment
@Inject
RosterRepository rosterRepository;
@Inject
ConnectionCenter connectionCenter;
@BindView(R.id.toolbar)
Toolbar toolbar;
@ -106,7 +109,6 @@ public class ChatActivity extends AppCompatActivity implements ChatInputFragment
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Consumer<ContactModel>) o -> {
Logger.getLogger("AAAAAAAA").log(Level.INFO, "Title change?");
String title = o.getRosterName();
if (title == null) {
title = jidString;
@ -126,6 +128,7 @@ public class ChatActivity extends AppCompatActivity implements ChatInputFragment
ChatInputFragment composer = (ChatInputFragment) getSupportFragmentManager()
.findFragmentById(R.id.fragment_compose);
}
}
@ -165,26 +168,12 @@ public class ChatActivity extends AppCompatActivity implements ChatInputFragment
return;
}
MessageModel messageModel = messageRepository.newMessageModel();
AccountModel account = (AccountModel) accountRepository.getAccount(accountId)
.subscribeOn(Schedulers.io()).blockingGet();
messageModel.setFrom(account.getJid());
messageModel.setTo(jid);
messageModel.setSendDate(new Date());
messageModel.setBody(msg);
messageModel.setIncoming(false);
messageModel.setAccountId(accountId);
disposable.add(messageRepository
.insertMessage(messageModel)
.subscribeOn(Schedulers.io())
.subscribe());
/*
// TODO: Improve by using rx
new Thread() {
@Override
public void run() {
try {
ChatManager.getInstanceFor(connectionService.getConnection(accountId).getConnection())
ChatManager.getInstanceFor(connectionCenter.getConnection(accountId).getConnection())
.chatWith(jid).send(msg);
} catch (SmackException.NotConnectedException e) {
e.printStackTrace();
@ -193,6 +182,5 @@ public class ChatActivity extends AppCompatActivity implements ChatInputFragment
}
}
}.start();
*/
}
}

View file

@ -5,10 +5,9 @@ import androidx.lifecycle.MutableLiveData;
import androidx.lifecycle.ViewModel;
import org.mercury_im.messenger.MercuryImApplication;
import org.mercury_im.messenger.core.ConnectionCenter;
import org.mercury_im.messenger.core.centers.ConnectionCenter;
import org.mercury_im.messenger.persistence.model.AccountModel;
import org.mercury_im.messenger.persistence.repository.AccountRepository;
import org.mercury_im.messenger.persistence.room.model.RoomAccountModel;
import java.util.List;

View file

@ -11,20 +11,19 @@ import androidx.lifecycle.ViewModel;
import org.jxmpp.jid.EntityBareJid;
import org.jxmpp.jid.impl.JidCreate;
import org.mercury_im.messenger.MercuryImApplication;
import org.mercury_im.messenger.core.ConnectionCenter;
import org.mercury_im.messenger.core.centers.ConnectionCenter;
import org.mercury_im.messenger.persistence.model.AccountModel;
import org.mercury_im.messenger.persistence.repository.AccountRepository;
import org.mercury_im.messenger.persistence.room.model.RoomAccountModel;
import javax.inject.Inject;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import static org.mercury_im.messenger.core.MercuryConnection.TAG;
import static org.mercury_im.messenger.core.connection.MercuryConnection.TAG;
public class LoginViewModel extends ViewModel {

View file

@ -1,10 +1,8 @@
package org.mercury_im.messenger;
import org.junit.Test;
import org.mercury_im.messenger.core.ConnectionState;
import org.mercury_im.messenger.core.connection.ConnectionState;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.BehaviorSubject;

View file

@ -1,84 +0,0 @@
package org.mercury_im.messenger.core;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.roster.Roster;
import org.jivesoftware.smack.roster.packet.RosterPacket;
import org.jxmpp.jid.BareJid;
import org.jxmpp.jid.FullJid;
import org.jxmpp.jid.Jid;
import java.util.Collection;
import java.util.List;
public class IRosterHandler implements RosterHandler {
@Override
public void presenceAvailable(FullJid address, Presence availablePresence) {
}
@Override
public void presenceUnavailable(FullJid address, Presence presence) {
}
@Override
public void presenceError(Jid address, Presence errorPresence) {
}
@Override
public void presenceSubscribed(BareJid address, Presence subscribedPresence) {
}
@Override
public void presenceUnsubscribed(BareJid address, Presence unsubscribedPresence) {
}
@Override
public void onRosterLoaded(Roster roster) {
}
@Override
public void onRosterLoadingFailed(Exception exception) {
}
@Override
public List<RosterPacket.Item> getEntries() {
return null;
}
@Override
public RosterPacket.Item getEntry(Jid bareJid) {
return null;
}
@Override
public String getRosterVersion() {
return null;
}
@Override
public boolean addEntry(RosterPacket.Item item, String version) {
return false;
}
@Override
public boolean resetEntries(Collection<RosterPacket.Item> items, String version) {
return false;
}
@Override
public boolean removeEntry(Jid bareJid, String version) {
return false;
}
@Override
public void resetStore() {
}
}

View file

@ -1,9 +0,0 @@
package org.mercury_im.messenger.core;
import org.jivesoftware.smack.chat2.IncomingChatMessageListener;
import org.jivesoftware.smack.chat2.OutgoingChatMessageListener;
import org.jivesoftware.smackx.carbons.CarbonCopyReceivedListener;
public interface PlainMessageHandler extends IncomingChatMessageListener, OutgoingChatMessageListener, CarbonCopyReceivedListener {
}

View file

@ -1,9 +0,0 @@
package org.mercury_im.messenger.core;
import org.jivesoftware.smack.roster.PresenceEventListener;
import org.jivesoftware.smack.roster.RosterLoadedListener;
import org.jivesoftware.smack.roster.rosterstore.RosterStore;
public interface RosterHandler extends RosterStore, RosterLoadedListener, PresenceEventListener {
}

View file

@ -1,10 +1,14 @@
package org.mercury_im.messenger.core;
package org.mercury_im.messenger.core.centers;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.tcp.XMPPTCPConnectionConfiguration;
import org.jivesoftware.smackx.caps.EntityCapsManager;
import org.mercury_im.messenger.core.stores.EntityCapsStore;
import org.mercury_im.messenger.core.connection.MercuryConfiguration;
import org.mercury_im.messenger.core.connection.MercuryConnection;
import org.mercury_im.messenger.core.stores.PlainMessageStore;
import org.mercury_im.messenger.core.stores.RosterStore;
import org.mercury_im.messenger.persistence.model.AccountModel;
import org.mercury_im.messenger.persistence.repository.AccountRepository;
import org.mercury_im.messenger.persistence.repository.RosterRepository;
@ -33,6 +37,7 @@ public class ConnectionCenter {
// Injected
private final AccountRepository accountRepository;
private final PlainMessageStore messageStore;
private final EntityCapsStore entityCapsStore;
private final RosterRepository rosterRepository;
@ -43,13 +48,17 @@ public class ConnectionCenter {
// Disposable for rx
private final CompositeDisposable disposable = new CompositeDisposable();
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean isConnectionCenterStarted = new AtomicBoolean(false);
@Inject
public ConnectionCenter(EntityCapsStore capsStore, AccountRepository accountRepository, RosterRepository rosterRepository) {
public ConnectionCenter(EntityCapsStore capsStore,
PlainMessageStore messageStore,
AccountRepository accountRepository,
RosterRepository rosterRepository) {
LOGGER.log(Level.INFO, "ConnectionCenter initialized");
this.accountRepository = accountRepository;
this.entityCapsStore = capsStore;
this.messageStore = messageStore;
this.accountRepository = accountRepository;
this.rosterRepository = rosterRepository;
EntityCapsManager.setPersistentCache(capsStore);
@ -64,7 +73,7 @@ public class ConnectionCenter {
@SuppressWarnings("unchecked")
public synchronized void startUp() {
if (started.getAndSet(true)) {
if (isConnectionCenterStarted.getAndSet(true)) {
// already started.
return;
}
@ -83,14 +92,18 @@ public class ConnectionCenter {
if (connectionMap.get(account.getId()) != null) {
continue;
}
LOGGER.log(Level.INFO, "Add new connection " + account.getJid().toString() + " to ConnectionCenter list.");
MercuryConnection connection = createConnection(account);
connectionMap.put(account.getId(), connection);
RosterStore rosterStore = new RosterStore(rosterRepository);
rosterStore.setAccountId(account.getId());
rosterStore.subscribe();
connection.getRoster().setRosterStore(rosterStore);
messageStore.registerForMercuryConnection(connection);
if (account.getEnabled()) {
LOGGER.log(Level.INFO, "Connecting...");
connection.connect();

View file

@ -1,4 +1,4 @@
package org.mercury_im.messenger.core;
package org.mercury_im.messenger.core.connection;
/**
* {@link MercuryConnection} modeled as a finite state machine.

View file

@ -1,4 +1,4 @@
package org.mercury_im.messenger.core;
package org.mercury_im.messenger.core.connection;
import org.jivesoftware.smack.SmackConfiguration;

View file

@ -1,4 +1,4 @@
package org.mercury_im.messenger.core;
package org.mercury_im.messenger.core.connection;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionListener;
@ -92,12 +92,6 @@ public class MercuryConnection {
return connection;
}
public void setPlainMessageHandler(PlainMessageHandler plainMessageHandler) {
carbonManager.addCarbonCopyReceivedListener(plainMessageHandler);
chatManager.addIncomingListener(plainMessageHandler);
chatManager.addOutgoingListener(plainMessageHandler);
}
public long getAccountId() {
return accountId;
}

View file

@ -1,8 +1,10 @@
package org.mercury_im.messenger.core.di;
import org.mercury_im.messenger.core.ConnectionCenter;
import org.mercury_im.messenger.core.EntityCapsStore;
import org.mercury_im.messenger.core.centers.ConnectionCenter;
import org.mercury_im.messenger.core.stores.EntityCapsStore;
import org.mercury_im.messenger.core.stores.PlainMessageStore;
import org.mercury_im.messenger.persistence.repository.AccountRepository;
import org.mercury_im.messenger.persistence.repository.MessageRepository;
import org.mercury_im.messenger.persistence.repository.RosterRepository;
import org.mercury_im.messenger.persistence.repository.EntityCapsRepository;
@ -16,8 +18,8 @@ public class CenterModule {
@Singleton
@Provides
static ConnectionCenter provideConnectionCenter(EntityCapsStore capsStore, AccountRepository accountRepository, RosterRepository rosterRepository) {
return new ConnectionCenter(capsStore, accountRepository, rosterRepository);
static ConnectionCenter provideConnectionCenter(EntityCapsStore capsStore, PlainMessageStore messageStore, AccountRepository accountRepository, RosterRepository rosterRepository) {
return new ConnectionCenter(capsStore, messageStore, accountRepository, rosterRepository);
}
@Singleton
@ -26,4 +28,10 @@ public class CenterModule {
return new EntityCapsStore(capsRepository);
}
@Singleton
@Provides
static PlainMessageStore provideMessageStore(MessageRepository messageRepository) {
return new PlainMessageStore(messageRepository);
}
}

View file

@ -1,16 +0,0 @@
package org.mercury_im.messenger.core.di;
import org.mercury_im.messenger.core.IRosterHandler;
import org.mercury_im.messenger.core.RosterHandler;
import dagger.Module;
import dagger.Provides;
@Module
public class HandlerModule {
@Provides
RosterHandler provideRosterHandler() {
return new IRosterHandler();
}
}

View file

@ -1,16 +1,12 @@
package org.mercury_im.messenger.core;
package org.mercury_im.messenger.core.stores;
import org.jivesoftware.smack.parsing.SmackParsingException;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.xml.XmlPullParser;
import org.jivesoftware.smack.xml.XmlPullParserException;
import org.jivesoftware.smackx.caps.cache.EntityCapsPersistentCache;
import org.jivesoftware.smackx.disco.packet.DiscoverInfo;
import org.mercury_im.messenger.persistence.model.EntityCapsModel;
import org.mercury_im.messenger.persistence.repository.EntityCapsRepository;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;

View file

@ -0,0 +1,105 @@
package org.mercury_im.messenger.core.stores;
import org.jivesoftware.smack.chat2.Chat;
import org.jivesoftware.smack.chat2.ChatManager;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smackx.carbons.CarbonManager;
import org.jivesoftware.smackx.carbons.packet.CarbonExtension;
import org.jxmpp.jid.EntityBareJid;
import org.mercury_im.messenger.core.connection.MercuryConnection;
import org.mercury_im.messenger.persistence.model.MessageModel;
import org.mercury_im.messenger.persistence.repository.MessageRepository;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;
public class PlainMessageStore {
private static final Logger LOGGER = Logger.getLogger(PlainMessageStore.class.getName());
private static final CompositeDisposable disposable = new CompositeDisposable();
private final MessageRepository messageRepository;
public PlainMessageStore(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}
public void newIncomingMessage(long accountId, EntityBareJid from, Message message, Chat chat) {
if (message.getBody() == null) {
return;
}
MessageModel messageModel = messageRepository.newMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(chat.getXmppAddressOfChatPartner());
messageModel.setTo(message.getTo().asEntityBareJidIfPossible());
messageModel.setIncoming(true);
messageModel.setBody(message.getBody());
messageModel.setSendDate(new Date());
disposable.add(
messageRepository.insertMessage(messageModel)
.subscribeOn(Schedulers.io())
.subscribe(messageId ->
LOGGER.log(Level.INFO, "Inserted incoming message " + messageId)));
}
public void newOutgoingMessage(long accountId, EntityBareJid to, Message message, Chat chat) {
MessageModel messageModel = messageRepository.newMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(message.getFrom() != null ? message.getFrom().asEntityBareJidIfPossible() : null);
messageModel.setTo(chat.getXmppAddressOfChatPartner());
messageModel.setIncoming(false);
messageModel.setBody(message.getBody());
messageModel.setSendDate(new Date());
disposable.add(
messageRepository.insertMessage(messageModel)
.subscribeOn(Schedulers.io())
.subscribe(messageId ->
LOGGER.log(Level.INFO, "Inserted outgoing message " + messageId)));
}
public void onCarbonCopyReceived(long accountId, CarbonExtension.Direction direction, Message carbonCopy, Message wrappingMessage) {
if (carbonCopy.getBody() == null) {
return;
}
MessageModel messageModel = messageRepository.newMessageModel();
messageModel.setAccountId(accountId);
messageModel.setFrom(carbonCopy.getFrom() != null ? carbonCopy.getFrom().asEntityBareJidIfPossible() : null);
messageModel.setTo(carbonCopy.getTo() != null ? carbonCopy.getTo().asEntityBareJidIfPossible() : null);
messageModel.setIncoming(direction == CarbonExtension.Direction.received);
messageModel.setBody(carbonCopy.getBody());
messageModel.setSendDate(new Date());
disposable.add(
messageRepository.insertMessage(messageModel)
.subscribeOn(Schedulers.io())
.subscribe(messageId ->
LOGGER.log(Level.INFO, "Inserted carbon message " + messageId)));
}
public void registerForMercuryConnection(MercuryConnection connection) {
ChatManager chatManager = ChatManager.getInstanceFor(connection.getConnection());
CarbonManager carbonManager = CarbonManager.getInstanceFor(connection.getConnection());
// Add account ID to
chatManager.addIncomingListener((from, message, chat) ->
PlainMessageStore.this.newIncomingMessage(
connection.getAccountId(), from, message, chat));
chatManager.addOutgoingListener((to, message, chat) ->
PlainMessageStore.this.newOutgoingMessage(
connection.getAccountId(), to, message, chat));
carbonManager.addCarbonCopyReceivedListener((direction, carbonCopy, wrappingMessage) ->
PlainMessageStore.this.onCarbonCopyReceived(
connection.getAccountId(), direction, carbonCopy, wrappingMessage));
}
public void dispose() {
disposable.clear();
}
}

View file

@ -1,6 +1,5 @@
package org.mercury_im.messenger.core;
package org.mercury_im.messenger.core.stores;
import org.jivesoftware.smack.roster.Roster;
import org.jivesoftware.smack.roster.packet.RosterPacket;
import org.jxmpp.jid.Jid;
import org.mercury_im.messenger.persistence.model.ContactModel;
@ -19,7 +18,6 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;