package org.mercury_im.messenger.core.connection; import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.ConnectionListener; import org.jivesoftware.smack.ReconnectionListener; import org.jivesoftware.smack.ReconnectionManager; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.sasl.SASLErrorException; import org.mercury_im.messenger.core.connection.exception.InvalidCredentialsException; import org.mercury_im.messenger.core.connection.exception.ServerUnreachableException; import org.mercury_im.messenger.core.connection.state.ConnectionState; import org.mercury_im.messenger.core.connection.state.ConnectivityState; import org.mercury_im.messenger.entity.Account; import java.io.IOException; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.Completable; import io.reactivex.Observable; import io.reactivex.subjects.BehaviorSubject; import lombok.Getter; public class MercuryConnection { private static final Logger LOGGER = Logger.getLogger(MercuryConnection.class.getName()); @Getter private XMPPConnection connection; @Getter private final Account account; private final MercuryConnectionListener connectionListener; public MercuryConnection(XMPPConnection connection, Account account) { this.connection = connection; this.account = account; if (connection.isConnected()) { throw new IllegalStateException("Connection" + account.getJid() + " MUST NOT be connected at this point."); } connectionListener = new MercuryConnectionListener(account.getId(), this); connection.addConnectionListener(connectionListener); initialConnectionSetup(); } public UUID getAccountId() { return getAccount().getId(); } public Observable observeConnection() { return connectionListener.connectionState; } public Completable connect() { return Completable.fromAction(this::doConnect) .doOnError(error -> LOGGER.log(Level.WARNING, "connect(): Connection error for account " + account, error)); } private synchronized void doConnect() throws ServerUnreachableException { ConnectivityState connectivity = connectionListener.connectivity.get(); if (connectivity != ConnectivityState.disconnected) { LOGGER.log(Level.WARNING, "doConnect(): Connection " + account.getJid() + " is not disconnected: " + connectivity); return; } AbstractXMPPConnection connection = (AbstractXMPPConnection) getConnection(); try { connection.connect(); } catch (SmackException.EndpointConnectionException e) { connection.disconnect(); throw new ServerUnreachableException("doConnect(): Cannot connect to server " + connection.getXMPPServiceDomain().asUnescapedString(), e); } catch (IOException | InterruptedException | XMPPException | SmackException e) { throw new AssertionError("Unexpected exception.", e); } LOGGER.log(Level.INFO, "Connected!"); } public Completable login() { return Completable.fromAction(this::doLogin) .doOnError(error -> LOGGER.log(Level.WARNING, "Login error for account " + account, error)); } private synchronized void doLogin() throws InvalidCredentialsException { ConnectivityState connectivity = connectionListener.connectivity.get(); if (connectivity != ConnectivityState.connected) { LOGGER.log(Level.WARNING, "doLogin(): Connection " + account.getJid() + " is not connected: " + connectivity); } try { ((AbstractXMPPConnection) getConnection()).login(); } catch (SASLErrorException e) { throw new InvalidCredentialsException("Credentials of account " + account.getId() + " are invalid.", e); } catch (InterruptedException | XMPPException | SmackException | IOException e) { throw new AssertionError("Unexpected exception.", e); } } public Completable shutdown() { return Completable.fromAction(this::doShutdown) .doOnError(error -> LOGGER.log(Level.WARNING, "Shutdown error for account " + account.getJid(), error)); } public synchronized void doShutdown() { if (connection.isConnected()) { ((AbstractXMPPConnection) getConnection()).disconnect(); } else { ((AbstractXMPPConnection) getConnection()).instantShutdown(); } } private void initialConnectionSetup() { ReconnectionManager.getInstanceFor((AbstractXMPPConnection) getConnection()) .addReconnectionListener(new ReconnectionListener() { @Override public void reconnectingIn(int seconds) { LOGGER.log(Level.FINER, "Reconnecting connection " + getAccount().getAddress() + " in " + seconds + " seconds."); } @Override public void reconnectionFailed(Exception e) { LOGGER.log(Level.WARNING, "Reconnection of connection " + getAccount().getAddress() + " failed.", e); } }); } private class MercuryConnectionListener implements ConnectionListener { private final AtomicReference connectivity; private final BehaviorSubject connectionState; public MercuryConnectionListener(UUID accountId, MercuryConnection connection) { this.connectionState = BehaviorSubject.createDefault(new ConnectionState(accountId, connection)); this.connectivity = new AtomicReference<>(ConnectivityState.disconnected); } @Override public void connecting(XMPPConnection connection) { changeConnectivity(ConnectivityState.connecting); } @Override public void connected(XMPPConnection connection) { changeConnectivity(ConnectivityState.connected); } @Override public void authenticated(XMPPConnection connection, boolean resumed) { connectivity.set(ConnectivityState.authenticated); ConnectionState state = connectionState.getValue() .withConnectivity(ConnectivityState.authenticated) .withResumed(resumed); connectionState.onNext(state); } @Override public void connectionClosed() { changeConnectivity(ConnectivityState.disconnected); } @Override public void connectionClosedOnError(Exception e) { changeConnectivity(ConnectivityState.disconnectedOnError); } private void changeConnectivity(ConnectivityState newConnectivity) { connectivity.set(newConnectivity); ConnectionState state = connectionState.getValue() .withConnectivity(newConnectivity); connectionState.onNext(state); } } }