Move connection handling to MercuryConnectionManager

This commit is contained in:
Paul Schaub 2020-01-05 15:54:44 +01:00
parent fc48b35f05
commit bc85fe432d
Signed by: vanitasvitae
GPG key ID: 62BEE9264BF17311
6 changed files with 140 additions and 56 deletions

View file

@ -7,10 +7,8 @@ import org.mercury_im.messenger.usecase.AddAccount;
import org.mercury_im.messenger.usecase.ConnectAccountsOnStartup; import org.mercury_im.messenger.usecase.ConnectAccountsOnStartup;
import org.mercury_im.messenger.usecase.RosterStoreBinder; import org.mercury_im.messenger.usecase.RosterStoreBinder;
import org.mercury_im.messenger.xmpp.MercuryConnection; import org.mercury_im.messenger.xmpp.MercuryConnection;
import org.mercury_im.messenger.xmpp.MercuryConnectionManager;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -19,49 +17,43 @@ import javax.inject.Inject;
import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers; import io.reactivex.schedulers.Schedulers;
public class Messenger implements ConnectionRegistry, ClientStateListener { public class Messenger implements ClientStateListener {
public static final String TAG = "MercuryIM"; public static final String TAG = "MercuryIM";
private static final Logger LOGGER = Logger.getLogger(Messenger.class.getName()); private static final Logger LOGGER = Logger.getLogger(Messenger.class.getName());
private final Map<UUID, MercuryConnection> connections = new HashMap<>(); private final MercuryConnectionManager connectionManager;
private Repositories repositories; private final Repositories repositories;
private final RosterStoreBinder rosterStoreBinder; private final RosterStoreBinder rosterStoreBinder;
private CompositeDisposable disposable = new CompositeDisposable(); private CompositeDisposable disposable = new CompositeDisposable();
@Inject @Inject
public Messenger(Repositories repositories) { public Messenger(Repositories repositories) {
this.connectionManager = new MercuryConnectionManager(this, repositories.getAccountRepository());
this.repositories = repositories; this.repositories = repositories;
this.rosterStoreBinder = new RosterStoreBinder(repositories.getAccountRepository(), repositories.getPeerRepository()); this.rosterStoreBinder = new RosterStoreBinder(repositories.getAccountRepository(), repositories.getPeerRepository());
performInitialLogin(); performInitialLogin();
} }
public MercuryConnectionManager getConnectionManager() {
return connectionManager;
}
public void performInitialLogin() { public void performInitialLogin() {
disposable.add(repositories.getAccountRepository().observeAllAccounts().firstOrError() disposable.add(repositories.getAccountRepository().observeAllAccounts().firstOrError()
.subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.newThread())
.subscribe(initialAccounts -> ConnectAccountsOnStartup .subscribe(initialAccounts -> ConnectAccountsOnStartup
.with(this, initialAccounts) .with(this.connectionManager, repositories.getAccountRepository(), initialAccounts)
.execute())); .execute()));
} }
@Override
public void addConnection(MercuryConnection connection) {
connections.put(connection.getAccount().getId(), connection);
}
@Override
public MercuryConnection getConnection(UUID accountId) {
return connections.get(accountId);
}
public void bindConnection(MercuryConnection connection) { public void bindConnection(MercuryConnection connection) {
rosterStoreBinder.setRosterStoreOn(connection); rosterStoreBinder.setRosterStoreOn(connection);
} }
public AddAccount addAccount() { public AddAccount addAccount() {
return new AddAccount(repositories.getAccountRepository(), this); return new AddAccount(repositories.getAccountRepository(), this.connectionManager);
} }
// CSI // CSI
@ -69,7 +61,7 @@ public class Messenger implements ConnectionRegistry, ClientStateListener {
@Override @Override
public void onClientInForeground() { public void onClientInForeground() {
LOGGER.log(Level.INFO, "CSI: active"); LOGGER.log(Level.INFO, "CSI: active");
for (MercuryConnection connection : connections.values()) { for (MercuryConnection connection : connectionManager.getConnections()) {
tryCsiActive(connection); tryCsiActive(connection);
} }
} }
@ -77,7 +69,7 @@ public class Messenger implements ConnectionRegistry, ClientStateListener {
@Override @Override
public void onClientInBackground() { public void onClientInBackground() {
LOGGER.log(Level.INFO, "CSI: inactive"); LOGGER.log(Level.INFO, "CSI: inactive");
for (MercuryConnection connection : connections.values()) { for (MercuryConnection connection : connectionManager.getConnections()) {
tryCsiInactive(connection); tryCsiInactive(connection);
} }
} }

View file

@ -1,9 +1,9 @@
package org.mercury_im.messenger.usecase; package org.mercury_im.messenger.usecase;
import org.mercury_im.messenger.Messenger;
import org.mercury_im.messenger.data.repository.AccountRepository; import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.xmpp.MercuryConnection; import org.mercury_im.messenger.xmpp.MercuryConnection;
import org.mercury_im.messenger.xmpp.MercuryConnectionManager;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -17,13 +17,13 @@ public class AddAccount {
private static final Logger LOGGER = Logger.getLogger(AddAccount.class.getName()); private static final Logger LOGGER = Logger.getLogger(AddAccount.class.getName());
private final AccountRepository accountRepository; private final AccountRepository accountRepository;
private final Messenger messenger; private final MercuryConnectionManager connectionManager;
private final CompositeDisposable disposable = new CompositeDisposable(); private final CompositeDisposable disposable = new CompositeDisposable();
public AddAccount(AccountRepository accountRepository, Messenger messenger) { public AddAccount(AccountRepository accountRepository, MercuryConnectionManager connectionManager) {
this.accountRepository = accountRepository; this.accountRepository = accountRepository;
this.messenger = messenger; this.connectionManager = connectionManager;
} }
public Completable execute(Account account) { public Completable execute(Account account) {
@ -31,7 +31,6 @@ public class AddAccount {
} }
private Completable loginAndStoreAccountIfSuccessful(Account account) { private Completable loginAndStoreAccountIfSuccessful(Account account) {
return logIntoAccount(account).flatMap(connection -> return logIntoAccount(account).flatMap(connection ->
insertEnabledAccountIntoDatabase(account).flatMap(insertedAccount -> insertEnabledAccountIntoDatabase(account).flatMap(insertedAccount ->
updateAccountIdInConnectionSingle(insertedAccount, connection))) updateAccountIdInConnectionSingle(insertedAccount, connection)))
@ -50,9 +49,9 @@ public class AddAccount {
} }
private MercuryConnection getOrCreateConnection(Account account) { private MercuryConnection getOrCreateConnection(Account account) {
MercuryConnection connection = messenger.getConnection(account); MercuryConnection connection = connectionManager.getConnection(account);
if (connection == null) { if (connection == null) {
connection = new MercuryConnection(account); connection = new MercuryConnection(accountRepository, account);
} }
return connection; return connection;
} }
@ -63,7 +62,7 @@ public class AddAccount {
} }
private Completable addConnectionToMessenger(MercuryConnection connection) { private Completable addConnectionToMessenger(MercuryConnection connection) {
return Completable.fromAction(() -> messenger.addConnection(connection)); return Completable.fromAction(() -> connectionManager.registerConnection(connection));
} }
private Single<MercuryConnection> updateAccountIdInConnectionSingle(Account account, MercuryConnection connection) { private Single<MercuryConnection> updateAccountIdInConnectionSingle(Account account, MercuryConnection connection) {

View file

@ -1,8 +1,9 @@
package org.mercury_im.messenger.usecase; package org.mercury_im.messenger.usecase;
import org.mercury_im.messenger.Messenger; import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.xmpp.MercuryConnection; import org.mercury_im.messenger.xmpp.MercuryConnection;
import org.mercury_im.messenger.xmpp.MercuryConnectionManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -16,28 +17,30 @@ public class ConnectAccountsOnStartup {
private static final Logger LOGGER = Logger.getLogger(ConnectAccountsOnStartup.class.getName()); private static final Logger LOGGER = Logger.getLogger(ConnectAccountsOnStartup.class.getName());
private final Messenger messenger; private final MercuryConnectionManager connectionManager;
private final AccountRepository accountRepository;
private final List<Account> accounts = new ArrayList<>(); private final List<Account> accounts = new ArrayList<>();
private CompositeDisposable disposable = new CompositeDisposable(); private CompositeDisposable disposable = new CompositeDisposable();
private ConnectAccountsOnStartup(Messenger messenger, List<Account> initialAccounts) { private ConnectAccountsOnStartup(MercuryConnectionManager connectionManager, AccountRepository accountRepository, List<Account> initialAccounts) {
this.messenger = messenger; this.connectionManager = connectionManager;
this.accountRepository = accountRepository;
this.accounts.addAll(initialAccounts); this.accounts.addAll(initialAccounts);
} }
public static ConnectAccountsOnStartup with(Messenger messenger, List<Account> initialAccounts) { public static ConnectAccountsOnStartup with(MercuryConnectionManager connectionManager, AccountRepository accountRepository, List<Account> initialAccounts) {
return new ConnectAccountsOnStartup(messenger, initialAccounts); return new ConnectAccountsOnStartup(connectionManager, accountRepository, initialAccounts);
} }
public void execute() { public void execute() {
for (Account account : accounts) { for (Account account : accounts) {
if (account.isEnabled()) { if (account.isEnabled()) {
MercuryConnection connection = new MercuryConnection(account); MercuryConnection connection = new MercuryConnection(accountRepository, account);
disposable.add(LogIntoAccount.with(connection) disposable.add(LogIntoAccount.with(connection)
.executeAndPossiblyThrow() .executeAndPossiblyThrow()
.subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.newThread())
.doOnComplete(() -> messenger.addConnection(connection)) .doOnComplete(() -> connectionManager.registerConnection(connection))
.subscribe( .subscribe(
() -> LOGGER.log(Level.INFO, "Successfully logged into account " + account.getAddress()), () -> LOGGER.log(Level.INFO, "Successfully logged into account " + account.getAddress()),
error -> LOGGER.log(Level.SEVERE, "Error logging into account " + account.getAddress(), error))); error -> LOGGER.log(Level.SEVERE, "Error logging into account " + account.getAddress(), error)));

View file

@ -1,18 +1,73 @@
package org.mercury_im.messenger.xmpp; package org.mercury_im.messenger.xmpp;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPConnection;
import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.util.Optional;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.subjects.BehaviorSubject;
public class MercuryConnection { public class MercuryConnection {
private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory(); private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory();
private final Account account; private final CompositeDisposable disposable = new CompositeDisposable();
private final AccountRepository accountRepository;
private Account account;
private XMPPConnection connection; private XMPPConnection connection;
public MercuryConnection(Account account) { private BehaviorSubject<Boolean> enabled = BehaviorSubject.create();
private BehaviorSubject<ConnectionState> state = BehaviorSubject.createDefault(ConnectionState.closed);
public MercuryConnection(AccountRepository accountRepository, Account account) {
this.accountRepository = accountRepository;
this.account = account; this.account = account;
this.connection = connectionFactory.createConnection(account); this.connection = connectionFactory.createConnection(account);
observeAccountAndConnection();
}
private void observeAccountAndConnection() {
observeAccount();
observeConnection();
}
private void observeAccount() {
disposable.add(accountRepository.observeAccount(account.getId())
.filter(Optional::isPresent)
.map(Optional::getItem)
.subscribe(this::setAccount));
}
private void observeConnection() {
connection.addConnectionListener(new ConnectionListener() {
@Override
public void connected(XMPPConnection connection) {
state.onNext(ConnectionState.connected);
}
@Override
public void authenticated(XMPPConnection connection, boolean resumed) {
state.onNext(ConnectionState.authenticated);
}
@Override
public void connectionClosed() {
state.onNext(ConnectionState.closed);
}
@Override
public void connectionClosedOnError(Exception e) {
state.onNext(ConnectionState.closedOnError);
}
});
}
private void setAccount(Account account) {
this.account = account;
enabled.onNext(account.isEnabled());
} }
public Account getAccount() { public Account getAccount() {
@ -22,4 +77,15 @@ public class MercuryConnection {
public XMPPConnection getConnection() { public XMPPConnection getConnection() {
return connection; return connection;
} }
public void dispose() {
disposable.dispose();
}
public enum ConnectionState {
connected,
authenticated,
closedOnError,
closed
}
} }

View file

@ -1,11 +1,14 @@
package org.mercury_im.messenger.xmpp; package org.mercury_im.messenger.xmpp;
import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.AbstractXMPPConnection;
import org.mercury_im.messenger.Messenger;
import org.mercury_im.messenger.data.repository.AccountRepository; import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.usecase.LogIntoAccount; import org.mercury_im.messenger.usecase.LogIntoAccount;
import org.mercury_im.messenger.util.Optional; import org.mercury_im.messenger.util.Optional;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -21,17 +24,32 @@ public class MercuryConnectionManager {
private static final Logger LOGGER = Logger.getLogger(MercuryConnectionManager.class.getName()); private static final Logger LOGGER = Logger.getLogger(MercuryConnectionManager.class.getName());
private Map<UUID, MercuryConnection> connections = new ConcurrentHashMap<>(); private final Map<UUID, MercuryConnection> connections = new ConcurrentHashMap<>();
private final AccountRepository accountRepository; private final AccountRepository accountRepository;
private final Messenger messenger;
private final CompositeDisposable disposable = new CompositeDisposable(); private final CompositeDisposable disposable = new CompositeDisposable();
@Inject @Inject
public MercuryConnectionManager(AccountRepository accountRepository) { public MercuryConnectionManager(Messenger messenger, AccountRepository accountRepository) {
this.messenger = messenger;
this.accountRepository = accountRepository; this.accountRepository = accountRepository;
} }
public List<MercuryConnection> getConnections() {
return new ArrayList<>(connections.values());
}
public MercuryConnection getConnection(Account account) {
return getConnection(account.getId());
}
public MercuryConnection getConnection(UUID id) {
return connections.get(id);
}
public void registerConnection(MercuryConnection connection) { public void registerConnection(MercuryConnection connection) {
messenger.bindConnection(connection);
if (connection.getConnection().isAuthenticated()) { if (connection.getConnection().isAuthenticated()) {
registerLiveConnection(connection); registerLiveConnection(connection);
} else { } else {
@ -43,11 +61,17 @@ public class MercuryConnectionManager {
Observable<Optional<Account>> observableAccount = accountRepository Observable<Optional<Account>> observableAccount = accountRepository
.observeAccount(connection.getAccount().getId()); .observeAccount(connection.getAccount().getId());
disposable.add(observableAccount.subscribe(event -> disposable.add(observableAccount.subscribe(event ->
handleAccountChangedEvent(connection, event))); handleOptionalAccountChangedEvent(connection, event)));
} }
private void handleAccountChangedEvent(MercuryConnection connection, Optional<Account> event) { private void registerDeadConnection(MercuryConnection connection) {
if (connection.getAccount().isEnabled()) {
connectionLogin(connection);
}
}
private void handleOptionalAccountChangedEvent(MercuryConnection connection, Optional<Account> event) {
if (event.isPresent()) { if (event.isPresent()) {
handleAccountChangedEvent(connection, event.getItem()); handleAccountChangedEvent(connection, event.getItem());
} else { } else {
@ -64,21 +88,26 @@ public class MercuryConnectionManager {
} }
private void handleAccountDisabled(MercuryConnection connection) { private void handleAccountDisabled(MercuryConnection connection) {
if (connection.getConnection().isAuthenticated()) { disconnectAndRemoveConnection(connection);
shutdownConnection(connection); }
}
private void disconnectAndRemoveConnection(MercuryConnection connection) {
shutdownConnection(connection);
removeConnection(connection); removeConnection(connection);
} }
private void handleAccountEnabled(MercuryConnection connection) { private void handleAccountEnabled(MercuryConnection connection) {
connectionLogin(connection);
}
private void connectionLogin(MercuryConnection connection) {
disposable.add(LogIntoAccount.with(connection).executeAndPossiblyThrow() disposable.add(LogIntoAccount.with(connection).executeAndPossiblyThrow()
.subscribe(() -> LOGGER.log(Level.INFO, "Logged in."), .subscribe(() -> LOGGER.log(Level.INFO, "Logged in."),
error -> LOGGER.log(Level.SEVERE, "Connection error!", error))); error -> LOGGER.log(Level.SEVERE, "Connection error!", error)));
} }
private void handleAccountRemoved(MercuryConnection connection) { private void handleAccountRemoved(MercuryConnection connection) {
shutdownConnection(connection); disconnectAndRemoveConnection(connection);
removeConnection(connection);
} }
private void shutdownConnection(MercuryConnection connection) { private void shutdownConnection(MercuryConnection connection) {
@ -87,14 +116,9 @@ public class MercuryConnectionManager {
} }
} }
private void registerDeadConnection(MercuryConnection connection) {
if (connection.getAccount().isEnabled()) {
}
}
private void removeConnection(MercuryConnection connection) { private void removeConnection(MercuryConnection connection) {
connections.remove(connection.getAccount().getId()); connections.remove(connection.getAccount().getId());
connection.dispose();
} }
} }

View file

@ -53,7 +53,7 @@ public class XmppDirectMessageCenter
this.directChatRepository = repositories.getDirectChatRepository(); this.directChatRepository = repositories.getDirectChatRepository();
this.messageRepository = repositories.getMessageRepository(); this.messageRepository = repositories.getMessageRepository();
XMPPConnection connection = getMessenger().getConnection(account).getConnection(); XMPPConnection connection = getMessenger().getConnectionManager().getConnection(account).getConnection();
ChatManager.getInstanceFor(connection).addIncomingListener(this); ChatManager.getInstanceFor(connection).addIncomingListener(this);
} }
@ -98,7 +98,7 @@ public class XmppDirectMessageCenter
} }
protected ChatManager getChatManager(DirectChat chat) { protected ChatManager getChatManager(DirectChat chat) {
MercuryConnection mercuryConnection = getMessenger().getConnection(chat.getAccount()); MercuryConnection mercuryConnection = getMessenger().getConnectionManager().getConnection(chat.getAccount());
return ChatManager.getInstanceFor(mercuryConnection.getConnection()); return ChatManager.getInstanceFor(mercuryConnection.getConnection());
} }