package org.mercury_im.messenger.core.connection; import org.jivesoftware.smack.chat2.ChatManager; import org.jivesoftware.smackx.caps.EntityCapsManager; import org.mercury_im.messenger.core.SchedulersFacade; import org.mercury_im.messenger.core.connection.state.ConnectionPoolState; import org.mercury_im.messenger.core.connection.state.ConnectionState; import org.mercury_im.messenger.core.crypto.MercuryOpenPgpManager; import org.mercury_im.messenger.core.data.repository.AccountRepository; import org.mercury_im.messenger.core.store.caps.MercuryEntityCapsStore; import org.mercury_im.messenger.core.store.message.MercuryMessageStore; import org.mercury_im.messenger.core.store.message.MercuryMessageStoreFactory; import org.mercury_im.messenger.core.usecase.RosterStoreBinder; import org.mercury_im.messenger.core.util.Optional; import org.mercury_im.messenger.entity.Account; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.inject.Inject; import javax.inject.Singleton; import io.reactivex.Completable; import io.reactivex.Observable; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; import io.reactivex.functions.BiPredicate; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.BehaviorSubject; @Singleton public class MercuryConnectionManager { private static final Logger LOGGER = Logger.getLogger("ConnectionManager"); private final XmppConnectionFactory connectionFactory; private final MercuryMessageStoreFactory messageStoreFactory; private final AccountRepository accountRepository; private final RosterStoreBinder rosterStoreBinder; private final MercuryOpenPgpManager cryptoManager; private final SchedulersFacade schedulers; private final Map connectionsMap = new ConcurrentHashMap<>(); private final Map connectionDisposables = new ConcurrentHashMap<>(); private final BehaviorSubject connectionPoolObservable = BehaviorSubject.createDefault(new ConnectionPoolState()); private final CompositeDisposable disposable = new CompositeDisposable(); static { SmackConfig.staticConfiguration(); } @Inject public MercuryConnectionManager(AccountRepository accountRepository, RosterStoreBinder rosterStoreBinder, MercuryEntityCapsStore entityCapsStore, MercuryMessageStoreFactory messageStoreFactory, XmppConnectionFactory connectionFactory, MercuryOpenPgpManager cryptoManager, SchedulersFacade schedulers) { this.accountRepository = accountRepository; this.rosterStoreBinder = rosterStoreBinder; this.connectionFactory = connectionFactory; this.messageStoreFactory = messageStoreFactory; this.cryptoManager = cryptoManager; this.schedulers = schedulers; EntityCapsManager.setPersistentCache(entityCapsStore); registerNewConnections(); } public synchronized void registerNewConnections() { disposable.add(accountRepository.observeAllAccounts() .distinctUntilChanged((a,b) -> a.size() != b.size()) .subscribeOn(schedulers.getIoScheduler()) .subscribe(this::doRegisterConnections)); } public synchronized List getConnections() { return new ArrayList<>(connectionsMap.values()); } public Observable observeConnectionPool() { return connectionPoolObservable; } public synchronized MercuryConnection getConnection(Account account) { return getConnection(account.getId()); } public synchronized MercuryConnection getConnection(UUID id) { return connectionsMap.get(id); } public MercuryConnection createConnection(Account account) { return new MercuryConnection(connectionFactory.createConnection(account), account); } public synchronized void doRegisterConnections(List accounts) { for (Account account : accounts) { if (!connectionsMap.containsKey(account.getId())) { MercuryConnection connection = createConnection(account); doRegisterConnection(connection); } } } public Completable registerConnection(MercuryConnection connection) { return Completable.fromAction(() -> doRegisterConnection(connection)); } public synchronized void doRegisterConnection(MercuryConnection connection) { LOGGER.log(Level.INFO, "Register Connection " + connection.getAccountId()); putConnection(connection); disposable.add(accountRepository .observeAccount(connection.getAccountId()) .subscribeOn(schedulers.getNewThread()) .observeOn(schedulers.getNewThread()) .distinctUntilChanged(accountNotToggledNorRemoved) .subscribe(event -> handleOptionalAccountChangedEvent(connection, event))); } private BiPredicate, Optional> accountNotToggledNorRemoved = (first, second) -> // Account not removed first.isPresent() == second.isPresent() // If account is not removed check if not toggled && (!first.isPresent() || first.getItem().isEnabled() == second.getItem().isEnabled()); private synchronized void putConnection(MercuryConnection connection) { connectionsMap.put(connection.getAccountId(), connection); connectionDisposables.put(connection.getAccountId(), connection.observeConnection().subscribe(this::insertConnectionToPoolState)); bindConnection(connection); } private void insertConnectionToPoolState(ConnectionState s) { LOGGER.log(Level.INFO, "Insert new connection to pool state: " + s + " " + s.getConnection().getAccount().getJid()); connectionPoolObservable.onNext(updatePoolState(connectionPoolObservable.getValue(), s)); } private synchronized ConnectionPoolState updatePoolState(ConnectionPoolState poolState, ConnectionState conState) { Map states = poolState.getConnectionStates(); states.put(conState.getId(), conState); return new ConnectionPoolState(states); } public synchronized void bindConnection(MercuryConnection connection) { rosterStoreBinder.setRosterStoreOn(connection); disposable.add(accountRepository.getAccount(connection.getAccountId()) .subscribeOn(schedulers.getIoScheduler()) .subscribe(account -> { MercuryMessageStore mercuryMessageStore = messageStoreFactory.createMessageStore(account); ChatManager chatManager = ChatManager.getInstanceFor(connection.getConnection()); chatManager.addIncomingListener(mercuryMessageStore); })); cryptoManager.initialize(connection); } private synchronized void handleOptionalAccountChangedEvent(MercuryConnection connection, Optional event) { if (event.isPresent()) { handleAccountChangedEvent(connection, event.getItem()); } else { handleAccountRemoved(connection); } } private synchronized void handleAccountChangedEvent(MercuryConnection connection, Account account) { LOGGER.log(Level.INFO, "handleAccountChangedEvent: " + account); if (account.isEnabled()) { handleAccountEnabled(connection); } else { handleAccountDisabled(connection); } } private synchronized void handleAccountDisabled(MercuryConnection connection) { LOGGER.log(Level.FINER, "HandleAccountDisabled: " + connection.getAccountId()); disposable.add(connection.shutdown().subscribeOn(Schedulers.newThread()).subscribe()); } private synchronized void handleAccountEnabled(MercuryConnection connection) { LOGGER.log(Level.FINER, "HandleAccountEnabled: " + connection.getAccountId()); connectionLogin(connection); } private synchronized void connectionLogin(MercuryConnection connection) { disposable.add(connection.connect().andThen(connection.login()) .subscribeOn(Schedulers.io()) .subscribe(() -> LOGGER.log(Level.FINER, "Logged in."), error -> LOGGER.log(Level.SEVERE, "Connection error!", error))); } private synchronized void handleAccountRemoved(MercuryConnection connection) { LOGGER.log(Level.FINER, "HandleAccountRemove: " + connection.getAccountId()); disconnectAndRemoveConnection(connection); } private synchronized void disconnectAndRemoveConnection(MercuryConnection connection) { disposable.add(connection.shutdown().subscribeOn(Schedulers.newThread()).subscribe()); removeConnection(connection); } private synchronized void removeConnection(MercuryConnection connection) { LOGGER.log(Level.FINER, "Remove Connection: " + connection.getAccountId()); connectionsMap.remove(connection.getAccountId()); connectionDisposables.remove(connection.getAccountId()).dispose(); removeConnectionFromPoolState(); } private void removeConnectionFromPoolState() { LOGGER.log(Level.INFO, "Remove connection from pool state"); connectionPoolObservable.onNext(updatePoolState(connectionPoolObservable.getValue())); } private synchronized ConnectionPoolState updatePoolState(ConnectionPoolState value) { Map states = value.getConnectionStates(); for (UUID id : connectionsMap.keySet()) { if (!states.containsKey(id)) { states.remove(id); } } return new ConnectionPoolState(states); } public synchronized void doShutdownAllConnections() { for (MercuryConnection connection : getConnections()) { connection.doShutdown(); } } }