Mercury-IM/core/src/main/java/org/mercury_im/messenger/core/centers/ConnectionCenter.java

154 lines
6.7 KiB
Java
Raw Normal View History

2019-08-25 17:54:03 +02:00
package org.mercury_im.messenger.core.centers;
2019-07-31 22:59:46 +02:00
2019-08-04 04:22:08 +02:00
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.tcp.XMPPTCPConnectionConfiguration;
2019-08-12 00:34:19 +02:00
import org.jivesoftware.smackx.caps.EntityCapsManager;
2019-08-25 17:54:03 +02:00
import org.mercury_im.messenger.core.stores.EntityCapsStore;
import org.mercury_im.messenger.core.connection.MercuryConfiguration;
import org.mercury_im.messenger.core.connection.MercuryConnection;
import org.mercury_im.messenger.core.stores.PlainMessageStore;
import org.mercury_im.messenger.core.stores.RosterStore;
2019-08-04 04:22:08 +02:00
import org.mercury_im.messenger.persistence.model.AccountModel;
2019-08-12 00:34:19 +02:00
import org.mercury_im.messenger.persistence.repository.AccountRepository;
2019-08-19 02:36:31 +02:00
import org.mercury_im.messenger.persistence.repository.RosterRepository;
2019-08-04 04:22:08 +02:00
2019-08-12 00:34:19 +02:00
import java.util.Collections;
2019-07-31 22:59:46 +02:00
import java.util.HashMap;
2019-08-12 17:05:30 +02:00
import java.util.HashSet;
2019-08-12 00:34:19 +02:00
import java.util.List;
2019-07-31 22:59:46 +02:00
import java.util.Map;
2019-08-12 17:05:30 +02:00
import java.util.Set;
2019-08-12 00:34:19 +02:00
import java.util.concurrent.atomic.AtomicBoolean;
2019-08-12 17:05:30 +02:00
import java.util.logging.Level;
import java.util.logging.Logger;
2019-07-31 22:59:46 +02:00
2019-08-10 21:50:03 +02:00
import javax.inject.Inject;
2019-08-03 19:05:50 +02:00
import javax.inject.Singleton;
2019-08-12 00:34:19 +02:00
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
2019-08-03 19:05:50 +02:00
@Singleton
2019-07-31 22:59:46 +02:00
public class ConnectionCenter {
2019-08-12 17:05:30 +02:00
private static final Logger LOGGER = Logger.getLogger(ConnectionCenter.class.getName());
2019-08-12 00:34:19 +02:00
// Injected
private final AccountRepository accountRepository;
2019-08-25 17:54:03 +02:00
private final PlainMessageStore messageStore;
2019-08-12 00:34:19 +02:00
private final EntityCapsStore entityCapsStore;
2019-08-19 02:36:31 +02:00
private final RosterRepository rosterRepository;
2019-07-31 22:59:46 +02:00
2019-08-12 00:34:19 +02:00
// Connections
private final Map<Long, MercuryConnection> connectionMap =
Collections.synchronizedMap(new HashMap<>());
2019-07-31 22:59:46 +02:00
2019-08-12 00:34:19 +02:00
// Disposable for rx
private final CompositeDisposable disposable = new CompositeDisposable();
2019-08-10 21:50:03 +02:00
2019-08-25 17:54:03 +02:00
private final AtomicBoolean isConnectionCenterStarted = new AtomicBoolean(false);
2019-08-12 00:34:19 +02:00
@Inject
2019-08-25 17:54:03 +02:00
public ConnectionCenter(EntityCapsStore capsStore,
PlainMessageStore messageStore,
AccountRepository accountRepository,
RosterRepository rosterRepository) {
2019-08-12 17:05:30 +02:00
LOGGER.log(Level.INFO, "ConnectionCenter initialized");
2019-08-12 00:34:19 +02:00
this.entityCapsStore = capsStore;
2019-08-25 17:54:03 +02:00
this.messageStore = messageStore;
this.accountRepository = accountRepository;
2019-08-19 02:36:31 +02:00
this.rosterRepository = rosterRepository;
2019-07-31 22:59:46 +02:00
2019-08-12 00:34:19 +02:00
EntityCapsManager.setPersistentCache(capsStore);
2019-08-12 17:05:30 +02:00
startUp();
2019-07-31 22:59:46 +02:00
}
2019-08-12 00:34:19 +02:00
/**
* Start up the center by subscribing to changes of the {@link AccountModel accounts} in the
* database. For each new {@link AccountModel} it creates a {@link MercuryConnection} and
* stores it in the {@link #connectionMap}.
*/
@SuppressWarnings("unchecked")
public synchronized void startUp() {
2019-08-25 17:54:03 +02:00
if (isConnectionCenterStarted.getAndSet(true)) {
2019-08-12 00:34:19 +02:00
// already started.
return;
2019-07-31 22:59:46 +02:00
}
2019-08-12 00:34:19 +02:00
// otherwise subscribe to accounts and create connections.
2019-08-19 02:36:31 +02:00
disposable.add(
accountRepository.getAllAccounts()
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribe((Consumer<List<? extends AccountModel>>) accounts -> {
LOGGER.log(Level.INFO, "Accounts changed.");
Set<Long> accountIds = new HashSet<>();
// Add missing connections to the map
for (AccountModel account : accounts) {
accountIds.add(account.getId());
if (connectionMap.get(account.getId()) != null) {
continue;
}
2019-08-25 17:54:03 +02:00
2019-08-19 02:36:31 +02:00
LOGGER.log(Level.INFO, "Add new connection " + account.getJid().toString() + " to ConnectionCenter list.");
MercuryConnection connection = createConnection(account);
connectionMap.put(account.getId(), connection);
2019-08-25 17:54:03 +02:00
2019-08-19 02:36:31 +02:00
RosterStore rosterStore = new RosterStore(rosterRepository);
rosterStore.setAccountId(account.getId());
rosterStore.subscribe();
connection.getRoster().setRosterStore(rosterStore);
2019-08-25 17:54:03 +02:00
messageStore.registerForMercuryConnection(connection);
2019-08-19 02:36:31 +02:00
if (account.getEnabled()) {
2019-08-20 01:15:30 +02:00
LOGGER.log(Level.INFO, "Connecting...");
2019-08-19 02:36:31 +02:00
connection.connect();
2019-08-20 01:15:30 +02:00
LOGGER.log(Level.INFO, "Connected!");
} else {
LOGGER.log(Level.INFO, "Account not enabled. Skip.");
2019-08-19 02:36:31 +02:00
}
}
// Remove unwanted connections from the map
for (long connectionId : connectionMap.keySet()) {
if (!accountIds.contains(connectionId)) {
AbstractXMPPConnection con =
(AbstractXMPPConnection) connectionMap.get(connectionId).getConnection();
con.disconnect();
connectionMap.remove(connectionId);
}
}
}));
2019-08-12 00:34:19 +02:00
}
public MercuryConnection getConnection(AccountModel account) {
return getConnection(account.getId());
2019-07-31 22:59:46 +02:00
}
public MercuryConnection getConnection(long accountId) {
return connectionMap.get(accountId);
}
public void putConnection(MercuryConnection connection) {
connectionMap.put(connection.getAccountId(), connection);
}
2019-08-04 04:22:08 +02:00
public MercuryConnection createConnection(AccountModel accountModel) {
2019-08-12 17:05:30 +02:00
LOGGER.log(Level.INFO, "Create Connection for " + accountModel.getJid().toString());
2019-08-04 04:22:08 +02:00
XMPPTCPConnectionConfiguration configuration = XMPPTCPConnectionConfiguration.builder()
.setHost(accountModel.getJid().getDomain().toString())
.setXmppAddressAndPassword(accountModel.getJid(), accountModel.getPassword())
.setConnectTimeout(2 * 60 * 1000)
2019-08-19 02:36:31 +02:00
.setEnabledSSLCiphers(MercuryConfiguration.enabledCiphers)
.setEnabledSSLProtocols(MercuryConfiguration.enabledProtocols)
2019-08-04 04:22:08 +02:00
.build();
AbstractXMPPConnection tcpConnection = new XMPPTCPConnection(configuration);
return new MercuryConnection(tcpConnection, accountModel.getId());
}
2019-07-31 22:59:46 +02:00
}