package org.mercury_im.messenger.core.store.roster; import org.jivesoftware.smack.roster.packet.RosterPacket; import org.jivesoftware.smack.roster.rosterstore.RosterStore; import org.jxmpp.jid.Jid; import org.jxmpp.jid.impl.JidCreate; import org.mercury_im.messenger.core.SchedulersFacade; import org.mercury_im.messenger.core.data.repository.AccountRepository; import org.mercury_im.messenger.core.data.repository.PeerRepository; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.contact.Peer; import org.mercury_im.messenger.entity.contact.SubscriptionDirection; import org.mercury_im.messenger.core.util.Optional; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.BehaviorSubject; public class MercuryRosterStore implements RosterStore { private static final Logger LOGGER = Logger.getLogger(RosterStore.class.getName()); private final PeerRepository peerRepository; private final AccountRepository accountRepository; private final UUID accountId; private final BehaviorSubject> account; private final CompositeDisposable disposable = new CompositeDisposable(); private final Map itemMap = new HashMap<>(); private String rosterVersion; private final SchedulersFacade schedulers; public MercuryRosterStore(UUID accountId, PeerRepository rosterRepository, AccountRepository accountRepository, SchedulersFacade schedulers) { this.accountId = accountId; this.account = BehaviorSubject.create(); this.peerRepository = rosterRepository; this.accountRepository = accountRepository; this.schedulers = schedulers; LOGGER.log(Level.INFO, "Construct Roster Store for " + accountId); subscribe(); } public void subscribe() { // Subscribe database to behaviorSubject for quick access accountRepository.observeAccount(accountId) .subscribeOn(schedulers.getIoScheduler()) .observeOn(schedulers.getIoScheduler()) .subscribe(account); disposable.add(peerRepository.observeAllContactsOfAccount(accountId) .subscribeOn(schedulers.getIoScheduler()) .observeOn(schedulers.getIoScheduler()) .subscribe(contactsList -> { itemMap.clear(); for (Peer contactModel : contactsList) { itemMap.put(contactModel.getAddress(), fromEntity(contactModel)); LOGGER.log(Level.INFO, "Populate itemMap with " + contactsList.size() + " items"); } }, error -> LOGGER.log(Level.WARNING, "An error occurred while updating roster cache", error))); disposable.add(accountRepository.getAccount(accountId) .map(Account::getRosterVersion) .subscribeOn(schedulers.getIoScheduler()) .observeOn(schedulers.getIoScheduler()) .subscribe(this::setRosterVersion, error -> LOGGER.log(Level.WARNING, "An error occurred updating cached roster version", error))); } public void unsubscribe() { disposable.dispose(); } private void setRosterVersion(String rosterVersion) { this.rosterVersion = rosterVersion; } @Override public List getEntries() { return new ArrayList<>(itemMap.values()); } @Override public RosterPacket.Item getEntry(Jid bareJid) { return itemMap.get(bareJid.asUnescapedString()); } @Override public String getRosterVersion() { return rosterVersion != null ? rosterVersion : ""; } @Override public boolean addEntry(RosterPacket.Item item, String version) { writeEntryToDatabase(item); writeRosterVersionToDatabase(version); return true; } private void writeEntryToDatabase(RosterPacket.Item item) { disposable.add(peerRepository.getOrCreatePeer(accountId, item.getJid().asUnescapedString()) .map(peer -> toEntity(item, peer)) .flatMap(peerRepository::upsertPeer) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe( success -> LOGGER.log(Level.FINE, "Upserted contact model " + success + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting contact " + item.getJid().asUnescapedString(), error) )); } private void writeRosterVersionToDatabase(String version) { disposable.add(accountRepository.updateRosterVersion(accountId, version) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe( () -> LOGGER.log(Level.FINE, "Upserted roster version to " + rosterVersion + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting roster version", error) )); } @Override public boolean resetEntries(Collection items, String version) { LOGGER.log(Level.INFO, "Reset Entries: " + Arrays.toString(items.toArray())); // Update database disposable.add(peerRepository.deleteAllPeers(accountId).subscribe()); for (RosterPacket.Item item : items) { writeEntryToDatabase(item); } disposable.add(accountRepository.updateRosterVersion(accountId, version) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe( () -> LOGGER.log(Level.FINE, "Upserted roster version to " + rosterVersion + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting roster version", error) )); return true; } @Override public boolean removeEntry(Jid bareJid, String version) { LOGGER.log(Level.INFO, "Remove entry " + bareJid.toString()); disposable.add(peerRepository.deletePeer(accountId, bareJid.asEntityBareJidOrThrow().asEntityBareJidString()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe( () -> LOGGER.log(Level.FINE, "Deletion of contact " + bareJid.toString() + " successful"), error -> LOGGER.log(Level.WARNING, "An error occurred deleting contact " + bareJid.toString(), error) )); disposable.add(accountRepository.updateRosterVersion(accountId, version) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe( () -> LOGGER.log(Level.FINE, "Upserted roster version to " + rosterVersion + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting roster version", error) )); return true; } @Override public void resetStore() { LOGGER.log(Level.INFO, "Reset Store"); disposable.add(peerRepository.deleteAllPeers(accountId) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe( () -> LOGGER.log(Level.FINE, "Successfully reset store."), error -> LOGGER.log(Level.WARNING, "An error occurred resetting store", error) )); disposable.add(accountRepository.updateRosterVersion(accountId, "") .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe( () -> LOGGER.log(Level.FINE, "Successfully reset roster version"), error -> LOGGER.log(Level.WARNING, "An error occurred resetting roster version", error) )); } public RosterPacket.Item fromEntity(Peer contactModel) { RosterPacket.Item item = new RosterPacket.Item( JidCreate.entityBareFromOrThrowUnchecked(contactModel.getAddress()), contactModel.getName()); if (contactModel.getSubscriptionDirection() != null) { item.setItemType(convert(contactModel.getSubscriptionDirection())); } item.setApproved(contactModel.isSubscriptionApproved()); item.setSubscriptionPending(contactModel.isSubscriptionPending()); List groupNames = contactModel.getGroupNames(); if (groupNames != null) { for (String groupName : groupNames) { item.addGroupName(groupName); } } return item; } public Peer toEntity(RosterPacket.Item item, Peer peer) { peer.setAccount(account.getValue().getItem()); peer.setAddress(item.getJid().asEntityBareJidOrThrow().asEntityBareJidString()); peer.setName(item.getName()); if (item.getItemType() != null) { peer.setSubscriptionDirection(convert(item.getItemType())); } peer.setSubscriptionApproved(item.isApproved()); peer.setSubscriptionPending(item.isSubscriptionPending()); peer.setGroupNames(new ArrayList<>(item.getGroupNames())); return peer; } public SubscriptionDirection convert(RosterPacket.ItemType type) { return SubscriptionDirection.valueOf(type.toString()); } public RosterPacket.ItemType convert(SubscriptionDirection direction) { return RosterPacket.ItemType.fromString(direction.toString()); } }