package org.mercury_im.messenger.xmpp.repository;

import org.mercury_im.messenger.core.entity.Account;
import org.mercury_im.messenger.core.repository.AccountRepository;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.core.util.ThreadUtils;
import org.mercury_im.messenger.xmpp.mapping.AccountMapping;
import org.mercury_im.messenger.xmpp.model.AccountModel;

import java.util.ArrayList;
import java.util.List;

import javax.inject.Inject;
import javax.inject.Named;

import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.requery.Persistable;
import io.requery.query.ResultDelegate;
import io.requery.reactivex.ReactiveEntityStore;

/**
 * {@link AccountRepository} implementation backed by a requery {@link ReactiveEntityStore}.
 *
 * <p>Every method translates between {@link Account} entities and {@link AccountModel} rows via
 * {@link AccountMapping}, subscribes on the injected subscriber scheduler and delivers results on
 * the injected observer scheduler.
 */
public class ReactiveXmppAccountRepository extends RequeryRepository implements AccountRepository {

    /**
     * Creates the repository.
     *
     * @param data               requery entity store used for all queries
     * @param subscriberScheduler scheduler the queries are subscribed on (injected under
     *                           {@link ThreadUtils#SCHEDULER_IO})
     * @param observerScheduler  scheduler results are observed on (injected under
     *                           {@link ThreadUtils#SCHEDULER_UI})
     */
    @Inject
    public ReactiveXmppAccountRepository(
            ReactiveEntityStore<Persistable> data,
            @Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
            @Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler) {
        super(data, subscriberScheduler, observerScheduler);
    }

    /**
     * Inserts a new model created from the given entity.
     *
     * @param account entity to persist
     * @return the inserted row mapped back to an entity
     */
    @Override
    public Single<Account> insertAccount(Account account) {
        return data().insert(AccountMapping.createModelFromEntity(account))
                .map(AccountMapping::modelToEntity)
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Observes the account with the given id.
     *
     * @param accountId id to match against {@code AccountModel.ID}
     * @return an observable emitting, for each query result, an {@link Optional} wrapping the
     *         mapping of the first matching row
     */
    @Override
    public Observable<Optional<Account>> observeAccount(long accountId) {
        return data().select(AccountModel.class)
                .where(AccountModel.ID.eq(accountId))
                .get().observableResult()
                // NOTE(review): firstOrNull() may yield null; this relies on
                // AccountMapping.modelToEntity accepting a null model - confirm.
                .map(result -> new Optional<>(AccountMapping.modelToEntity(result.firstOrNull())))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Fetches the account with the given id once.
     *
     * @param accountId id to match against {@code AccountModel.ID}
     * @return a {@link Maybe} over the query result, mapped to an entity
     */
    @Override
    public Maybe<Account> getAccount(long accountId) {
        return data().select(AccountModel.class)
                .where(AccountModel.ID.eq(accountId))
                .get().maybe()
                .map(AccountMapping::modelToEntity)
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Observes the account with the given address.
     *
     * @param address address to match against {@code AccountModel.ADDRESS}
     * @return an observable emitting, for each query result, an {@link Optional} wrapping the
     *         mapping of the first matching row
     */
    @Override
    public Observable<Optional<Account>> observeAccountByAddress(String address) {
        return data().select(AccountModel.class)
                .where(AccountModel.ADDRESS.eq(address))
                .get().observableResult()
                // NOTE(review): firstOrNull() may yield null; this relies on
                // AccountMapping.modelToEntity accepting a null model - confirm.
                .map(result -> new Optional<>(AccountMapping.modelToEntity(result.firstOrNull())))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Fetches the account with the given address once.
     *
     * @param address address to match against {@code AccountModel.ADDRESS}
     * @return a {@link Maybe} over the query result, mapped to an entity
     */
    @Override
    public Maybe<Account> getAccountByAddress(String address) {
        return data().select(AccountModel.class)
                .where(AccountModel.ADDRESS.eq(address))
                .get().maybe()
                .map(AccountMapping::modelToEntity)
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Observes all stored accounts.
     *
     * @return an observable emitting, for each query result, the list of all rows mapped to
     *         entities
     */
    @Override
    public Observable<List<Account>> observeAllAccounts() {
        return data().select(AccountModel.class)
                .get().observableResult()
                .map(ResultDelegate::toList)
                .map(models -> {
                    List<Account> entities = new ArrayList<>(models.size());
                    for (AccountModel model : models) {
                        entities.add(AccountMapping.modelToEntity(model));
                    }
                    return entities;
                })
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Updates the stored account that has the same id as the given entity.
     *
     * <p>Since we cannot access setId() of AccountModel, we have to query the model by ID and
     * update it manually.
     * See https://github.com/requery/requery/issues/616#issuecomment-315685460
     *
     * @param account entity carrying the id to look up and the new values
     * @return the updated row mapped back to an entity; the query result is converted with
     *         {@code Maybe.toSingle()}, so a missing row surfaces as an error
     */
    @Override
    public Single<Account> updateAccount(Account account) {
        // fetch model
        return data().select(AccountModel.class)
                .where(AccountModel.ID.eq(account.getId()))
                .get().maybe().toSingle()
                // update it and write it back without blocking inside the operator
                .flatMap(model -> applyAndPersist(model, account))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Updates the stored account with the entity's id, inserting a new model first if the lookup
     * comes back empty.
     *
     * @param account entity carrying the id to look up and the values to store
     * @return the stored row mapped back to an entity
     */
    @Override
    public Single<Account> upsertAccount(Account account) {
        // Try to fetch model
        return data().select(AccountModel.class)
                .where(AccountModel.ID.eq(account.getId()))
                .get().maybe()
                // If it does not exist, create a new model from the entity
                .switchIfEmpty(data().insert(AccountMapping.createModelFromEntity(account)))
                // finally update the model and write it back
                .flatMap(model -> applyAndPersist(model, account))
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Deletes the stored account that has the same id as the given entity.
     *
     * @param account entity whose id selects the row to delete
     * @return a {@link Completable} for the delete query (its result value is ignored)
     */
    @Override
    public Completable deleteAccount(Account account) {
        return data().delete(AccountModel.class)
                .where(AccountModel.ID.eq(account.getId()))
                .get().single().ignoreElement() // to completable
                .subscribeOn(subscriberScheduler())
                .observeOn(observerScheduler());
    }

    /**
     * Copies enabled flag, password and address from the entity onto the model and writes the
     * model back to the store.
     */
    private Single<Account> applyAndPersist(AccountModel model, Account account) {
        model.setEnabled(account.isEnabled());
        model.setPassword(account.getAuthentication().getPassword());
        model.setAddress(account.getAddress());
        return data().update(model).map(AccountMapping::modelToEntity);
    }
}