Improve PeerRepository + impl

This commit is contained in:
Paul Schaub 2019-12-02 02:27:32 +01:00
parent 39da217460
commit 8d7dd5687e
Signed by: vanitasvitae
GPG key ID: 62BEE9264BF17311
2 changed files with 117 additions and 20 deletions

View file

@ -1,9 +1,14 @@
package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.data.mapping.PeerMapping;
import org.mercury_im.messenger.data.model.PeerModel;
import org.mercury_im.messenger.data.util.Optional;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.entity.contact.IPeer;
import org.mercury_im.messenger.entity.contact.Peer;
import org.mercury_im.messenger.util.ThreadUtils;
import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
@ -21,6 +26,9 @@ public class XmppPeerRepository
extends RequeryRepository
implements PeerRepository {
@Inject
PeerMapping peerMapping;
@Inject
public XmppPeerRepository(ReactiveEntityStore<Persistable> data,
@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
@ -29,47 +37,125 @@ public class XmppPeerRepository
}
@Override
public Single<Peer> insertPeer(Peer contact) {
return null;
public Single<Peer> insertPeer(Peer peer) {
return data().insert(peerMapping.entityToModel(peer, new PeerModel()))
.map(model -> peerMapping.modelToEntity(model, peer))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Observable<Optional<Peer>> observePeer(long contactId) {
return null;
public Observable<Optional<Peer>> observePeer(long peerId) {
return data().select(PeerModel.class)
.where(PeerModel.ID.eq(peerId))
.get().observableResult()
.map(result -> new Optional<>(peerMapping.modelToEntity(result.firstOrNull(), new IPeer())))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Maybe<Peer> getPeer(long contactId) {
return null;
public Maybe<Peer> getPeer(long peerId) {
return data().select(PeerModel.class)
.where(PeerModel.ID.eq(peerId))
.get().maybe()
.map(model -> peerMapping.modelToEntity(model, new IPeer()))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Observable<Optional<Peer>> observePeerByAddress(String address) {
return null;
public Observable<Optional<Peer>> observePeerByAddress(long accountId, String address) {
return data().select(PeerModel.class)
.where(PeerModel.ACCOUNT_ID.eq(accountId))
.and(PeerModel.ADDRESS.eq(address))
.get().observableResult()
.map(result -> new Optional<>(peerMapping.modelToEntity(result.firstOrNull(), new IPeer())))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Maybe<Peer> getPeerByAddress(String address) {
return null;
public Maybe<Peer> getPeerByAddress(long accountId, String address) {
return data().select(PeerModel.class)
.where(PeerModel.ACCOUNT_ID.eq(accountId))
.and(PeerModel.ADDRESS.eq(address))
.get().maybe()
.map(model -> peerMapping.modelToEntity(model, new IPeer()))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Single<Peer> getOrCreatePeer(Account _account, String _address) {
return getPeerByAddress(_account, _address)
.switchIfEmpty(insertPeer(new IPeer(){
{
setAccount(_account);
setAddress(_address);
}
}))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Observable<List<Peer>> observeAllPeers() {
return null;
return data().select(PeerModel.class)
.get().observableResult()
.map(result -> {
List<PeerModel> peerModels = result.toList();
List<Peer> peerEntities = new ArrayList<>(peerModels.size());
for (PeerModel model : peerModels) {
peerEntities.add(peerMapping.modelToEntity(model, new IPeer()));
}
return peerEntities;
})
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Single<Peer> updatePeer(Peer contact) {
return null;
public Single<Peer> updatePeer(Peer peer) {
// In order to update, we fetch the model, update it and write it back.
return data().select(PeerModel.class)
.where(PeerModel.ID.eq(peer.getId()))
.get().maybe().toSingle()
.flatMap(model -> {
// write changes into model
model = peerMapping.entityToModel(peer, model);
// write model back to db
return data().update(model);
})
.map(model -> peerMapping.modelToEntity(model, peer))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Single<Peer> upsertPeer(Peer contact) {
return null;
public Single<Peer> upsertPeer(Peer peer) {
return data().select(PeerModel.class)
.where(PeerModel.ID.eq(peer.getId()))
.get().maybe()
// if not exists, create
.switchIfEmpty(data().insert(peerMapping.entityToModel(peer, new PeerModel())))
.flatMap(model -> {
// write changes into fetched model
model = peerMapping.entityToModel(peer, model);
// write changed model back to db
return data().update(model);
})
.map(model -> peerMapping.modelToEntity(model, peer))
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
@Override
public Completable deletePeer(Peer contact) {
return null;
public Completable deletePeer(Peer peer) {
return data().delete(PeerModel.class)
.where(PeerModel.ID.eq(peer.getId()))
.get().single().ignoreElement()
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
}
}

View file

@ -1,7 +1,8 @@
package org.mercury_im.messenger.data.repository;
import org.mercury_im.messenger.entity.contact.Peer;
import org.mercury_im.messenger.data.util.Optional;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.entity.contact.Peer;
import java.util.List;
@ -18,9 +19,19 @@ public interface PeerRepository {
Maybe<Peer> getPeer(long PeerId);
Observable<Optional<Peer>> observePeerByAddress(String address);
default Observable<Optional<Peer>> observePeerByAddress(Account account, String address) {
return observePeerByAddress(account.getId(), address);
}
Maybe<Peer> getPeerByAddress(String address);
Observable<Optional<Peer>> observePeerByAddress(long accountId, String address);
default Maybe<Peer> getPeerByAddress(Account account, String address) {
return getPeerByAddress(account.getId(), address);
}
Maybe<Peer> getPeerByAddress(long accountId, String address);
Single<Peer> getOrCreatePeer(Account account, String address);
Observable<List<Peer>> observeAllPeers();