/*
 * Mercury-IM/domain/src/main/java/org/mercury_im/messenger/xmpp/MercuryConnection.java
 * (97 lines, 2.8 KiB, Java)
 */

package org.mercury_im.messenger.xmpp;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.XMPPConnection;
import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.util.Optional;
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.subjects.BehaviorSubject;
public class MercuryConnection {
2019-12-21 01:45:30 +01:00
private static final XmppConnectionFactory connectionFactory = new XmppConnectionFactory();
private final CompositeDisposable disposable = new CompositeDisposable();
private final AccountRepository accountRepository;
private Account account;
private XMPPConnection connection;
private BehaviorSubject<Boolean> enabled = BehaviorSubject.create();
private BehaviorSubject<ConnectionState> state = BehaviorSubject.createDefault(ConnectionState.closed);
public MercuryConnection(AccountRepository accountRepository, Account account) {
this.accountRepository = accountRepository;
this.account = account;
2019-12-21 01:45:30 +01:00
this.connection = connectionFactory.createConnection(account);
observeAccountAndConnection();
}
public Observable<ConnectionState> getState() {
return state;
}
private void observeAccountAndConnection() {
observeAccount();
observeConnection();
}
private void observeAccount() {
disposable.add(accountRepository.observeAccount(account.getId())
.filter(Optional::isPresent)
.map(Optional::getItem)
.subscribe(this::setAccount));
}
private void observeConnection() {
connection.addConnectionListener(new ConnectionListener() {
@Override
public void connected(XMPPConnection connection) {
state.onNext(ConnectionState.connected);
}
@Override
public void authenticated(XMPPConnection connection, boolean resumed) {
state.onNext(ConnectionState.authenticated);
}
@Override
public void connectionClosed() {
state.onNext(ConnectionState.closed);
}
@Override
public void connectionClosedOnError(Exception e) {
state.onNext(ConnectionState.closedOnError);
}
});
}
private void setAccount(Account account) {
this.account = account;
enabled.onNext(account.isEnabled());
}
public Account getAccount() {
return account;
}
public XMPPConnection getConnection() {
return connection;
}
public void dispose() {
disposable.dispose();
}
public enum ConnectionState {
connected,
authenticated,
closedOnError,
closed
}
}