package org.mercury_im.messenger.data.repository; import org.mercury_im.messenger.core.data.repository.DirectChatRepository; import org.mercury_im.messenger.core.util.Optional; import org.mercury_im.messenger.data.mapping.DirectChatMapping; import org.mercury_im.messenger.data.model.DirectChatModel; import org.mercury_im.messenger.data.model.PeerModel; import org.mercury_im.messenger.data.repository.dao.DirectChatDao; import org.mercury_im.messenger.entity.chat.DirectChat; import org.mercury_im.messenger.entity.contact.Peer; import java.util.ArrayList; import java.util.List; import java.util.UUID; import javax.inject.Inject; import io.reactivex.Completable; import io.reactivex.Maybe; import io.reactivex.Observable; import io.reactivex.Single; import io.requery.Persistable; import io.requery.query.ResultDelegate; import io.requery.reactivex.ReactiveEntityStore; public class RxDirectChatRepository extends RequeryRepository implements DirectChatRepository { private final DirectChatMapping directChatMapping; private final DirectChatDao dao; @Inject public RxDirectChatRepository( ReactiveEntityStore data, DirectChatMapping directChatMapping) { super(data); this.directChatMapping = directChatMapping; this.dao = new DirectChatDao(data); } @Override public Single insertDirectChat(DirectChat chat) { return Single.just(chat) // map entity to model .map(directChatMapping::toModel) .flatMap(dao::insert) // map back to entity .map(model -> directChatMapping.toEntity(model, chat)); } @Override public Observable> observeDirectChat(UUID chatId) { return dao.get(chatId).observableResult() .map(result -> new Optional<>(result.firstOrNull())) .map(directChatMapping::toEntity); } @Override public Maybe getDirectChat(UUID chatId) { return dao.get(chatId).maybe() .map(directChatMapping::toEntity); } @Override public Single getOrCreateChatWithPeer(Peer peer) { return getDirectChatByPeer(peer) .switchIfEmpty(Single.just(new DirectChat()) .map(chat -> { chat.setAccount(peer.getAccount()); chat.setPeer(peer); return chat; }) .flatMap(this::insertDirectChat)); } @Override public Observable> observeDirectChatByPeer(Peer peer) { return dao.getByPeer(peer.getId()).observableResult() .map(result -> new Optional<>(result.firstOrNull())) .map(directChatMapping::toEntity); } @Override public Maybe getDirectChatByPeer(Peer peer) { return dao.getByPeer(peer.getId()).maybe() .map(directChatMapping::toEntity); } @Override public Observable> observeAllDirectChats() { return dao.getAll().observableResult() .map(ResultDelegate::toList) .map(this::chatModelsToEntities); } private List chatModelsToEntities(List models) { List entities = new ArrayList<>(models.size()); for (DirectChatModel model : models) { entities.add(directChatMapping.toEntity(model)); } return entities; } @Override public Single updateDirectChat(DirectChat chat) { return dao.get(chat.getId()).maybe().toSingle() .map(model -> directChatMapping.toModel(chat, model)) .flatMap(data()::update) .map(model -> directChatMapping.toEntity(model, chat)); } @Override public Single upsertDirectChat(DirectChat chat) { return dao.get(chat.getId()).maybe() .switchIfEmpty(dao.insert(directChatMapping.toModel(chat))) .map(directChatModel -> directChatMapping.toModel(chat, directChatModel)) .flatMap(data()::update) .map(model -> directChatMapping.toEntity(model, chat)); } @Override public Completable deleteDirectChat(UUID chatId) { return dao.delete(chatId) .ignoreElement(); } @Override public Observable> findChatsByQuery(String query) { return data().select(DirectChatModel.class) .join(PeerModel.class) .on(DirectChatModel.PEER_ID.eq(PeerModel.ID)) .where(PeerModel.NAME.like("%" + query + "%") .or(PeerModel.ADDRESS.like("%" + query + "%"))) .get().observableResult() .map(ResultDelegate::toList) .map(this::chatModelsToEntities); } }