From dc50bf9468c501272b0661b67f4d823362eef626 Mon Sep 17 00:00:00 2001 From: Paul Schaub Date: Thu, 26 Dec 2019 12:14:52 +0100 Subject: [PATCH] Initial design of MercuryConnectionRegistry --- .../xmpp/MercuryConnectionManager.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java b/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java index d32d006..5fc997d 100644 --- a/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java +++ b/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnectionManager.java @@ -1,4 +1,100 @@ package org.mercury_im.messenger.xmpp; +import org.jivesoftware.smack.AbstractXMPPConnection; +import org.mercury_im.messenger.data.repository.AccountRepository; +import org.mercury_im.messenger.entity.Account; +import org.mercury_im.messenger.usecase.LogIntoAccount; +import org.mercury_im.messenger.util.Optional; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import io.reactivex.Observable; +import io.reactivex.disposables.CompositeDisposable; + public class MercuryConnectionManager { + + private static final Logger LOGGER = Logger.getLogger(MercuryConnectionManager.class.getName()); + + private Map connections = new ConcurrentHashMap<>(); + private final AccountRepository accountRepository; + + private final CompositeDisposable disposable = new CompositeDisposable(); + + @Inject + public MercuryConnectionManager(AccountRepository accountRepository) { + this.accountRepository = accountRepository; + } + + public void registerConnection(MercuryConnection connection) { + if (connection.getConnection().isAuthenticated()) { + registerLiveConnection(connection); + } else { + registerDeadConnection(connection); + } + } + + private void registerLiveConnection(MercuryConnection connection) { + Observable> observableAccount = accountRepository + .observeAccount(connection.getAccount().getId()); + disposable.add(observableAccount.subscribe(event -> + handleAccountChangedEvent(connection, event))); + + } + + private void handleAccountChangedEvent(MercuryConnection connection, Optional event) { + if (event.isPresent()) { + handleAccountChangedEvent(connection, event.getItem()); + } else { + handleAccountRemoved(connection); + } + } + + private void handleAccountChangedEvent(MercuryConnection connection, Account account) { + if (account.isEnabled()) { + handleAccountEnabled(connection); + } else { + handleAccountDisabled(connection); + } + } + + private void handleAccountDisabled(MercuryConnection connection) { + if (connection.getConnection().isAuthenticated()) { + shutdownConnection(connection); + } + removeConnection(connection); + } + + private void handleAccountEnabled(MercuryConnection connection) { + disposable.add(LogIntoAccount.with(connection).executeAndPossiblyThrow() + .subscribe(() -> LOGGER.log(Level.INFO, "Logged in."), + error -> LOGGER.log(Level.SEVERE, "Connection error!", error))); + } + + private void handleAccountRemoved(MercuryConnection connection) { + shutdownConnection(connection); + removeConnection(connection); + } + + private void shutdownConnection(MercuryConnection connection) { + if (connection.getConnection().isAuthenticated()) { + ((AbstractXMPPConnection) connection.getConnection()).instantShutdown(); + } + } + + private void registerDeadConnection(MercuryConnection connection) { + if (connection.getAccount().isEnabled()) { + + } + } + + private void removeConnection(MercuryConnection connection) { + connections.remove(connection.getAccount().getId()); + } + }