package org.mercury_im.messenger.xmpp; import org.jivesoftware.smack.ConnectionListener; import org.jivesoftware.smack.XMPPConnection; import org.mercury_im.messenger.data.repository.AccountRepository; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.Observable; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.subjects.BehaviorSubject; public class MercuryConnection { private static final Logger LOGGER = Logger.getLogger("MercuryConnection"); private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory(); private final CompositeDisposable disposable = new CompositeDisposable(); private final AccountRepository accountRepository; private Account account; private XMPPConnection connection; private BehaviorSubject enabled = BehaviorSubject.create(); private BehaviorSubject state = BehaviorSubject.createDefault(ConnectionState.closed); public MercuryConnection(AccountRepository accountRepository, Account account) { this.accountRepository = accountRepository; this.account = account; this.connection = connectionFactory.createConnection(account); observeAccountAndConnection(); } public Observable getState() { return state; } private void observeAccountAndConnection() { observeAccount(); observeConnection(); } private void observeAccount() { disposable.add(accountRepository.observeAccount(account.getId()) .filter(Optional::isPresent) .map(Optional::getItem) .subscribe(this::setAccount)); } private void observeConnection() { connection.addConnectionListener(new ConnectionListener() { @Override public void connected(XMPPConnection connection) { LOGGER.log(Level.FINER, "connected"); state.onNext(ConnectionState.connected); } @Override public void authenticated(XMPPConnection connection, boolean resumed) { LOGGER.log(Level.FINER, "authenticated. resumed? " + resumed); state.onNext(ConnectionState.authenticated); } @Override public void connectionClosed() { LOGGER.log(Level.FINER, "connectionClosed"); state.onNext(ConnectionState.closed); } @Override public void connectionClosedOnError(Exception e) { LOGGER.log(Level.WARNING, "connectionClosedOnError"); state.onNext(ConnectionState.closedOnError); } }); } private void setAccount(Account account) { this.account = account; enabled.onNext(account.isEnabled()); } public Account getAccount() { return account; } public XMPPConnection getConnection() { return connection; } public void dispose() { disposable.dispose(); } public enum ConnectionState { connected, authenticated, closedOnError, closed } }