Rename repository implementations and remove scheduling from the data module

This commit is contained in:
Paul Schaub 2020-06-09 20:49:13 +02:00
parent 92e1d1ec2b
commit 3b9042b95e
Signed by: vanitasvitae
GPG Key ID: 62BEE9264BF17311
16 changed files with 135 additions and 307 deletions

View File

@ -1,32 +1,29 @@
package org.mercury_im.messenger.data.di; package org.mercury_im.messenger.data.di;
import org.mercury_im.messenger.core.data.repository.AccountRepository;
import org.mercury_im.messenger.core.data.repository.DirectChatRepository;
import org.mercury_im.messenger.core.data.repository.EntityCapsRepository;
import org.mercury_im.messenger.core.data.repository.GroupChatRepository;
import org.mercury_im.messenger.core.data.repository.MessageRepository;
import org.mercury_im.messenger.core.data.repository.PeerRepository;
import org.mercury_im.messenger.core.data.repository.Repositories;
import org.mercury_im.messenger.data.mapping.AccountMapping; import org.mercury_im.messenger.data.mapping.AccountMapping;
import org.mercury_im.messenger.data.mapping.DirectChatMapping; import org.mercury_im.messenger.data.mapping.DirectChatMapping;
import org.mercury_im.messenger.data.mapping.EntityCapsMapping; import org.mercury_im.messenger.data.mapping.EntityCapsMapping;
import org.mercury_im.messenger.data.mapping.GroupChatMapping; import org.mercury_im.messenger.data.mapping.GroupChatMapping;
import org.mercury_im.messenger.data.mapping.MessageMapping; import org.mercury_im.messenger.data.mapping.MessageMapping;
import org.mercury_im.messenger.data.mapping.PeerMapping; import org.mercury_im.messenger.data.mapping.PeerMapping;
import org.mercury_im.messenger.core.data.repository.AccountRepository; import org.mercury_im.messenger.data.repository.RxAccountRepository;
import org.mercury_im.messenger.core.data.repository.EntityCapsRepository; import org.mercury_im.messenger.data.repository.RxDirectChatRepository;
import org.mercury_im.messenger.core.data.repository.GroupChatRepository; import org.mercury_im.messenger.data.repository.RxEntityCapsRepository;
import org.mercury_im.messenger.core.data.repository.MessageRepository; import org.mercury_im.messenger.data.repository.RxGroupChatRepository;
import org.mercury_im.messenger.core.data.repository.PeerRepository; import org.mercury_im.messenger.data.repository.RxMessageRepository;
import org.mercury_im.messenger.core.data.repository.DirectChatRepository; import org.mercury_im.messenger.data.repository.RxPeerRepository;
import org.mercury_im.messenger.data.repository.XmppEntityCapsRepository;
import org.mercury_im.messenger.core.data.repository.Repositories;
import org.mercury_im.messenger.data.repository.XmppAccountRepository;
import org.mercury_im.messenger.data.repository.XmppDirectChatRepository;
import org.mercury_im.messenger.data.repository.XmppGroupChatRepository;
import org.mercury_im.messenger.data.repository.XmppMessageRepository;
import org.mercury_im.messenger.data.repository.XmppPeerRepository;
import org.mercury_im.messenger.core.util.ThreadUtils;
import javax.inject.Named;
import javax.inject.Singleton; import javax.inject.Singleton;
import dagger.Module; import dagger.Module;
import dagger.Provides; import dagger.Provides;
import io.reactivex.Scheduler;
import io.requery.Persistable; import io.requery.Persistable;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
@ -40,64 +37,51 @@ public class RepositoryModule {
@Singleton @Singleton
static AccountRepository provideAccountRepository( static AccountRepository provideAccountRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler ioScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler uiScheduler,
AccountMapping accountMapping) { AccountMapping accountMapping) {
return new XmppAccountRepository(data, ioScheduler, uiScheduler, accountMapping); return new RxAccountRepository(data, accountMapping);
} }
@Provides @Provides
@Singleton @Singleton
static PeerRepository providePeerRepository( static PeerRepository providePeerRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler ioScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler uiScheduler,
PeerMapping peerMapping, PeerMapping peerMapping,
AccountRepository accountRepository) { AccountRepository accountRepository) {
return new XmppPeerRepository(data, ioScheduler, uiScheduler, peerMapping, accountRepository); return new RxPeerRepository(data, peerMapping, accountRepository);
} }
@Provides @Provides
@Singleton @Singleton
static DirectChatRepository provideDirectChatRepository( static DirectChatRepository provideDirectChatRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler ioScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler uiScheduler,
DirectChatMapping directChatMapping) { DirectChatMapping directChatMapping) {
return new XmppDirectChatRepository(data, ioScheduler, uiScheduler, directChatMapping); return new RxDirectChatRepository(data, directChatMapping);
} }
@Provides @Provides
@Singleton @Singleton
static GroupChatRepository provideGroupChatRepository( static GroupChatRepository provideGroupChatRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler ioScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler uiScheduler,
GroupChatMapping groupChatMapping) { GroupChatMapping groupChatMapping) {
return new XmppGroupChatRepository(data, ioScheduler, uiScheduler, groupChatMapping); return new RxGroupChatRepository(data, groupChatMapping);
} }
@Provides @Provides
@Singleton @Singleton
static MessageRepository provideMessageRepository( static MessageRepository provideMessageRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler ioScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler uiScheduler,
MessageMapping messageMapping, MessageMapping messageMapping,
DirectChatMapping directChatMapping, DirectChatMapping directChatMapping,
GroupChatMapping groupChatMapping) { GroupChatMapping groupChatMapping) {
return new XmppMessageRepository(data, ioScheduler, uiScheduler, return new RxMessageRepository(data, messageMapping, directChatMapping, groupChatMapping);
messageMapping, directChatMapping, groupChatMapping);
} }
@Provides @Provides
@Singleton @Singleton
static EntityCapsRepository provideCapsRepository( static EntityCapsRepository provideCapsRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler ioScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler uiScheduler,
EntityCapsMapping entityCapsMapping) { EntityCapsMapping entityCapsMapping) {
return new XmppEntityCapsRepository(data, ioScheduler, uiScheduler, entityCapsMapping); return new RxEntityCapsRepository(data, entityCapsMapping);
} }
@Provides @Provides
@ -108,7 +92,7 @@ public class RepositoryModule {
GroupChatRepository groupChatRepository, GroupChatRepository groupChatRepository,
MessageRepository messageRepository, MessageRepository messageRepository,
PeerRepository peerRepository, PeerRepository peerRepository,
XmppEntityCapsRepository entityCapsRepository) { RxEntityCapsRepository entityCapsRepository) {
return new Repositories(accountRepository, directChatRepository, groupChatRepository, return new Repositories(accountRepository, directChatRepository, groupChatRepository,
messageRepository, peerRepository, entityCapsRepository); messageRepository, peerRepository, entityCapsRepository);
} }

View File

@ -1,38 +1,17 @@
package org.mercury_im.messenger.data.repository; package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.core.util.ThreadUtils;
import javax.inject.Named;
import io.reactivex.Scheduler;
import io.requery.Persistable; import io.requery.Persistable;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
public abstract class RequeryRepository { public abstract class RequeryRepository {
private final Scheduler subscriberScheduler;
private final Scheduler observerScheduler;
private final ReactiveEntityStore<Persistable> data; private final ReactiveEntityStore<Persistable> data;
protected RequeryRepository(ReactiveEntityStore<Persistable> data, protected RequeryRepository(ReactiveEntityStore<Persistable> data) {
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler) {
this.data = data; this.data = data;
this.subscriberScheduler = subscriberScheduler;
this.observerScheduler = observerScheduler;
} }
protected ReactiveEntityStore<Persistable> data() { protected ReactiveEntityStore<Persistable> data() {
return data; return data;
} }
protected Scheduler subscriberScheduler() {
return subscriberScheduler;
}
protected Scheduler observerScheduler() {
return observerScheduler;
}
} }

View File

@ -1,31 +1,28 @@
package org.mercury_im.messenger.data.repository; package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.core.data.repository.AccountRepository; import org.mercury_im.messenger.core.data.repository.AccountRepository;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.data.mapping.AccountMapping; import org.mercury_im.messenger.data.mapping.AccountMapping;
import org.mercury_im.messenger.data.model.AccountModel; import org.mercury_im.messenger.data.model.AccountModel;
import org.mercury_im.messenger.data.repository.dao.AccountDao; import org.mercury_im.messenger.data.repository.dao.AccountDao;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.core.util.ThreadUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Completable; import io.reactivex.Completable;
import io.reactivex.Maybe; import io.reactivex.Maybe;
import io.reactivex.Observable; import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single; import io.reactivex.Single;
import io.requery.Persistable; import io.requery.Persistable;
import io.requery.query.ResultDelegate; import io.requery.query.ResultDelegate;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
import io.requery.reactivex.ReactiveResult; import io.requery.reactivex.ReactiveResult;
public class XmppAccountRepository public class RxAccountRepository
extends RequeryRepository extends RequeryRepository
implements AccountRepository { implements AccountRepository {
@ -33,11 +30,9 @@ public class XmppAccountRepository
private final AccountDao dao; private final AccountDao dao;
@Inject @Inject
public XmppAccountRepository(ReactiveEntityStore<Persistable> data, public RxAccountRepository(ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler, AccountMapping accountMapping) {
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler, super(data);
AccountMapping accountMapping) {
super(data, subscriberScheduler, observerScheduler);
this.accountMapping = accountMapping; this.accountMapping = accountMapping;
this.dao = new AccountDao(data); this.dao = new AccountDao(data);
} }
@ -47,61 +42,47 @@ public class XmppAccountRepository
return Single.just(account) return Single.just(account)
.map(accountMapping::toModel) .map(accountMapping::toModel)
.flatMap(dao::insert) .flatMap(dao::insert)
.map(model -> accountMapping.toEntity(model, account)) .map(model -> accountMapping.toEntity(model, account));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<Optional<Account>> observeAccount(UUID accountId) { public Observable<Optional<Account>> observeAccount(UUID accountId) {
return dao.get(accountId).observableResult() return dao.get(accountId).observableResult()
.map(result -> new Optional<>(result.firstOrNull())) .map(result -> new Optional<>(result.firstOrNull()))
.map(accountMapping::toEntity) .map(accountMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Maybe<Account> getAccount(UUID accountId) { public Maybe<Account> getAccount(UUID accountId) {
return dao.get(accountId).maybe() return dao.get(accountId).maybe()
.map(accountMapping::toEntity) .map(accountMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<Optional<Account>> observeAccountByAddress(String address) { public Observable<Optional<Account>> observeAccountByAddress(String address) {
return dao.get(address).observableResult() return dao.get(address).observableResult()
.map(result -> new Optional<>(result.firstOrNull())) .map(result -> new Optional<>(result.firstOrNull()))
.map(accountMapping::toEntity) .map(accountMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Maybe<Account> getAccountByAddress(String address) { public Maybe<Account> getAccountByAddress(String address) {
return dao.get(address).maybe() return dao.get(address).maybe()
.map(accountMapping::toEntity) .map(accountMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<List<Account>> observeAllAccounts() { public Observable<List<Account>> observeAllAccounts() {
return dao.getAll().observableResult() return dao.getAll().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::modelsToEntities) .map(this::modelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<Account> observeAccounts() { public Observable<Account> observeAccounts() {
return dao.getAll().observableResult() return dao.getAll().observableResult()
.flatMap(ReactiveResult::observable) .flatMap(ReactiveResult::observable)
.map(accountMapping::toEntity) .map(accountMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -112,9 +93,7 @@ public class XmppAccountRepository
return dao.get(account.getId()).maybe().toSingle() return dao.get(account.getId()).maybe().toSingle()
.map(model -> accountMapping.toModel(account, model)) .map(model -> accountMapping.toModel(account, model))
.flatMap(updatedModel -> data().update(updatedModel)) .flatMap(updatedModel -> data().update(updatedModel))
.map(model -> accountMapping.toEntity(model, account)) .map(model -> accountMapping.toEntity(model, account));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -124,16 +103,12 @@ public class XmppAccountRepository
Single.just(account).map(accountMapping::toModel).flatMap(dao::insert)) Single.just(account).map(accountMapping::toModel).flatMap(dao::insert))
.map(model -> accountMapping.toModel(account, model)) .map(model -> accountMapping.toModel(account, model))
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> accountMapping.toEntity(model, account)) .map(model -> accountMapping.toEntity(model, account));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Completable deleteAccount(UUID accountId) { public Completable deleteAccount(UUID accountId) {
return dao.delete(accountId).ignoreElement() return dao.delete(accountId).ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -141,9 +116,7 @@ public class XmppAccountRepository
return data().update(AccountModel.class) return data().update(AccountModel.class)
.set(AccountModel.ROSTER_VERSION, rosterVersion) .set(AccountModel.ROSTER_VERSION, rosterVersion)
.where(AccountModel.ID.eq(accountId)) .where(AccountModel.ID.eq(accountId))
.get().single().ignoreElement() .get().single().ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
private List<Account> modelsToEntities(List<AccountModel> models) { private List<Account> modelsToEntities(List<AccountModel> models) {

View File

@ -1,31 +1,28 @@
package org.mercury_im.messenger.data.repository; package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.core.data.repository.DirectChatRepository; import org.mercury_im.messenger.core.data.repository.DirectChatRepository;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.data.mapping.DirectChatMapping; import org.mercury_im.messenger.data.mapping.DirectChatMapping;
import org.mercury_im.messenger.data.model.DirectChatModel; import org.mercury_im.messenger.data.model.DirectChatModel;
import org.mercury_im.messenger.data.repository.dao.DirectChatDao; import org.mercury_im.messenger.data.repository.dao.DirectChatDao;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.entity.chat.DirectChat; import org.mercury_im.messenger.entity.chat.DirectChat;
import org.mercury_im.messenger.entity.contact.Peer; import org.mercury_im.messenger.entity.contact.Peer;
import org.mercury_im.messenger.core.util.ThreadUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Completable; import io.reactivex.Completable;
import io.reactivex.Maybe; import io.reactivex.Maybe;
import io.reactivex.Observable; import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single; import io.reactivex.Single;
import io.requery.Persistable; import io.requery.Persistable;
import io.requery.query.ResultDelegate; import io.requery.query.ResultDelegate;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
public class XmppDirectChatRepository public class RxDirectChatRepository
extends RequeryRepository extends RequeryRepository
implements DirectChatRepository { implements DirectChatRepository {
@ -34,12 +31,10 @@ public class XmppDirectChatRepository
private final DirectChatDao dao; private final DirectChatDao dao;
@Inject @Inject
public XmppDirectChatRepository( public RxDirectChatRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler,
DirectChatMapping directChatMapping) { DirectChatMapping directChatMapping) {
super(data, subscriberScheduler, observerScheduler); super(data);
this.directChatMapping = directChatMapping; this.directChatMapping = directChatMapping;
this.dao = new DirectChatDao(data); this.dao = new DirectChatDao(data);
} }
@ -51,26 +46,20 @@ public class XmppDirectChatRepository
.map(directChatMapping::toModel) .map(directChatMapping::toModel)
.flatMap(dao::insert) .flatMap(dao::insert)
// map back to entity // map back to entity
.map(model -> directChatMapping.toEntity(model, chat)) .map(model -> directChatMapping.toEntity(model, chat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<Optional<DirectChat>> observeDirectChat(UUID chatId) { public Observable<Optional<DirectChat>> observeDirectChat(UUID chatId) {
return dao.get(chatId).observableResult() return dao.get(chatId).observableResult()
.map(result -> new Optional<>(result.firstOrNull())) .map(result -> new Optional<>(result.firstOrNull()))
.map(directChatMapping::toEntity) .map(directChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Maybe<DirectChat> getDirectChat(UUID chatId) { public Maybe<DirectChat> getDirectChat(UUID chatId) {
return dao.get(chatId).maybe() return dao.get(chatId).maybe()
.map(directChatMapping::toEntity) .map(directChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -82,35 +71,27 @@ public class XmppDirectChatRepository
chat.setPeer(peer); chat.setPeer(peer);
return chat; return chat;
}) })
.flatMap(this::insertDirectChat)) .flatMap(this::insertDirectChat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<Optional<DirectChat>> observeDirectChatByPeer(Peer peer) { public Observable<Optional<DirectChat>> observeDirectChatByPeer(Peer peer) {
return dao.getByPeer(peer.getId()).observableResult() return dao.getByPeer(peer.getId()).observableResult()
.map(result -> new Optional<>(result.firstOrNull())) .map(result -> new Optional<>(result.firstOrNull()))
.map(directChatMapping::toEntity) .map(directChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Maybe<DirectChat> getDirectChatByPeer(Peer peer) { public Maybe<DirectChat> getDirectChatByPeer(Peer peer) {
return dao.getByPeer(peer.getId()).maybe() return dao.getByPeer(peer.getId()).maybe()
.map(directChatMapping::toEntity) .map(directChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<List<DirectChat>> observeAllDirectChats() { public Observable<List<DirectChat>> observeAllDirectChats() {
return dao.getAll().observableResult() return dao.getAll().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::chatModelsToEntities) .map(this::chatModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
private List<DirectChat> chatModelsToEntities(List<DirectChatModel> models) { private List<DirectChat> chatModelsToEntities(List<DirectChatModel> models) {
@ -126,9 +107,7 @@ public class XmppDirectChatRepository
return dao.get(chat.getId()).maybe().toSingle() return dao.get(chat.getId()).maybe().toSingle()
.map(model -> directChatMapping.toModel(chat, model)) .map(model -> directChatMapping.toModel(chat, model))
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> directChatMapping.toEntity(model, chat)) .map(model -> directChatMapping.toEntity(model, chat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -137,16 +116,12 @@ public class XmppDirectChatRepository
.switchIfEmpty(dao.insert(directChatMapping.toModel(chat))) .switchIfEmpty(dao.insert(directChatMapping.toModel(chat)))
.map(directChatModel -> directChatMapping.toModel(chat, directChatModel)) .map(directChatModel -> directChatMapping.toModel(chat, directChatModel))
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> directChatMapping.toEntity(model, chat)) .map(model -> directChatMapping.toEntity(model, chat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Completable deleteDirectChat(UUID chatId) { public Completable deleteDirectChat(UUID chatId) {
return dao.delete(chatId) return dao.delete(chatId)
.ignoreElement() .ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
} }

View File

@ -1,10 +1,10 @@
package org.mercury_im.messenger.data.repository; package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.core.SchedulersFacade;
import org.mercury_im.messenger.core.data.repository.EntityCapsRepository; import org.mercury_im.messenger.core.data.repository.EntityCapsRepository;
import org.mercury_im.messenger.data.mapping.EntityCapsMapping; import org.mercury_im.messenger.data.mapping.EntityCapsMapping;
import org.mercury_im.messenger.data.model.EntityCapsModel; import org.mercury_im.messenger.data.model.EntityCapsModel;
import org.mercury_im.messenger.entity.caps.EntityCapsRecord; import org.mercury_im.messenger.entity.caps.EntityCapsRecord;
import org.mercury_im.messenger.core.util.ThreadUtils;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -20,17 +20,15 @@ import io.requery.Persistable;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
import io.requery.reactivex.ReactiveResult; import io.requery.reactivex.ReactiveResult;
public class XmppEntityCapsRepository extends RequeryRepository implements EntityCapsRepository { public class RxEntityCapsRepository extends RequeryRepository implements EntityCapsRepository {
private final EntityCapsMapping entityCapsMapping; private final EntityCapsMapping entityCapsMapping;
@Inject @Inject
public XmppEntityCapsRepository( public RxEntityCapsRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler,
EntityCapsMapping mapping) { EntityCapsMapping mapping) {
super(data, subscriberScheduler, observerScheduler); super(data);
this.entityCapsMapping = mapping; this.entityCapsMapping = mapping;
} }
@ -39,9 +37,7 @@ public class XmppEntityCapsRepository extends RequeryRepository implements Entit
return data().select(EntityCapsModel.class).get() return data().select(EntityCapsModel.class).get()
.observableResult() .observableResult()
.map(result -> result.toMap(EntityCapsModel.NODE_VER, new ConcurrentHashMap<>())) .map(result -> result.toMap(EntityCapsModel.NODE_VER, new ConcurrentHashMap<>()))
.map(this::mapModelsToEntities) .map(this::mapModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
private Map<String, EntityCapsRecord> mapModelsToEntities(Map<String, EntityCapsModel> models) { private Map<String, EntityCapsRecord> mapModelsToEntities(Map<String, EntityCapsModel> models) {
@ -65,9 +61,7 @@ public class XmppEntityCapsRepository extends RequeryRepository implements Entit
return data().select(EntityCapsModel.class) return data().select(EntityCapsModel.class)
.get().observableResult() .get().observableResult()
.flatMap(ReactiveResult::observable) .flatMap(ReactiveResult::observable)
.map(entityCapsMapping::toEntity) .map(entityCapsMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -81,8 +75,6 @@ public class XmppEntityCapsRepository extends RequeryRepository implements Entit
@Override @Override
public Completable insertEntityCapsRecord(EntityCapsRecord entityCapsRecord) { public Completable insertEntityCapsRecord(EntityCapsRecord entityCapsRecord) {
return data().upsert(entityCapsMapping.toModel(entityCapsRecord)) return data().upsert(entityCapsMapping.toModel(entityCapsRecord))
.ignoreElement() .ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
} }

View File

@ -1,31 +1,28 @@
package org.mercury_im.messenger.data.repository; package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.core.data.repository.GroupChatRepository; import org.mercury_im.messenger.core.data.repository.GroupChatRepository;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.data.mapping.GroupChatMapping; import org.mercury_im.messenger.data.mapping.GroupChatMapping;
import org.mercury_im.messenger.data.model.GroupChatModel; import org.mercury_im.messenger.data.model.GroupChatModel;
import org.mercury_im.messenger.data.repository.dao.GroupChatDao; import org.mercury_im.messenger.data.repository.dao.GroupChatDao;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.entity.chat.GroupChat; import org.mercury_im.messenger.entity.chat.GroupChat;
import org.mercury_im.messenger.core.util.ThreadUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Completable; import io.reactivex.Completable;
import io.reactivex.Maybe; import io.reactivex.Maybe;
import io.reactivex.Observable; import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single; import io.reactivex.Single;
import io.requery.Persistable; import io.requery.Persistable;
import io.requery.query.ResultDelegate; import io.requery.query.ResultDelegate;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
public class XmppGroupChatRepository public class RxGroupChatRepository
extends RequeryRepository extends RequeryRepository
implements GroupChatRepository { implements GroupChatRepository {
@ -34,12 +31,10 @@ public class XmppGroupChatRepository
private final GroupChatDao dao; private final GroupChatDao dao;
@Inject @Inject
public XmppGroupChatRepository( public RxGroupChatRepository(
ReactiveEntityStore<Persistable> data, ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler,
GroupChatMapping groupChatMapping) { GroupChatMapping groupChatMapping) {
super(data, subscriberScheduler, observerScheduler); super(data);
this.groupChatMapping = groupChatMapping; this.groupChatMapping = groupChatMapping;
this.dao = new GroupChatDao(data); this.dao = new GroupChatDao(data);
} }
@ -49,26 +44,20 @@ public class XmppGroupChatRepository
return Single.just(chat) return Single.just(chat)
.map(groupChatMapping::toModel) .map(groupChatMapping::toModel)
.flatMap(dao::insert) .flatMap(dao::insert)
.map(model -> groupChatMapping.toEntity(model, chat)) .map(model -> groupChatMapping.toEntity(model, chat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<Optional<GroupChat>> observeGroupChat(UUID chatId) { public Observable<Optional<GroupChat>> observeGroupChat(UUID chatId) {
return dao.get(chatId).observableResult() return dao.get(chatId).observableResult()
.map(result -> new Optional<>(result.firstOrNull())) .map(result -> new Optional<>(result.firstOrNull()))
.map(groupChatMapping::toEntity) .map(groupChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Maybe<GroupChat> getGroupChat(UUID chatId) { public Maybe<GroupChat> getGroupChat(UUID chatId) {
return dao.get(chatId).maybe() return dao.get(chatId).maybe()
.map(groupChatMapping::toEntity) .map(groupChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -80,35 +69,27 @@ public class XmppGroupChatRepository
chat.setRoomAddress(roomAddress); chat.setRoomAddress(roomAddress);
return chat; return chat;
}) })
.flatMap(this::insertGroupChat)) .flatMap(this::insertGroupChat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<Optional<GroupChat>> observeGroupChatByRoomAddress(UUID accountId, String roomAddress) { public Observable<Optional<GroupChat>> observeGroupChatByRoomAddress(UUID accountId, String roomAddress) {
return dao.get(accountId, roomAddress).observableResult() return dao.get(accountId, roomAddress).observableResult()
.map(result -> new Optional<>(result.firstOrNull())) .map(result -> new Optional<>(result.firstOrNull()))
.map(groupChatMapping::toEntity) .map(groupChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Maybe<GroupChat> getGroupChatByRoomAddress(UUID accountId, String roomAddress) { public Maybe<GroupChat> getGroupChatByRoomAddress(UUID accountId, String roomAddress) {
return dao.get(accountId, roomAddress).maybe() return dao.get(accountId, roomAddress).maybe()
.map(groupChatMapping::toEntity) .map(groupChatMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Observable<List<GroupChat>> observeAllGroupChats() { public Observable<List<GroupChat>> observeAllGroupChats() {
return dao.getAll().observableResult() return dao.getAll().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::modelsToEntities) .map(this::modelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -117,9 +98,7 @@ public class XmppGroupChatRepository
.where(GroupChatModel.ACCOUNT_ID.eq(accountId)) .where(GroupChatModel.ACCOUNT_ID.eq(accountId))
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::modelsToEntities) .map(this::modelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
private List<GroupChat> modelsToEntities(List<GroupChatModel> models) { private List<GroupChat> modelsToEntities(List<GroupChatModel> models) {
@ -135,9 +114,7 @@ public class XmppGroupChatRepository
return dao.get(chat.getId()).maybe().toSingle() return dao.get(chat.getId()).maybe().toSingle()
.map(model -> groupChatMapping.toModel(chat, model)) .map(model -> groupChatMapping.toModel(chat, model))
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> groupChatMapping.toEntity(model, chat)) .map(model -> groupChatMapping.toEntity(model, chat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -147,15 +124,11 @@ public class XmppGroupChatRepository
.flatMap(dao::insert)) .flatMap(dao::insert))
.map(model -> groupChatMapping.toModel(chat, model)) .map(model -> groupChatMapping.toModel(chat, model))
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> groupChatMapping.toEntity(model, chat)) .map(model -> groupChatMapping.toEntity(model, chat));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Completable deleteGroupChat(UUID chatId) { public Completable deleteGroupChat(UUID chatId) {
return dao.delete(chatId).ignoreElement() return dao.delete(chatId).ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
} }

View File

@ -15,23 +15,20 @@ import org.mercury_im.messenger.data.repository.dao.MessageDao;
import org.mercury_im.messenger.entity.chat.DirectChat; import org.mercury_im.messenger.entity.chat.DirectChat;
import org.mercury_im.messenger.entity.chat.GroupChat; import org.mercury_im.messenger.entity.chat.GroupChat;
import org.mercury_im.messenger.entity.message.Message; import org.mercury_im.messenger.entity.message.Message;
import org.mercury_im.messenger.core.util.ThreadUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Completable; import io.reactivex.Completable;
import io.reactivex.Observable; import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single; import io.reactivex.Single;
import io.requery.Persistable; import io.requery.Persistable;
import io.requery.query.ResultDelegate; import io.requery.query.ResultDelegate;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
public class XmppMessageRepository public class RxMessageRepository
extends RequeryRepository extends RequeryRepository
implements MessageRepository { implements MessageRepository {
@ -46,13 +43,11 @@ public class XmppMessageRepository
@Inject @Inject
public XmppMessageRepository(ReactiveEntityStore<Persistable> data, public RxMessageRepository(ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler, MessageMapping messageMapping,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler, DirectChatMapping directChatMapping,
MessageMapping messageMapping, GroupChatMapping groupChatMapping) {
DirectChatMapping directChatMapping, super(data);
GroupChatMapping groupChatMapping) {
super(data, subscriberScheduler, observerScheduler);
this.messageMapping = messageMapping; this.messageMapping = messageMapping;
this.directChatMapping = directChatMapping; this.directChatMapping = directChatMapping;
this.groupChatMapping = groupChatMapping; this.groupChatMapping = groupChatMapping;
@ -68,9 +63,7 @@ public class XmppMessageRepository
.map(chatModel -> toRelation(chatModel, messageMapping.toModel(message))) .map(chatModel -> toRelation(chatModel, messageMapping.toModel(message)))
.flatMap(data()::insert) .flatMap(data()::insert)
.map(DirectMessagesRelation::getMessage) .map(DirectMessagesRelation::getMessage)
.map(messageModel -> messageMapping.toEntity(messageModel, message)) .map(messageModel -> messageMapping.toEntity(messageModel, message));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -80,9 +73,7 @@ public class XmppMessageRepository
.map(chatModel -> toRelation(chatModel, messageMapping.toModel(message))) .map(chatModel -> toRelation(chatModel, messageMapping.toModel(message)))
.flatMap(data()::insert) .flatMap(data()::insert)
.map(GroupMessagesRelation::getMessage) .map(GroupMessagesRelation::getMessage)
.map(messageModel -> messageMapping.toEntity(messageModel, message)) .map(messageModel -> messageMapping.toEntity(messageModel, message));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -94,9 +85,7 @@ public class XmppMessageRepository
.where(DirectMessagesRelation.CHAT_ID.eq(chat.getId())) .where(DirectMessagesRelation.CHAT_ID.eq(chat.getId()))
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::messageModelsToEntities) .map(this::messageModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -108,9 +97,7 @@ public class XmppMessageRepository
.where(GroupMessagesRelation.CHAT_ID.eq(chat.getId())) .where(GroupMessagesRelation.CHAT_ID.eq(chat.getId()))
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::messageModelsToEntities) .map(this::messageModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -120,9 +107,7 @@ public class XmppMessageRepository
.where(MessageModel.BODY.eq(body)) .where(MessageModel.BODY.eq(body))
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::messageModelsToEntities) .map(this::messageModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -137,9 +122,7 @@ public class XmppMessageRepository
.and(DirectMessagesRelation.CHAT_ID.eq(chat.getId())) .and(DirectMessagesRelation.CHAT_ID.eq(chat.getId()))
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::messageModelsToEntities) .map(this::messageModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -154,9 +137,7 @@ public class XmppMessageRepository
.and(GroupMessagesRelation.CHAT_ID.eq(chat.getId())) .and(GroupMessagesRelation.CHAT_ID.eq(chat.getId()))
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::messageModelsToEntities) .map(this::messageModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -174,9 +155,7 @@ public class XmppMessageRepository
return dao.get(message.getId()).maybe().toSingle() return dao.get(message.getId()).maybe().toSingle()
.map(model -> messageMapping.toModel(message, model)) .map(model -> messageMapping.toModel(message, model))
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> messageMapping.toEntity(model, message)) .map(model -> messageMapping.toEntity(model, message));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override

View File

@ -2,30 +2,27 @@ package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.core.data.repository.AccountRepository; import org.mercury_im.messenger.core.data.repository.AccountRepository;
import org.mercury_im.messenger.core.data.repository.PeerRepository; import org.mercury_im.messenger.core.data.repository.PeerRepository;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.data.mapping.PeerMapping; import org.mercury_im.messenger.data.mapping.PeerMapping;
import org.mercury_im.messenger.data.model.PeerModel; import org.mercury_im.messenger.data.model.PeerModel;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.entity.contact.Peer; import org.mercury_im.messenger.entity.contact.Peer;
import org.mercury_im.messenger.core.util.Optional;
import org.mercury_im.messenger.core.util.ThreadUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Completable; import io.reactivex.Completable;
import io.reactivex.Maybe; import io.reactivex.Maybe;
import io.reactivex.Observable; import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single; import io.reactivex.Single;
import io.requery.Persistable; import io.requery.Persistable;
import io.requery.query.ResultDelegate; import io.requery.query.ResultDelegate;
import io.requery.reactivex.ReactiveEntityStore; import io.requery.reactivex.ReactiveEntityStore;
public class XmppPeerRepository public class RxPeerRepository
extends RequeryRepository extends RequeryRepository
implements PeerRepository { implements PeerRepository {
@ -34,11 +31,9 @@ public class XmppPeerRepository
private final PeerMapping peerMapping; private final PeerMapping peerMapping;
@Inject @Inject
public XmppPeerRepository(ReactiveEntityStore<Persistable> data, public RxPeerRepository(ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler, PeerMapping peerMapping, AccountRepository accountRepository) {
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler, super(data);
PeerMapping peerMapping, AccountRepository accountRepository) {
super(data, subscriberScheduler, observerScheduler);
this.peerMapping = peerMapping; this.peerMapping = peerMapping;
this.accountRepository = accountRepository; this.accountRepository = accountRepository;
} }
@ -46,9 +41,7 @@ public class XmppPeerRepository
@Override @Override
public Single<Peer> insertPeer(Peer peer) { public Single<Peer> insertPeer(Peer peer) {
return data().insert(peerMapping.toModel(peer, new PeerModel())) return data().insert(peerMapping.toModel(peer, new PeerModel()))
.map(model -> peerMapping.toEntity(model, peer)) .map(model -> peerMapping.toEntity(model, peer));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -56,9 +49,7 @@ public class XmppPeerRepository
return data().select(PeerModel.class) return data().select(PeerModel.class)
.where(PeerModel.ID.eq(peerId)) .where(PeerModel.ID.eq(peerId))
.get().observableResult() .get().observableResult()
.map(result -> new Optional<>(peerMapping.toEntity(result.firstOrNull(), new Peer()))) .map(result -> new Optional<>(peerMapping.toEntity(result.firstOrNull(), new Peer())));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -66,9 +57,7 @@ public class XmppPeerRepository
return data().select(PeerModel.class) return data().select(PeerModel.class)
.where(PeerModel.ID.eq(peerId)) .where(PeerModel.ID.eq(peerId))
.get().maybe() .get().maybe()
.map(model -> peerMapping.toEntity(model, new Peer())) .map(model -> peerMapping.toEntity(model, new Peer()));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -78,9 +67,7 @@ public class XmppPeerRepository
.and(PeerModel.ADDRESS.eq(address)) .and(PeerModel.ADDRESS.eq(address))
.get().observableResult() .get().observableResult()
.map(result -> new Optional<>(result.firstOrNull())) .map(result -> new Optional<>(result.firstOrNull()))
.map(peerMapping::toEntity) .map(peerMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -95,9 +82,7 @@ public class XmppPeerRepository
.where(PeerModel.ACCOUNT_ID.eq(accountId)) .where(PeerModel.ACCOUNT_ID.eq(accountId))
.and(PeerModel.ADDRESS.eq(address)) .and(PeerModel.ADDRESS.eq(address))
.get().maybe() .get().maybe()
.map(peerMapping::toEntity) .map(peerMapping::toEntity);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -110,9 +95,7 @@ public class XmppPeerRepository
peer.setAddress(address); peer.setAddress(address);
return peer; return peer;
}) })
.flatMap(this::insertPeer)) .flatMap(this::insertPeer));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -121,9 +104,7 @@ public class XmppPeerRepository
.orderBy(PeerModel.ADDRESS) .orderBy(PeerModel.ADDRESS)
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::peerModelsToEntities) .map(this::peerModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
private List<Peer> peerModelsToEntities(List<PeerModel> peerModels) { private List<Peer> peerModelsToEntities(List<PeerModel> peerModels) {
@ -141,9 +122,7 @@ public class XmppPeerRepository
//.and(isContact()) //.and(isContact())
.get().observableResult() .get().observableResult()
.map(ResultDelegate::toList) .map(ResultDelegate::toList)
.map(this::peerModelsToEntities) .map(this::peerModelsToEntities);
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -155,9 +134,7 @@ public class XmppPeerRepository
// write changes into model // write changes into model
.map(model -> peerMapping.toModel(peer, model)) .map(model -> peerMapping.toModel(peer, model))
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> peerMapping.toEntity(model, peer)) .map(model -> peerMapping.toEntity(model, peer));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -173,18 +150,14 @@ public class XmppPeerRepository
.map(model -> peerMapping.toModel(peer, model)) .map(model -> peerMapping.toModel(peer, model))
// write changed model back to db // write changed model back to db
.flatMap(data()::update) .flatMap(data()::update)
.map(model -> peerMapping.toEntity(model, peer)) .map(model -> peerMapping.toEntity(model, peer));
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Completable deletePeer(Peer peer) { public Completable deletePeer(Peer peer) {
return data().delete(PeerModel.class) return data().delete(PeerModel.class)
.where(PeerModel.ID.eq(peer.getId())) .where(PeerModel.ID.eq(peer.getId()))
.get().single().ignoreElement() .get().single().ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
@ -192,17 +165,13 @@ public class XmppPeerRepository
return data().delete(PeerModel.class) return data().delete(PeerModel.class)
.where(PeerModel.ACCOUNT_ID.eq(accountId) .where(PeerModel.ACCOUNT_ID.eq(accountId)
.and(PeerModel.ADDRESS.eq(address))) .and(PeerModel.ADDRESS.eq(address)))
.get().single().ignoreElement() .get().single().ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
@Override @Override
public Completable deleteAllPeers(UUID accountId) { public Completable deleteAllPeers(UUID accountId) {
return data().delete(PeerModel.class) return data().delete(PeerModel.class)
.where(PeerModel.ACCOUNT_ID.eq(accountId)) .where(PeerModel.ACCOUNT_ID.eq(accountId))
.get().single().ignoreElement() .get().single().ignoreElement();
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
} }
} }

View File

@ -1,5 +1,8 @@
package org.mercury_im.messenger.data.di; package org.mercury_im.messenger.data.di.component;
import org.mercury_im.messenger.data.di.RepositoryModule;
import org.mercury_im.messenger.data.di.module.TestDatabaseModule;
import org.mercury_im.messenger.data.di.module.TestingSchedulerModule;
import org.mercury_im.messenger.data.repository.AccountRepositoryTest; import org.mercury_im.messenger.data.repository.AccountRepositoryTest;
import javax.inject.Singleton; import javax.inject.Singleton;

View File

@ -1,6 +1,7 @@
package org.mercury_im.messenger.data.di; package org.mercury_im.messenger.data.di.component;
import org.mercury_im.messenger.data.di.MappingModule;
import org.mercury_im.messenger.data.mapping.AccountMappingTest; import org.mercury_im.messenger.data.mapping.AccountMappingTest;
import org.mercury_im.messenger.data.mapping.EntityCapsMappingTest; import org.mercury_im.messenger.data.mapping.EntityCapsMappingTest;
import org.mercury_im.messenger.data.mapping.PeerMappingTest; import org.mercury_im.messenger.data.mapping.PeerMappingTest;

View File

@ -1,4 +1,4 @@
package org.mercury_im.messenger.data.di; package org.mercury_im.messenger.data.di.module;
import org.mercury_im.messenger.data.model.Models; import org.mercury_im.messenger.data.model.Models;
import org.sqlite.SQLiteConfig; import org.sqlite.SQLiteConfig;

View File

@ -1,6 +1,6 @@
package org.mercury_im.messenger.data.di; package org.mercury_im.messenger.data.di.module;
import org.mercury_im.messenger.core.util.ThreadUtils; import org.mercury_im.messenger.core.SchedulersFacade;
import javax.inject.Named; import javax.inject.Named;
import javax.inject.Singleton; import javax.inject.Singleton;
@ -15,14 +15,14 @@ public class TestingSchedulerModule {
@Provides @Provides
@Singleton @Singleton
@Named(value = ThreadUtils.SCHEDULER_IO) @Named(value = SchedulersFacade.SCHEDULER_IO)
public static Scheduler provideSubscriberScheduler() { public static Scheduler provideSubscriberScheduler() {
return Schedulers.io(); return Schedulers.io();
} }
@Provides @Provides
@Singleton @Singleton
@Named(value = ThreadUtils.SCHEDULER_UI) @Named(value = SchedulersFacade.SCHEDULER_UI)
public static Scheduler provideObserverScheduler() { public static Scheduler provideObserverScheduler() {
return Schedulers.trampoline(); return Schedulers.trampoline();
} }

View File

@ -1,14 +1,13 @@
package org.mercury_im.messenger.data.mapping; package org.mercury_im.messenger.data.mapping;
import org.junit.Test; import org.junit.Test;
import org.mercury_im.messenger.data.di.DaggerMappingTestComponent; import org.mercury_im.messenger.data.di.component.DaggerMappingTestComponent;
import org.mercury_im.messenger.data.model.AccountModel; import org.mercury_im.messenger.data.model.AccountModel;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import javax.inject.Inject; import javax.inject.Inject;
import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertNotSame;
public class AccountMappingTest { public class AccountMappingTest {
@ -19,12 +18,12 @@ public class AccountMappingTest {
public static final Account ACCOUNT_LITTLE_JOE; public static final Account ACCOUNT_LITTLE_JOE;
static { static {
ACCOUNT_MISSION_CONTROL = new IAccount(); ACCOUNT_MISSION_CONTROL = new Account();
ACCOUNT_MISSION_CONTROL.setAddress("mission-controll@planet.earth"); ACCOUNT_MISSION_CONTROL.setAddress("mission-controll@planet.earth");
ACCOUNT_MISSION_CONTROL.setEnabled(true); ACCOUNT_MISSION_CONTROL.setEnabled(true);
ACCOUNT_MISSION_CONTROL.setPassword("notBecauseTheyAreEasy"); ACCOUNT_MISSION_CONTROL.setPassword("notBecauseTheyAreEasy");
ACCOUNT_LITTLE_JOE = new IAccount(); ACCOUNT_LITTLE_JOE = new Account();
ACCOUNT_LITTLE_JOE.setAddress("little-joe@planet.earth"); ACCOUNT_LITTLE_JOE.setAddress("little-joe@planet.earth");
ACCOUNT_LITTLE_JOE.setEnabled(false); ACCOUNT_LITTLE_JOE.setEnabled(false);
ACCOUNT_LITTLE_JOE.setPassword("butBecauseTheyAreHard"); ACCOUNT_LITTLE_JOE.setPassword("butBecauseTheyAreHard");

View File

@ -1,7 +1,7 @@
package org.mercury_im.messenger.data.mapping; package org.mercury_im.messenger.data.mapping;
import org.junit.Test; import org.junit.Test;
import org.mercury_im.messenger.data.di.DaggerMappingTestComponent; import org.mercury_im.messenger.data.di.component.DaggerMappingTestComponent;
import org.mercury_im.messenger.data.model.EntityCapsModel; import org.mercury_im.messenger.data.model.EntityCapsModel;
import org.mercury_im.messenger.entity.caps.EntityCapsRecord; import org.mercury_im.messenger.entity.caps.EntityCapsRecord;
@ -20,7 +20,7 @@ public class EntityCapsMappingTest {
@Test @Test
public void mapEntityToModelTest() { public void mapEntityToModelTest() {
EntityCapsRecord entity = new IEntityCapsRecord(); EntityCapsRecord entity = new EntityCapsRecord();
entity.setNodeVer("thisisahash"); entity.setNodeVer("thisisahash");
entity.setXml("<xml/>"); entity.setXml("<xml/>");

View File

@ -1,7 +1,7 @@
package org.mercury_im.messenger.data.mapping; package org.mercury_im.messenger.data.mapping;
import org.junit.Test; import org.junit.Test;
import org.mercury_im.messenger.data.di.DaggerMappingTestComponent; import org.mercury_im.messenger.data.di.component.DaggerMappingTestComponent;
import org.mercury_im.messenger.data.model.AccountModel; import org.mercury_im.messenger.data.model.AccountModel;
import org.mercury_im.messenger.data.model.PeerModel; import org.mercury_im.messenger.data.model.PeerModel;
import org.mercury_im.messenger.entity.contact.Peer; import org.mercury_im.messenger.entity.contact.Peer;
@ -19,7 +19,7 @@ public class PeerMappingTest {
public static final Peer PEER_GORDO; public static final Peer PEER_GORDO;
static { static {
PEER_GORDO = new IPeer(); PEER_GORDO = new Peer();
PEER_GORDO.setAccount(AccountMappingTest.ACCOUNT_MISSION_CONTROL); PEER_GORDO.setAccount(AccountMappingTest.ACCOUNT_MISSION_CONTROL);
PEER_GORDO.setAddress("gordo@big.joe"); PEER_GORDO.setAddress("gordo@big.joe");
PEER_GORDO.setName("Gordo"); PEER_GORDO.setName("Gordo");

View File

@ -1,8 +1,8 @@
package org.mercury_im.messenger.data.repository; package org.mercury_im.messenger.data.repository;
import org.junit.Test; import org.junit.Test;
import org.mercury_im.messenger.data.di.DaggerInMemoryDatabaseComponent; import org.mercury_im.messenger.data.di.component.DaggerInMemoryDatabaseComponent;
import org.mercury_im.messenger.data.di.InMemoryDatabaseComponent; import org.mercury_im.messenger.data.di.component.InMemoryDatabaseComponent;
import org.mercury_im.messenger.entity.Account; import org.mercury_im.messenger.entity.Account;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@ -24,16 +24,16 @@ public class AccountRepositoryTest {
ReactiveEntityStore<Persistable> dataStore; ReactiveEntityStore<Persistable> dataStore;
@Inject @Inject
XmppAccountRepository accountRepository; RxAccountRepository accountRepository;
@Inject @Inject
XmppDirectChatRepository directChatRepository; RxDirectChatRepository directChatRepository;
@Inject @Inject
XmppPeerRepository contactRepository; RxPeerRepository contactRepository;
@Inject @Inject
XmppMessageRepository messageRepository; RxMessageRepository messageRepository;
@Inject @Inject
public AccountRepositoryTest() { public AccountRepositoryTest() {
@ -51,7 +51,7 @@ public class AccountRepositoryTest {
Thread.sleep(100); Thread.sleep(100);
Account a1 = new IAccount(); Account a1 = new Account();
a1.setAddress("a1@example.com"); a1.setAddress("a1@example.com");
a1.setPassword("a1a1a1"); a1.setPassword("a1a1a1");
a1.setEnabled(true); a1.setEnabled(true);
@ -60,7 +60,7 @@ public class AccountRepositoryTest {
Thread.sleep(100); Thread.sleep(100);
Account a2 = new IAccount(); Account a2 = new Account();
a2.setAddress("a2@example.com"); a2.setAddress("a2@example.com");
a2.setPassword("a2a2a2"); a2.setPassword("a2a2a2");
a2.setEnabled(false); a2.setEnabled(false);
@ -69,7 +69,7 @@ public class AccountRepositoryTest {
Thread.sleep(100); Thread.sleep(100);
Account a3 = new IAccount(); Account a3 = new Account();
a3.setAddress("a3@example.com"); a3.setAddress("a3@example.com");
a3.setPassword("a3a3a3"); a3.setPassword("a3a3a3");
a3.setEnabled(false); a3.setEnabled(false);
@ -100,7 +100,8 @@ public class AccountRepositoryTest {
} }
})); }));
Account account = new IAccount(uuid); Account account = new Account();
account.setId(uuid);
account.setEnabled(true); account.setEnabled(true);
account.setAddress("hello@world"); account.setAddress("hello@world");
account.setPassword("wooooooh"); account.setPassword("wooooooh");
@ -120,7 +121,7 @@ public class AccountRepositoryTest {
@Test(expected = NoSuchElementException.class) @Test(expected = NoSuchElementException.class)
public void updateMissingEntityFails() { public void updateMissingEntityFails() {
Account missingAccount = new IAccount(); Account missingAccount = new Account();
missingAccount.setAddress("this@account.is.missing"); missingAccount.setAddress("this@account.is.missing");
missingAccount.setPassword("inTheDatabase"); missingAccount.setPassword("inTheDatabase");