Start incorporating changes from RxConnectionCenter

This commit is contained in:
Paul Schaub 2020-05-05 22:08:04 +02:00
parent 7e1c9a7bdc
commit f58ad142cc
Signed by: vanitasvitae
GPG key ID: 62BEE9264BF17311
5 changed files with 203 additions and 82 deletions

View file

@ -1,104 +1,102 @@
package org.mercury_im.messenger.xmpp;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.util.Optional;
import org.mercury_im.messenger.xmpp.state.ConnectionState;
import org.mercury_im.messenger.xmpp.state.ConnectivityState;
import java.io.IOException;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.subjects.BehaviorSubject;
import lombok.Getter;
public class MercuryConnection {
private static final Logger LOGGER = Logger.getLogger("MercuryConnection");
private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory();
private final CompositeDisposable disposable = new CompositeDisposable();
private final AccountRepository accountRepository;
private Account account;
@Getter
private XMPPConnection connection;
private BehaviorSubject<Boolean> enabled = BehaviorSubject.create();
private BehaviorSubject<ConnectionState> state = BehaviorSubject.createDefault(ConnectionState.closed);
@Getter
private final UUID accountId;
public MercuryConnection(AccountRepository accountRepository, Account account) {
this.accountRepository = accountRepository;
this.account = account;
this.connection = connectionFactory.createConnection(account);
observeAccountAndConnection();
private final BehaviorSubject<ConnectionState> state;
public MercuryConnection(XMPPConnection connection, Account account) {
this.connection = connection;
this.accountId = account.getId();
this.state = BehaviorSubject.createDefault(new ConnectionState(accountId,
ConnectivityState.disconnected, false, false));
}
public Observable<ConnectionState> getState() {
public Observable<ConnectionState> observeConnection() {
return state;
}
private void observeAccountAndConnection() {
observeAccount();
observeConnection();
public void connect() throws InterruptedException, XMPPException, SmackException, IOException {
if (getConnection().isConnected()) {
return;
}
((AbstractXMPPConnection) getConnection()).connect();
}
private void observeAccount() {
disposable.add(accountRepository.observeAccount(account.getId())
.filter(Optional::isPresent)
.map(Optional::getItem)
.subscribe(this::setAccount));
public void login() throws InterruptedException, IOException, SmackException, XMPPException {
if (getConnection().isAuthenticated()) {
return;
}
connect();
((AbstractXMPPConnection) getConnection()).login();
}
private void observeConnection() {
connection.addConnectionListener(new ConnectionListener() {
@Override
public void connected(XMPPConnection connection) {
LOGGER.log(Level.FINER, "connected");
state.onNext(ConnectionState.connected);
}
private final ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void connected(XMPPConnection connection) {
state.onNext(state.getValue()
.withConnectivity(ConnectivityState.connected)
.withAuthenticated(false));
}
@Override
public void authenticated(XMPPConnection connection, boolean resumed) {
LOGGER.log(Level.FINER, "authenticated. resumed? " + resumed);
state.onNext(ConnectionState.authenticated);
}
@Override
public void authenticated(XMPPConnection connection, boolean resumed) {
state.onNext(state.getValue()
.withConnectivity(ConnectivityState.connected)
.withAuthenticated(true)
.withResumed(resumed));
}
@Override
public void connectionClosed() {
LOGGER.log(Level.FINER, "connectionClosed");
state.onNext(ConnectionState.closed);
}
@Override
public void connectionClosed() {
state.onNext(state.getValue()
.withConnectivity(ConnectivityState.disconnected)
.withAuthenticated(false));
}
@Override
public void connectionClosedOnError(Exception e) {
LOGGER.log(Level.WARNING, "connectionClosedOnError");
state.onNext(ConnectionState.closedOnError);
}
});
}
@Override
public void connectionClosedOnError(Exception e) {
state.onNext(state.getValue()
.withConnectivity(ConnectivityState.disconnected)
.withAuthenticated(false));
}
};
private void setAccount(Account account) {
this.account = account;
enabled.onNext(account.isEnabled());
}
public Account getAccount() {
return account;
}
public XMPPConnection getConnection() {
return connection;
}
public void dispose() {
disposable.dispose();
}
public enum ConnectionState {
connected,
authenticated,
closedOnError,
closed
public void shutdown() {
if (connection.isConnected()) {
((AbstractXMPPConnection) getConnection()).disconnect();
} else {
((AbstractXMPPConnection) getConnection()).instantShutdown();
}
}
}

View file

@ -15,6 +15,8 @@ import org.mercury_im.messenger.store.MercuryEntityCapsStore;
import org.mercury_im.messenger.usecase.LogIntoAccount;
import org.mercury_im.messenger.usecase.RosterStoreBinder;
import org.mercury_im.messenger.util.Optional;
import org.mercury_im.messenger.xmpp.state.ConnectionPoolState;
import org.mercury_im.messenger.xmpp.state.ConnectionState;
import java.util.ArrayList;
import java.util.List;
@ -29,6 +31,7 @@ import javax.inject.Singleton;
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
@ -36,9 +39,16 @@ import io.reactivex.subjects.BehaviorSubject;
public class MercuryConnectionManager {
private static final Logger LOGGER = Logger.getLogger("ConnectionManager");
private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory();
private final Map<UUID, MercuryConnection> connections = new ConcurrentHashMap<>();
private final BehaviorSubject<Map<UUID, MercuryConnection>> connectionsSubject = BehaviorSubject.createDefault(connections);
private final Map<UUID, Disposable> connectionDisposables = new ConcurrentHashMap<>();
private final BehaviorSubject<ConnectionPoolState> stateObservable =
BehaviorSubject.createDefault(new ConnectionPoolState());
private final BehaviorSubject<Map<UUID, MercuryConnection>> connectionsSubject =
BehaviorSubject.createDefault(connections);
private final AccountRepository accountRepository;
@ -78,6 +88,11 @@ public class MercuryConnectionManager {
return connectionsSubject;
}
public Observable<ConnectionPoolState> observeConnectionPoolState() {
return stateObservable;
}
public MercuryConnection getConnection(Account account) {
return getConnection(account.getId());
}
@ -88,16 +103,19 @@ public class MercuryConnectionManager {
public void registerConnections(List<Account> accounts) {
for (Account account : accounts) {
MercuryConnection connection = new MercuryConnection(accountRepository, account);
registerConnection(connection);
if (!connections.containsKey(account.getId())) {
MercuryConnection connection = new MercuryConnection(
connectionFactory.createConnection(account), account);
registerConnection(connection);
}
}
}
public void registerConnection(MercuryConnection connection) {
LOGGER.log(Level.INFO, "Register Connection " + connection.getAccount().getAddress());
LOGGER.log(Level.INFO, "Register Connection " + connection.getAccountId());
putConnection(connection);
disposable.add(accountRepository
.observeAccount(connection.getAccount().getId())
.observeAccount(connection.getAccountId())
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(event ->
@ -105,23 +123,30 @@ public class MercuryConnectionManager {
}
private void putConnection(MercuryConnection connection) {
connections.put(connection.getAccount().getId(), connection);
connectionsSubject.onNext(connections);
connections.put(connection.getAccountId(), connection);
connectionDisposables.put(connection.getAccountId(), connection.observeConnection().subscribe(s ->
stateObservable.onNext(updatePoolState(stateObservable.getValue(), s))));
bindConnection(connection);
}
private ConnectionPoolState updatePoolState(ConnectionPoolState poolState, ConnectionState conState) {
Map<UUID, ConnectionState> states = poolState.getConnectionStates();
states.put(conState.getId(), conState);
return new ConnectionPoolState(states);
}
public void bindConnection(MercuryConnection connection) {
rosterStoreBinder.setRosterStoreOn(connection);
ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection.getConnection())
.addReconnectionListener(new ReconnectionListener() {
@Override
public void reconnectingIn(int seconds) {
LOGGER.log(Level.FINER, "Reconnecting connection " + connection.getAccount().getAddress() + " in " + seconds + " seconds.");
LOGGER.log(Level.FINER, "Reconnecting connection " + connection.getAccountId() + " in " + seconds + " seconds.");
}
@Override
public void reconnectionFailed(Exception e) {
LOGGER.log(Level.WARNING, "Reconnection of connection " + connection.getAccount().getAddress() + " failed.", e);
LOGGER.log(Level.WARNING, "Reconnection of connection " + connection.getAccountId() + " failed.", e);
}
});
}
@ -143,12 +168,12 @@ public class MercuryConnectionManager {
}
private void handleAccountDisabled(MercuryConnection connection) {
LOGGER.log(Level.FINER, "HandleAccountDisabled: " + connection.getAccount().getAddress());
LOGGER.log(Level.FINER, "HandleAccountDisabled: " + connection.getAccountId());
connectionDisconnect(connection);
}
private void handleAccountEnabled(MercuryConnection connection) {
LOGGER.log(Level.FINER, "HandleAccountEnabled: " + connection.getAccount().getAddress());
LOGGER.log(Level.FINER, "HandleAccountEnabled: " + connection.getAccountId());
connectionLogin(connection);
}
@ -173,10 +198,20 @@ public class MercuryConnectionManager {
}
private void removeConnection(MercuryConnection connection) {
LOGGER.log(Level.FINER, "Remove Connection: " + connection.getAccount().getAddress());
connections.remove(connection.getAccount().getId());
connectionsSubject.onNext(connections);
connection.dispose();
LOGGER.log(Level.FINER, "Remove Connection: " + connection.getAccountId());
connections.remove(connection.getAccountId());
connectionDisposables.remove(connection.getAccountId()).dispose();
stateObservable.onNext(updatePoolState(stateObservable.getValue()));
}
private ConnectionPoolState updatePoolState(ConnectionPoolState value) {
Map<UUID, ConnectionState> states = value.getConnectionStates();
for (UUID id : connections.keySet()) {
if (!states.containsKey(id)) {
states.remove(id);
}
}
return new ConnectionPoolState(states);
}
}

View file

@ -0,0 +1,37 @@
package org.mercury_im.messenger.xmpp.state;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class ConnectionPoolState {
private final Map<UUID, ConnectionState> connectionStates;
public ConnectionPoolState() {
this(new ConcurrentHashMap<>());
}
public ConnectionPoolState(Map<UUID, ConnectionState> connectionStates) {
this.connectionStates = connectionStates;
}
public Map<UUID, ConnectionState> getConnectionStates() {
return connectionStates;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("[");
Iterator<ConnectionState> iterator = connectionStates.values().iterator();
while (iterator.hasNext()) {
sb.append(iterator.next().toString());
if (iterator.hasNext()) {
sb.append(",");
}
}
sb.append("]");
return sb.toString();
}
}

View file

@ -0,0 +1,43 @@
package org.mercury_im.messenger.xmpp.state;
import java.util.UUID;
import lombok.ToString;
import lombok.Value;
import lombok.With;
@Value
@ToString
public class ConnectionState {
UUID id;
@With
ConnectivityState connectivity;
@With
boolean authenticated;
@With
boolean resumed;
public ConnectionState(UUID id, ConnectivityState connectivity, boolean authenticated, boolean resumed) {
this.id = id;
this.connectivity = connectivity;
this.authenticated = authenticated;
this.resumed = resumed;
}
public UUID getId() {
return id;
}
public ConnectivityState getConnectivity() {
return connectivity;
}
public boolean isAuthenticated() {
return authenticated;
}
public boolean isResumed() {
return resumed;
}
}

View file

@ -0,0 +1,8 @@
package org.mercury_im.messenger.xmpp.state;
public enum ConnectivityState {
disconnected,
connecting,
connected,
disconnecting
}