package org.mercury_im.messenger.core.xmpp; import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.ConnectionListener; import org.jivesoftware.smack.ReconnectionListener; import org.jivesoftware.smack.ReconnectionManager; import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.sasl.SASLErrorException; import org.mercury_im.messenger.core.xmpp.exception.InvalidCredentialsException; import org.mercury_im.messenger.core.xmpp.exception.ServerUnreachableException; import org.mercury_im.messenger.core.xmpp.state.ConnectivityState; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.core.xmpp.state.ConnectionState; import java.io.IOException; import java.util.UUID; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.Completable; import io.reactivex.Observable; import io.reactivex.subjects.BehaviorSubject; import lombok.Getter; public class MercuryConnection { private static final Logger LOGGER = Logger.getLogger("MercuryConnection"); @Getter private XMPPConnection connection; @Getter private final Account account; private final BehaviorSubject state; public MercuryConnection(XMPPConnection connection, Account account) { this.connection = connection; this.account = account; this.state = BehaviorSubject.createDefault(new ConnectionState(account.getId(), this, ConnectivityState.disconnected, false, false)); connection.addConnectionListener(connectionListener); } public UUID getAccountId() { return getAccount().getId(); } public Observable observeConnection() { return state; } public Completable connect() { return Completable.fromAction(this::doConnect) .doOnError(error -> LOGGER.log(Level.WARNING, "Connection error for account " + account, error)); } private void doConnect() throws ServerUnreachableException { state.onNext(state.getValue().withConnectivity(ConnectivityState.connecting)); AbstractXMPPConnection connection = (AbstractXMPPConnection) getConnection(); if (connection.isConnected()) { return; } try { LOGGER.log(Level.INFO, "Connected!"); connection.connect(); } catch (SmackException.EndpointConnectionException e) { connection.disconnect(); throw new ServerUnreachableException("Cannot connect to server " + connection.getXMPPServiceDomain().asUnescapedString(), e); } catch (IOException | InterruptedException | XMPPException | SmackException e) { throw new AssertionError("Unexpected exception.", e); } } public Completable login() { return Completable.fromAction(this::doLogin) .doOnError(error -> LOGGER.log(Level.WARNING, "Login error for account " + account, error)); } private void doLogin() throws InvalidCredentialsException { if (connection.isAuthenticated()) { return; } try { ((AbstractXMPPConnection) getConnection()).login(); } catch (SASLErrorException e) { throw new InvalidCredentialsException("Credentials of account " + account.getId() + " are invalid.", e); } catch (InterruptedException | XMPPException | SmackException | IOException e) { throw new AssertionError("Unexpected exception.", e); } } public Completable shutdown() { return Completable.fromAction(this::doShutdown) .doOnError(error -> LOGGER.log(Level.WARNING, "Shutdown error for account " + account, error)); } public void doShutdown() { if (connection.isConnected()) { ((AbstractXMPPConnection) getConnection()).disconnect(); } else { ((AbstractXMPPConnection) getConnection()).instantShutdown(); } } private final ConnectionListener connectionListener = new ConnectionListener() { @Override public void connected(XMPPConnection connection) { state.onNext(state.getValue() .withConnectivity(ConnectivityState.connected) .withAuthenticated(false)); } @Override public void authenticated(XMPPConnection connection, boolean resumed) { state.onNext(state.getValue() .withConnectivity(ConnectivityState.connected) .withAuthenticated(true) .withResumed(resumed)); if (!resumed) { initialConnectionSetup(); } } @Override public void connectionClosed() { state.onNext(state.getValue() .withConnectivity(ConnectivityState.disconnected) .withAuthenticated(false)); } @Override public void connectionClosedOnError(Exception e) { state.onNext(state.getValue() .withConnectivity(ConnectivityState.disconnected) .withAuthenticated(false)); } }; private void initialConnectionSetup() { ReconnectionManager.getInstanceFor((AbstractXMPPConnection) getConnection()) .addReconnectionListener(new ReconnectionListener() { @Override public void reconnectingIn(int seconds) { LOGGER.log(Level.FINER, "Reconnecting connection " + getAccount().getAddress() + " in " + seconds + " seconds."); } @Override public void reconnectionFailed(Exception e) { LOGGER.log(Level.WARNING, "Reconnection of connection " + getAccount().getAddress() + " failed.", e); } }); } }