Mercury-IM/data/src/main/java/org/mercury_im/messenger/data/repository/XmppAccountRepository.java

178 lines
8.3 KiB
Java

package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.data.mapping.AccountMapping;
import org.mercury_im.messenger.data.model.AccountModel;
import org.mercury_im.messenger.data.repository.dao.AccountDao;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.util.Optional;
import org.mercury_im.messenger.util.ThreadUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.requery.Persistable;
import io.requery.query.ResultDelegate;
import io.requery.reactivex.ReactiveEntityStore;
import io.requery.reactivex.ReactiveResult;
/**
 * {@link AccountRepository} implementation backed by a requery entity store.
 *
 * <p>Every public method builds a reactive chain that talks to the database through
 * {@link AccountDao} (or directly through {@code data()}), converts between
 * {@link AccountModel} and {@link Account} using the injected {@link AccountMapping},
 * logs the terminal events of the chain, subscribes on {@code subscriberScheduler()} and
 * delivers results on {@code observerScheduler()}.
 *
 * <p>NOTE(review): several log messages concatenate the {@link Account} itself; confirm
 * that {@code Account.toString()} does not expose credentials.
 */
public class XmppAccountRepository
        extends RequeryRepository
        implements AccountRepository {

    private static final Logger LOGGER = Logger.getLogger(XmppAccountRepository.class.getName());

    /** Level used for every non-error log statement of this repository. */
    private static final Level LEVEL = Level.INFO;

    /** Converts between database models and domain entities. */
    private final AccountMapping accountMapping;

    /** Query helper operating on the same entity store that is passed to the super class. */
    private final AccountDao dao;

    /**
     * Creates the repository.
     *
     * @param data                requery entity store; handed to the super class and used to create the {@link AccountDao}
     * @param subscriberScheduler scheduler injected under the name {@link ThreadUtils#SCHEDULER_IO}
     * @param observerScheduler   scheduler injected under the name {@link ThreadUtils#SCHEDULER_UI}
     * @param accountMapping      mapper between {@link AccountModel} and {@link Account}
     */
    @Inject
    public XmppAccountRepository(ReactiveEntityStore<Persistable> data,
                                 @Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
                                 @Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler,
                                 AccountMapping accountMapping) {
        super(data, subscriberScheduler, observerScheduler);
        this.accountMapping = accountMapping;
        this.dao = new AccountDao(data);
    }

    /**
     * Converts the account to a model, inserts it through the DAO and maps the inserted
     * model back using {@code accountMapping.toEntity(model, account)}.
     *
     * @param account account to store
     * @return single emitting the mapped entity, or the error raised by the insert
     */
    @Override
    public Single<Account> insertAccount(Account account) {
        return Single.just(account)
                .map(accountMapping::toModel)
                .flatMap(dao::insert)
                .map(model -> accountMapping.toEntity(model, account))
                .doOnSuccess(a -> LOGGER.log(LEVEL, "insertAccount(" + account + "): onSuccess(): " + a))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "insertAccount(" + account + "): onError()", e))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Observes the account with the given id.
     *
     * <p>Each result emitted by the query is reduced to its first row (or {@code null})
     * and wrapped in an {@link Optional}, so a missing account is reported as an
     * {@code Optional} built from {@code null} rather than as an error.
     *
     * @param accountId id of the account to observe
     * @return observable of the optional account
     */
    @Override
    public Observable<Optional<Account>> observeAccount(UUID accountId) {
        return dao.get(accountId).observableResult()
                .map(result -> new Optional<>(result.firstOrNull()))
                .map(accountMapping::toEntity)
                .doOnNext(a -> LOGGER.log(LEVEL, "observeAccount(" + accountId + "): onNext(): " + a))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "observeAccount(" + accountId + "): onError()", e))
                .doOnComplete(() -> LOGGER.log(LEVEL, "observeAccount(" + accountId + "): onComplete()"))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Fetches the account with the given id once.
     *
     * @param accountId id of the account to load
     * @return maybe emitting the mapped account; it completes without a value when the
     *         underlying query's {@code maybe()} is empty
     */
    @Override
    public Maybe<Account> getAccount(UUID accountId) {
        return dao.get(accountId).maybe()
                .map(accountMapping::toEntity)
                .doOnSuccess(a -> LOGGER.log(LEVEL, "getAccount(" + accountId + "): onSuccess(): " + a))
                .doOnComplete(() -> LOGGER.log(LEVEL, "getAccount(" + accountId + "): onComplete()"))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "getAccount(" + accountId + "): onError()", e))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Observes the account with the given address; the address-based counterpart of
     * {@link #observeAccount(UUID)}.
     *
     * @param address address used to look up the account
     * @return observable of the optional account
     */
    @Override
    public Observable<Optional<Account>> observeAccountByAddress(String address) {
        return dao.get(address).observableResult()
                .map(result -> new Optional<>(result.firstOrNull()))
                .map(accountMapping::toEntity)
                .doOnNext(o -> LOGGER.log(LEVEL, "observeAccountByAddress(" + address + "): onNext(): " + o))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "observeAccountByAddress(" + address + "): onError()", e))
                // Uses the shared LEVEL constant like every other non-error log call in this class.
                .doOnComplete(() -> LOGGER.log(LEVEL, "observeAccountByAddress(" + address + "): onComplete()"))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Fetches the account with the given address once; the address-based counterpart of
     * {@link #getAccount(UUID)}.
     *
     * @param address address used to look up the account
     * @return maybe emitting the mapped account, or completing empty when the query's
     *         {@code maybe()} is empty
     */
    @Override
    public Maybe<Account> getAccountByAddress(String address) {
        return dao.get(address).maybe()
                .map(accountMapping::toEntity)
                .doOnSuccess(a -> LOGGER.log(LEVEL, "getAccountByAddress(" + address + "): onSuccess(): " + a))
                .doOnComplete(() -> LOGGER.log(LEVEL, "getAccountByAddress(" + address + "): onComplete()"))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "getAccountByAddress(" + address + "): onError()", e))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Observes all accounts as complete lists: every result emitted by the query is
     * converted to a list of models and then to a list of entities.
     *
     * @return observable emitting one list of accounts per query result
     */
    @Override
    public Observable<List<Account>> observeAllAccounts() {
        return dao.getAll().observableResult()
                .map(ResultDelegate::toList)
                .map(this::modelsToEntities)
                .doOnNext(l -> LOGGER.log(LEVEL, "observeAllAccounts(): onNext(): " + Arrays.toString(l.toArray())))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "observeAllAccounts(): onError()", e))
                .doOnComplete(() -> LOGGER.log(LEVEL, "observeAllAccounts(): onComplete()"))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Observes all accounts as individual items: every result emitted by the query is
     * flattened into its rows, and each row is mapped to an entity.
     *
     * @return observable emitting accounts one by one
     */
    @Override
    public Observable<Account> observeAccounts() {
        return dao.getAll().observableResult()
                .flatMap(ReactiveResult::observable)
                .map(accountMapping::toEntity)
                .doOnNext(a -> LOGGER.log(LEVEL, "observeAccounts(): onNext(): " + a))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "observeAccounts(): onError()", e))
                .doOnComplete(() -> LOGGER.log(LEVEL, "observeAccounts(): onComplete()"))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Updates an already stored account.
     *
     * <p>The stored model is looked up by {@code account.getId()}, overwritten with the
     * account's values via {@code accountMapping.toModel(account, model)} and written
     * back with {@code data().update(...)}.
     *
     * @param account account carrying the new values
     * @return single emitting the mapped entity; because the lookup goes through
     *         {@code Maybe.toSingle()}, an empty lookup surfaces as an error instead of
     *         an empty completion
     */
    @Override
    public Single<Account> updateAccount(Account account) {
        // Since we cannot access setId() of AccountModel, we have to query the model by ID and update it manually.
        // https://github.com/requery/requery/issues/616#issuecomment-315685460
        return dao.get(account.getId()).maybe().toSingle()
                .map(model -> accountMapping.toModel(account, model))
                .flatMap(updatedModel -> data().update(updatedModel))
                .map(model -> accountMapping.toEntity(model, account))
                .doOnSuccess(a -> LOGGER.log(LEVEL, "updateAccount(" + account + "): onSuccess(): " + a))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "updateAccount(" + account + "): onError()", e))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Inserts the account if no model with its id exists, otherwise updates the stored model.
     *
     * <p>The lookup falls back to an insert when it is empty. The remaining steps
     * (copying the account onto the model and calling {@code data().update(...)}) run on
     * both paths, so a freshly inserted model is passed through the update as well.
     *
     * @param account account to insert or update
     * @return single emitting the mapped entity
     */
    @Override
    public Single<Account> upsertAccount(Account account) {
        return dao.get(account.getId()).maybe()
                .switchIfEmpty(
                        Single.just(account).map(accountMapping::toModel).flatMap(dao::insert))
                .map(model -> accountMapping.toModel(account, model))
                .flatMap(data()::update)
                .map(model -> accountMapping.toEntity(model, account))
                .doOnSuccess(a -> LOGGER.log(LEVEL, "upsertAccount(" + account + "): onSuccess(): " + a))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "upsertAccount(" + account + "): onError()", e))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Deletes the account with the given id; the value produced by the DAO's delete is discarded.
     *
     * @param accountId id of the account to delete
     * @return completable that completes once the delete has finished, or fails with its error
     */
    @Override
    public Completable deleteAccount(UUID accountId) {
        return dao.delete(accountId).ignoreElement()
                .doOnComplete(() -> LOGGER.log(LEVEL, "deleteAccount(" + accountId + "): onComplete()"))
                .doOnError(e -> LOGGER.log(Level.SEVERE, "deleteAccount(" + accountId + "): onError()", e))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Maps every model of the list to its entity, preserving the list order.
     *
     * @param models models to convert
     * @return new list holding one entity per model
     */
    private List<Account> modelsToEntities(List<AccountModel> models) {
        List<Account> entities = new ArrayList<>(models.size());
        for (AccountModel model : models) {
            entities.add(accountMapping.toEntity(model));
        }
        return entities;
    }
}