package org.mercury_im.messenger.xmpp; import org.jivesoftware.smack.AbstractXMPPConnection; import org.jivesoftware.smack.ReconnectionListener; import org.jivesoftware.smack.ReconnectionManager; import org.jivesoftware.smack.sm.packet.StreamManagement; import org.jivesoftware.smackx.caps.EntityCapsManager; import org.jivesoftware.smackx.iqversion.VersionManager; import org.jivesoftware.smackx.receipts.DeliveryReceiptManager; import org.jivesoftware.smackx.sid.StableUniqueStanzaIdManager; import org.mercury_im.messenger.data.repository.AccountRepository; import org.mercury_im.messenger.data.repository.Repositories; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.store.MercuryEntityCapsStore; import org.mercury_im.messenger.usecase.LogIntoAccount; import org.mercury_im.messenger.usecase.RosterStoreBinder; import org.mercury_im.messenger.util.Optional; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.inject.Inject; import javax.inject.Singleton; import io.reactivex.Observable; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.BehaviorSubject; @Singleton public class MercuryConnectionManager { private static final Logger LOGGER = Logger.getLogger("ConnectionManager"); private final Map connections = new ConcurrentHashMap<>(); private final BehaviorSubject> connectionsSubject = BehaviorSubject.createDefault(connections); private final AccountRepository accountRepository; private final RosterStoreBinder rosterStoreBinder; private final MercuryEntityCapsStore entityCapsStore; private final CompositeDisposable disposable = new CompositeDisposable(); static { ReconnectionManager.setEnabledPerDefault(true); ReconnectionManager.setDefaultReconnectionPolicy(ReconnectionManager.ReconnectionPolicy.RANDOM_INCREASING_DELAY); VersionManager.setAutoAppendSmackVersion(false); VersionManager.setDefaultVersion("Mercury-IM", "0.0.1-little-joe"); DeliveryReceiptManager.setDefaultAutoReceiptMode(DeliveryReceiptManager.AutoReceiptMode.ifIsSubscribed); StableUniqueStanzaIdManager.setEnabledByDefault(true); } @Inject public MercuryConnectionManager(Repositories repositories, RosterStoreBinder rosterStoreBinder, MercuryEntityCapsStore entityCapsStore) { this.accountRepository = repositories.getAccountRepository(); this.rosterStoreBinder = rosterStoreBinder; this.entityCapsStore = entityCapsStore; EntityCapsManager.setPersistentCache(entityCapsStore); } public List getConnections() { return new ArrayList<>(connections.values()); } public Observable> observeConnections() { return connectionsSubject; } public MercuryConnection getConnection(Account account) { return getConnection(account.getId()); } public MercuryConnection getConnection(UUID id) { return connections.get(id); } public void registerConnections(List accounts) { for (Account account : accounts) { MercuryConnection connection = new MercuryConnection(accountRepository, account); registerConnection(connection); } } public void registerConnection(MercuryConnection connection) { LOGGER.log(Level.INFO, "Register Connection " + connection.getAccount().getAddress()); putConnection(connection); disposable.add(accountRepository .observeAccount(connection.getAccount().getId()) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(event -> handleOptionalAccountChangedEvent(connection, event))); } private void putConnection(MercuryConnection connection) { connections.put(connection.getAccount().getId(), connection); connectionsSubject.onNext(connections); bindConnection(connection); } public void bindConnection(MercuryConnection connection) { rosterStoreBinder.setRosterStoreOn(connection); ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection.getConnection()) .addReconnectionListener(new ReconnectionListener() { @Override public void reconnectingIn(int seconds) { LOGGER.log(Level.FINER, "Reconnecting connection " + connection.getAccount().getAddress() + " in " + seconds + " seconds."); } @Override public void reconnectionFailed(Exception e) { LOGGER.log(Level.WARNING, "Reconnection of connection " + connection.getAccount().getAddress() + " failed.", e); } }); } private void handleOptionalAccountChangedEvent(MercuryConnection connection, Optional event) { if (event.isPresent()) { handleAccountChangedEvent(connection, event.getItem()); } else { handleAccountRemoved(connection); } } private void handleAccountChangedEvent(MercuryConnection connection, Account account) { if (account.isEnabled()) { handleAccountEnabled(connection); } else { handleAccountDisabled(connection); } } private void handleAccountDisabled(MercuryConnection connection) { LOGGER.log(Level.FINER, "HandleAccountDisabled: " + connection.getAccount().getAddress()); connectionDisconnect(connection); } private void handleAccountEnabled(MercuryConnection connection) { LOGGER.log(Level.FINER, "HandleAccountEnabled: " + connection.getAccount().getAddress()); connectionLogin(connection); } private void connectionLogin(MercuryConnection connection) { disposable.add(LogIntoAccount.with(connection).executeAndPossiblyThrow() .subscribeOn(Schedulers.newThread()) .subscribe(() -> LOGGER.log(Level.FINER, "Logged in."), error -> LOGGER.log(Level.SEVERE, "Connection error!", error))); } private void handleAccountRemoved(MercuryConnection connection) { disconnectAndRemoveConnection(connection); } private void disconnectAndRemoveConnection(MercuryConnection connection) { connectionDisconnect(connection); removeConnection(connection); } private void connectionDisconnect(MercuryConnection connection) { ((AbstractXMPPConnection) connection.getConnection()).disconnect(); } private void removeConnection(MercuryConnection connection) { LOGGER.log(Level.FINER, "Remove Connection: " + connection.getAccount().getAddress()); connections.remove(connection.getAccount().getId()); connectionsSubject.onNext(connections); connection.dispose(); } }