From f58ad142cc063b543a779bbc16e8c27c680aff43 Mon Sep 17 00:00:00 2001 From: Paul Schaub Date: Tue, 5 May 2020 22:08:04 +0200 Subject: [PATCH] Start incorporating changes from RxConnectionCenter --- .../messenger/xmpp/MercuryConnection.java | 132 +++++++++--------- .../xmpp/MercuryConnectionManager.java | 65 +++++++-- .../xmpp/state/ConnectionPoolState.java | 37 +++++ .../messenger/xmpp/state/ConnectionState.java | 43 ++++++ .../xmpp/state/ConnectivityState.java | 8 ++ 5 files changed, 203 insertions(+), 82 deletions(-) create mode 100644 domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionPoolState.java create mode 100644 domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionState.java create mode 100644 domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectivityState.java diff --git a/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnection.java b/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnection.java index 1cadeff..4899dfa 100644 --- a/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnection.java +++ b/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnection.java @@ -1,104 +1,102 @@ package org.mercury_im.messenger.xmpp; +import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.ConnectionListener; +import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.XMPPConnection; +import org.jivesoftware.smack.XMPPException; import org.mercury_im.messenger.data.repository.AccountRepository; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.util.Optional; +import org.mercury_im.messenger.xmpp.state.ConnectionState; +import org.mercury_im.messenger.xmpp.state.ConnectivityState; +import java.io.IOException; +import java.util.UUID; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.Observable; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.subjects.BehaviorSubject; +import lombok.Getter; public class MercuryConnection { private static final Logger LOGGER = Logger.getLogger("MercuryConnection"); - private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory(); - private final CompositeDisposable disposable = new CompositeDisposable(); - private final AccountRepository accountRepository; - - private Account account; + @Getter private XMPPConnection connection; - private BehaviorSubject enabled = BehaviorSubject.create(); - private BehaviorSubject state = BehaviorSubject.createDefault(ConnectionState.closed); + @Getter + private final UUID accountId; - public MercuryConnection(AccountRepository accountRepository, Account account) { - this.accountRepository = accountRepository; - this.account = account; - this.connection = connectionFactory.createConnection(account); - observeAccountAndConnection(); + private final BehaviorSubject state; + + public MercuryConnection(XMPPConnection connection, Account account) { + this.connection = connection; + this.accountId = account.getId(); + + this.state = BehaviorSubject.createDefault(new ConnectionState(accountId, + ConnectivityState.disconnected, false, false)); } - public Observable getState() { + + public Observable observeConnection() { return state; } - private void observeAccountAndConnection() { - observeAccount(); - observeConnection(); + public void connect() throws InterruptedException, XMPPException, SmackException, IOException { + if (getConnection().isConnected()) { + return; + } + ((AbstractXMPPConnection) getConnection()).connect(); } - private void observeAccount() { - disposable.add(accountRepository.observeAccount(account.getId()) - .filter(Optional::isPresent) - .map(Optional::getItem) - .subscribe(this::setAccount)); + public void login() throws InterruptedException, IOException, SmackException, XMPPException { + if (getConnection().isAuthenticated()) { + return; + } + connect(); + ((AbstractXMPPConnection) getConnection()).login(); } - private void observeConnection() { - connection.addConnectionListener(new ConnectionListener() { - @Override - public void connected(XMPPConnection connection) { - LOGGER.log(Level.FINER, "connected"); - state.onNext(ConnectionState.connected); - } + private final ConnectionListener connectionListener = new ConnectionListener() { + @Override + public void connected(XMPPConnection connection) { + state.onNext(state.getValue() + .withConnectivity(ConnectivityState.connected) + .withAuthenticated(false)); + } - @Override - public void authenticated(XMPPConnection connection, boolean resumed) { - LOGGER.log(Level.FINER, "authenticated. resumed? " + resumed); - state.onNext(ConnectionState.authenticated); - } + @Override + public void authenticated(XMPPConnection connection, boolean resumed) { + state.onNext(state.getValue() + .withConnectivity(ConnectivityState.connected) + .withAuthenticated(true) + .withResumed(resumed)); + } - @Override - public void connectionClosed() { - LOGGER.log(Level.FINER, "connectionClosed"); - state.onNext(ConnectionState.closed); - } + @Override + public void connectionClosed() { + state.onNext(state.getValue() + .withConnectivity(ConnectivityState.disconnected) + .withAuthenticated(false)); + } - @Override - public void connectionClosedOnError(Exception e) { - LOGGER.log(Level.WARNING, "connectionClosedOnError"); - state.onNext(ConnectionState.closedOnError); - } - }); - } + @Override + public void connectionClosedOnError(Exception e) { + state.onNext(state.getValue() + .withConnectivity(ConnectivityState.disconnected) + .withAuthenticated(false)); + } + }; - private void setAccount(Account account) { - this.account = account; - enabled.onNext(account.isEnabled()); - } - - public Account getAccount() { - return account; - } - - public XMPPConnection getConnection() { - return connection; - } - - public void dispose() { - disposable.dispose(); - } - - public enum ConnectionState { - connected, - authenticated, - closedOnError, - closed + public void shutdown() { + if (connection.isConnected()) { + ((AbstractXMPPConnection) getConnection()).disconnect(); + } else { + ((AbstractXMPPConnection) getConnection()).instantShutdown(); + } } } diff --git a/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java b/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java index f4df1da..1c808b4 100644 --- a/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java +++ b/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java @@ -15,6 +15,8 @@ import org.mercury_im.messenger.store.MercuryEntityCapsStore; import org.mercury_im.messenger.usecase.LogIntoAccount; import org.mercury_im.messenger.usecase.RosterStoreBinder; import org.mercury_im.messenger.util.Optional; +import org.mercury_im.messenger.xmpp.state.ConnectionPoolState; +import org.mercury_im.messenger.xmpp.state.ConnectionState; import java.util.ArrayList; import java.util.List; @@ -29,6 +31,7 @@ import javax.inject.Singleton; import io.reactivex.Observable; import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.BehaviorSubject; @@ -36,9 +39,16 @@ import io.reactivex.subjects.BehaviorSubject; public class MercuryConnectionManager { private static final Logger LOGGER = Logger.getLogger("ConnectionManager"); + private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory(); private final Map connections = new ConcurrentHashMap<>(); - private final BehaviorSubject> connectionsSubject = BehaviorSubject.createDefault(connections); + private final Map connectionDisposables = new ConcurrentHashMap<>(); + + private final BehaviorSubject stateObservable = + BehaviorSubject.createDefault(new ConnectionPoolState()); + + private final BehaviorSubject> connectionsSubject = + BehaviorSubject.createDefault(connections); private final AccountRepository accountRepository; @@ -78,6 +88,11 @@ public class MercuryConnectionManager { return connectionsSubject; } + public Observable observeConnectionPoolState() { + return stateObservable; + } + + public MercuryConnection getConnection(Account account) { return getConnection(account.getId()); } @@ -88,16 +103,19 @@ public class MercuryConnectionManager { public void registerConnections(List accounts) { for (Account account : accounts) { - MercuryConnection connection = new MercuryConnection(accountRepository, account); - registerConnection(connection); + if (!connections.containsKey(account.getId())) { + MercuryConnection connection = new MercuryConnection( + connectionFactory.createConnection(account), account); + registerConnection(connection); + } } } public void registerConnection(MercuryConnection connection) { - LOGGER.log(Level.INFO, "Register Connection " + connection.getAccount().getAddress()); + LOGGER.log(Level.INFO, "Register Connection " + connection.getAccountId()); putConnection(connection); disposable.add(accountRepository - .observeAccount(connection.getAccount().getId()) + .observeAccount(connection.getAccountId()) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(event -> @@ -105,23 +123,30 @@ public class MercuryConnectionManager { } private void putConnection(MercuryConnection connection) { - connections.put(connection.getAccount().getId(), connection); - connectionsSubject.onNext(connections); + connections.put(connection.getAccountId(), connection); + connectionDisposables.put(connection.getAccountId(), connection.observeConnection().subscribe(s -> + stateObservable.onNext(updatePoolState(stateObservable.getValue(), s)))); bindConnection(connection); } + private ConnectionPoolState updatePoolState(ConnectionPoolState poolState, ConnectionState conState) { + Map states = poolState.getConnectionStates(); + states.put(conState.getId(), conState); + return new ConnectionPoolState(states); + } + public void bindConnection(MercuryConnection connection) { rosterStoreBinder.setRosterStoreOn(connection); ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection.getConnection()) .addReconnectionListener(new ReconnectionListener() { @Override public void reconnectingIn(int seconds) { - LOGGER.log(Level.FINER, "Reconnecting connection " + connection.getAccount().getAddress() + " in " + seconds + " seconds."); + LOGGER.log(Level.FINER, "Reconnecting connection " + connection.getAccountId() + " in " + seconds + " seconds."); } @Override public void reconnectionFailed(Exception e) { - LOGGER.log(Level.WARNING, "Reconnection of connection " + connection.getAccount().getAddress() + " failed.", e); + LOGGER.log(Level.WARNING, "Reconnection of connection " + connection.getAccountId() + " failed.", e); } }); } @@ -143,12 +168,12 @@ public class MercuryConnectionManager { } private void handleAccountDisabled(MercuryConnection connection) { - LOGGER.log(Level.FINER, "HandleAccountDisabled: " + connection.getAccount().getAddress()); + LOGGER.log(Level.FINER, "HandleAccountDisabled: " + connection.getAccountId()); connectionDisconnect(connection); } private void handleAccountEnabled(MercuryConnection connection) { - LOGGER.log(Level.FINER, "HandleAccountEnabled: " + connection.getAccount().getAddress()); + LOGGER.log(Level.FINER, "HandleAccountEnabled: " + connection.getAccountId()); connectionLogin(connection); } @@ -173,10 +198,20 @@ public class MercuryConnectionManager { } private void removeConnection(MercuryConnection connection) { - LOGGER.log(Level.FINER, "Remove Connection: " + connection.getAccount().getAddress()); - connections.remove(connection.getAccount().getId()); - connectionsSubject.onNext(connections); - connection.dispose(); + LOGGER.log(Level.FINER, "Remove Connection: " + connection.getAccountId()); + connections.remove(connection.getAccountId()); + connectionDisposables.remove(connection.getAccountId()).dispose(); + stateObservable.onNext(updatePoolState(stateObservable.getValue())); + } + + private ConnectionPoolState updatePoolState(ConnectionPoolState value) { + Map states = value.getConnectionStates(); + for (UUID id : connections.keySet()) { + if (!states.containsKey(id)) { + states.remove(id); + } + } + return new ConnectionPoolState(states); } } diff --git a/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionPoolState.java b/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionPoolState.java new file mode 100644 index 0000000..189e8d6 --- /dev/null +++ b/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionPoolState.java @@ -0,0 +1,37 @@ +package org.mercury_im.messenger.xmpp.state; + +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class ConnectionPoolState { + + private final Map connectionStates; + + public ConnectionPoolState() { + this(new ConcurrentHashMap<>()); + } + + public ConnectionPoolState(Map connectionStates) { + this.connectionStates = connectionStates; + } + + public Map getConnectionStates() { + return connectionStates; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("["); + Iterator iterator = connectionStates.values().iterator(); + while (iterator.hasNext()) { + sb.append(iterator.next().toString()); + if (iterator.hasNext()) { + sb.append(","); + } + } + sb.append("]"); + return sb.toString(); + } +} diff --git a/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionState.java b/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionState.java new file mode 100644 index 0000000..cc6b862 --- /dev/null +++ b/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectionState.java @@ -0,0 +1,43 @@ +package org.mercury_im.messenger.xmpp.state; + +import java.util.UUID; + +import lombok.ToString; +import lombok.Value; +import lombok.With; + +@Value +@ToString +public class ConnectionState { + + UUID id; + @With + ConnectivityState connectivity; + @With + boolean authenticated; + @With + boolean resumed; + + public ConnectionState(UUID id, ConnectivityState connectivity, boolean authenticated, boolean resumed) { + this.id = id; + this.connectivity = connectivity; + this.authenticated = authenticated; + this.resumed = resumed; + } + + public UUID getId() { + return id; + } + + public ConnectivityState getConnectivity() { + return connectivity; + } + + public boolean isAuthenticated() { + return authenticated; + } + + public boolean isResumed() { + return resumed; + } +} diff --git a/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectivityState.java b/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectivityState.java new file mode 100644 index 0000000..8ff820e --- /dev/null +++ b/domain/src/main/java/org/mercury_im/messenger/xmpp/state/ConnectivityState.java @@ -0,0 +1,8 @@ +package org.mercury_im.messenger.xmpp.state; + +public enum ConnectivityState { + disconnected, + connecting, + connected, + disconnecting +}