package org.mercury_im.messenger.xmpp;

import org.jivesoftware.smack.AbstractXMPPConnection;
import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.usecase.LogIntoAccount;
import org.mercury_im.messenger.util.Optional;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.inject.Inject;

import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;

/**
 * Keeps track of registered {@link MercuryConnection}s, keyed by the id of their account.
 *
 * <p>For connections that are already authenticated at registration time, the manager
 * observes the associated account through the {@link AccountRepository} and logs in,
 * shuts down or forgets the connection depending on the account events it receives.
 */
public class MercuryConnectionManager {

    private static final Logger LOGGER = Logger.getLogger(MercuryConnectionManager.class.getName());

    /** Registered connections, keyed by account id. */
    private final Map<UUID, MercuryConnection> connections = new ConcurrentHashMap<>();

    private final AccountRepository accountRepository;

    /** Collects every subscription created by this manager. */
    private final CompositeDisposable disposable = new CompositeDisposable();

    /**
     * Creates a manager that observes accounts through the given repository.
     *
     * @param accountRepository repository used to observe account changes
     */
    @Inject
    public MercuryConnectionManager(AccountRepository accountRepository) {
        this.accountRepository = accountRepository;
    }

    /**
     * Registers a connection with this manager.
     *
     * <p>The connection is stored under its account id so that a later
     * {@code removeConnection} actually has something to remove. Authenticated
     * connections are additionally bound to the change events of their account.
     *
     * @param connection the connection to register; its account id must not be {@code null}
     *                   because the backing {@link ConcurrentHashMap} rejects null keys
     */
    public void registerConnection(MercuryConnection connection) {
        connections.put(connection.getAccount().getId(), connection);

        if (connection.getConnection().isAuthenticated()) {
            registerLiveConnection(connection);
        } else {
            registerDeadConnection(connection);
        }
    }

    /**
     * Subscribes to change events of the connection's account and reacts to each of them.
     *
     * @param connection an authenticated connection
     */
    private void registerLiveConnection(MercuryConnection connection) {
        Observable<Optional<Account>> observableAccount = accountRepository
                .observeAccount(connection.getAccount().getId());

        // An explicit error consumer keeps a failing account stream from surfacing as an
        // unhandled OnErrorNotImplementedException.
        disposable.add(observableAccount.subscribe(
                event -> handleAccountChangedEvent(connection, event),
                error -> LOGGER.log(Level.SEVERE, "Error while observing account.", error)));
    }

    /**
     * Dispatches an account event: a present item is treated as an account update,
     * an absent item as the removal of the account.
     *
     * @param connection the connection belonging to the observed account
     * @param event the emitted account wrapper
     */
    private void handleAccountChangedEvent(MercuryConnection connection, Optional<Account> event) {
        if (event.isPresent()) {
            handleAccountChangedEvent(connection, event.getItem());
        } else {
            handleAccountRemoved(connection);
        }
    }

    /**
     * Reacts to an updated account by enabling or disabling its connection.
     *
     * @param connection the connection belonging to the account
     * @param account the updated account
     */
    private void handleAccountChangedEvent(MercuryConnection connection, Account account) {
        if (account.isEnabled()) {
            handleAccountEnabled(connection);
        } else {
            handleAccountDisabled(connection);
        }
    }

    /**
     * Shuts the connection down (if authenticated) and forgets it.
     *
     * @param connection the connection whose account got disabled
     */
    private void handleAccountDisabled(MercuryConnection connection) {
        // shutdownConnection() already checks isAuthenticated(), so no extra guard is needed.
        shutdownConnection(connection);
        removeConnection(connection);
    }

    /**
     * Starts the login use case for the connection and logs its outcome.
     *
     * @param connection the connection whose account is enabled
     */
    private void handleAccountEnabled(MercuryConnection connection) {
        disposable.add(LogIntoAccount.with(connection).executeAndPossiblyThrow()
                .subscribe(
                        () -> LOGGER.log(Level.INFO, "Logged in."),
                        error -> LOGGER.log(Level.SEVERE, "Connection error!", error)));
    }

    /**
     * Shuts the connection down (if authenticated) and forgets it.
     *
     * @param connection the connection whose account got removed
     */
    private void handleAccountRemoved(MercuryConnection connection) {
        shutdownConnection(connection);
        removeConnection(connection);
    }

    /**
     * Instantly shuts down the underlying XMPP connection if it is authenticated.
     *
     * <p>{@code instantShutdown()} is called on {@link AbstractXMPPConnection}, so any
     * other connection type is skipped with a warning instead of failing on the cast.
     *
     * @param connection the connection to shut down
     */
    private void shutdownConnection(MercuryConnection connection) {
        if (!connection.getConnection().isAuthenticated()) {
            return;
        }

        if (connection.getConnection() instanceof AbstractXMPPConnection) {
            ((AbstractXMPPConnection) connection.getConnection()).instantShutdown();
        } else {
            LOGGER.log(Level.WARNING,
                    "Cannot shut down connection: not an AbstractXMPPConnection.");
        }
    }

    /**
     * Handles registration of a connection that is not authenticated.
     *
     * @param connection the unauthenticated connection
     */
    private void registerDeadConnection(MercuryConnection connection) {
        if (connection.getAccount().isEnabled()) {
            // TODO: nothing happens here yet for enabled accounts of unauthenticated
            //  connections; decide whether a login should be triggered.
        }
    }

    /**
     * Removes the connection registered under its account id, if any.
     *
     * @param connection the connection to forget
     */
    private void removeConnection(MercuryConnection connection) {
        connections.remove(connection.getAccount().getId());
    }
}