package org.mercury_im.messenger.store; import org.jivesoftware.smack.roster.packet.RosterPacket; import org.jivesoftware.smack.roster.rosterstore.RosterStore; import org.jxmpp.jid.Jid; import org.jxmpp.jid.impl.JidCreate; import org.mercury_im.messenger.data.repository.AccountRepository; import org.mercury_im.messenger.data.repository.PeerRepository; import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.contact.IPeer; import org.mercury_im.messenger.entity.contact.Peer; import org.mercury_im.messenger.entity.contact.SubscriptionDirection; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.schedulers.Schedulers; public class MercuryRosterStore implements RosterStore { private static final Logger LOGGER = Logger.getLogger(RosterStore.class.getName()); private final PeerRepository peerRepository; private final AccountRepository accountRepository; private Account account; private final CompositeDisposable disposable = new CompositeDisposable(); private final Map itemMap = new HashMap<>(); private String rosterVersion; public MercuryRosterStore(Account account, PeerRepository rosterRepository, AccountRepository accountRepository) { this.account = account; this.peerRepository = rosterRepository; this.accountRepository = accountRepository; LOGGER.log(Level.INFO, "Construct Roster Store for " + account.getId()); } public void subscribe() { disposable.add(peerRepository.observeAllContactsOfAccount(account) .observeOn(Schedulers.computation()) .subscribe(contactsList -> { itemMap.clear(); for (Peer contactModel : contactsList) { itemMap.put(contactModel.getAddress(), fromEntity(contactModel)); LOGGER.log(Level.INFO, "Populate itemMap with " + contactsList.size() + " items"); } }, error -> LOGGER.log(Level.WARNING, "An error occurred while updating roster cache", error))); /* disposable.add(peerRepository.getRosterVersion(account) .observeOn(Schedulers.computation()) .subscribe( result -> setRosterVersion(result), error -> LOGGER.log(Level.WARNING, "An error occurred updating cached roster version", error))); */ } public void unsubscribe() { disposable.dispose(); } private void setRosterVersion(String rosterVersion) { this.rosterVersion = rosterVersion; } @Override public List getEntries() { return new ArrayList<>(itemMap.values()); } @Override public RosterPacket.Item getEntry(Jid bareJid) { return itemMap.get(bareJid); } @Override public String getRosterVersion() { return rosterVersion != null ? rosterVersion : ""; } @Override public boolean addEntry(RosterPacket.Item item, String version) { LOGGER.log(Level.INFO, "Add entry " + item.toXML().toString()); // Update database Peer contact = toEntity(item); disposable.add(peerRepository.upsertPeer(contact) .map(p -> { LOGGER.log(Level.INFO, "Unserted Peer for account " + p.getAccount().getId()); return p; }) .subscribe( success -> LOGGER.log(Level.FINE, "Upserted contact model " + success + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting contact " + contact, error) )); /* disposable.add(peerRepository.updateRosterVersion(account, version) .subscribe( success -> LOGGER.log(Level.FINE, "Upserted roster version to " + rosterVersion + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting roster version", error) )); */ return true; } @Override public boolean resetEntries(Collection items, String version) { LOGGER.log(Level.INFO, "Reset Entries: " + Arrays.toString(items.toArray())); // Update database // TODO: Delete other contacts for (RosterPacket.Item item : items) { Peer model = toEntity(item); disposable.add(peerRepository.upsertPeer(model) .map(p -> { LOGGER.log(Level.INFO, "Unserted Peer for account " + p.getAccount().getId()); return p; }) .subscribe( success -> LOGGER.log(Level.FINE, "Upserted contact model " + success + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting contact " + model, error) )); } /* disposable.add(peerRepository.updateRosterVersion(account, version) .subscribe( success -> LOGGER.log(Level.FINE, "Upserted roster version to " + rosterVersion + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting roster version", error) )); */ return true; } @Override public boolean removeEntry(Jid bareJid, String version) { LOGGER.log(Level.INFO, "Remove entry " + bareJid.toString()); disposable.add(peerRepository.deletePeer(account.getId(), bareJid.asEntityBareJidOrThrow().asEntityBareJidString()) .subscribe( () -> LOGGER.log(Level.FINE, "Deletion of contact " + bareJid.toString() + " successful"), error -> LOGGER.log(Level.WARNING, "An error occurred deleting contact " + bareJid.toString(), error) )); /* disposable.add(peerRepository.updateRosterVersion(account, version) .subscribe( success -> LOGGER.log(Level.FINE, "Upserted roster version to " + rosterVersion + " successfully"), error -> LOGGER.log(Level.WARNING, "An error occurred upserting roster version", error) )); */ return true; } @Override public void resetStore() { LOGGER.log(Level.INFO, "Reset Store"); /* disposable.add(peerRepository.deleteAllContactsOfAccount(account) .subscribe( success -> LOGGER.log(Level.FINE, "Successfully reset store."), error -> LOGGER.log(Level.WARNING, "An error occurred resetting store", error) )); disposable.add(peerRepository.updateRosterVersion(account, "") .subscribe( success -> LOGGER.log(Level.FINE, "Successfully reset roster version"), error -> LOGGER.log(Level.WARNING, "An error occurred resetting roster version", error) )); */ } public RosterPacket.Item fromEntity(Peer contactModel) { RosterPacket.Item item = new RosterPacket.Item( JidCreate.entityBareFromOrThrowUnchecked(contactModel.getAddress()), contactModel.getName()); if (contactModel.getSubscriptionDirection() != null) { item.setItemType(convert(contactModel.getSubscriptionDirection())); } item.setApproved(contactModel.isSubscriptionApproved()); item.setSubscriptionPending(contactModel.isSubscriptionPending()); return item; } public Peer toEntity(RosterPacket.Item item) { Peer peer = new IPeer(); peer.setAccount(account); peer.setAddress(item.getJid().asEntityBareJidOrThrow().asEntityBareJidString()); peer.setName(item.getName()); if (item.getItemType() != null) { peer.setSubscriptionDirection(convert(item.getItemType())); } peer.setSubscriptionApproved(item.isApproved()); peer.setSubscriptionPending(item.isSubscriptionPending()); return peer; } public SubscriptionDirection convert(RosterPacket.ItemType type) { return SubscriptionDirection.valueOf(type.toString()); } public RosterPacket.ItemType convert(SubscriptionDirection direction) { return RosterPacket.ItemType.fromString(direction.toString()); } }