This commit is contained in:
Paul Schaub 2020-05-27 22:34:27 +02:00
parent 71cabbb54a
commit ac472a3b6f
Signed by: vanitasvitae
GPG Key ID: 62BEE9264BF17311
12 changed files with 77 additions and 136 deletions

View File

@ -12,13 +12,10 @@ import org.mercury_im.messenger.Messenger;
import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.xmpp.MercuryConnection;
import org.mercury_im.messenger.xmpp.exception.InvalidCredentialsException;
import org.mercury_im.messenger.xmpp.exception.ServerUnreachableException;
import org.mercury_im.messenger.xmpp.state.ConnectionPoolState;
import javax.inject.Inject;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;
@ -71,7 +68,11 @@ public class AccountsViewModel extends AndroidViewModel {
.subscribe(() -> Log.d("Mercury-IM", "Login successful for " + accountModel.getAddress()),
e -> Log.e("Mercury-IM", "Connecting failed.", e)));
} else {
connection.shutdown();
compositeDisposable.add(connection.shutdown()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() -> Log.d("Mercury-IM", "Connection " + accountModel.getAddress() + " shutdown."),
e -> Log.e("Mercury-IM", "Shutdown of connection " + accountModel.getAddress() + " failed", e)));
}
}
}

View File

@ -76,9 +76,7 @@ public class XmppEntityCapsRepository extends RequeryRepository implements Entit
return data().select(EntityCapsModel.class)
.where(EntityCapsModel.NODE_VER.eq(nodeVer))
.get().maybe()
.map(entityCapsMapping::toEntity)
.subscribeOn(subscriberScheduler())
.observeOn(observerScheduler());
.map(entityCapsMapping::toEntity);
}
@Override

View File

@ -1,24 +0,0 @@
package org.mercury_im.messenger;
import org.mercury_im.messenger.util.ThreadUtils;
import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Scheduler;
import lombok.Getter;
public class MercurySchedulers {
@Getter
private final Scheduler subscriberScheduler;
@Getter
private final Scheduler observerScheduler;
@Inject
public MercurySchedulers(@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler subscriberScheduler,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler observerScheduler) {
this.subscriberScheduler = subscriberScheduler;
this.observerScheduler = observerScheduler;
}
}

View File

@ -0,0 +1,28 @@
package org.mercury_im.messenger;
import org.mercury_im.messenger.util.ThreadUtils;
import javax.inject.Inject;
import javax.inject.Named;
import io.reactivex.Scheduler;
import lombok.Getter;
public class SchedulersFacade {
@Getter
private final Scheduler io;
@Getter
private final Scheduler ui;
@Getter
private final Scheduler newThread;
@Inject
public SchedulersFacade(@Named(value = ThreadUtils.SCHEDULER_IO) Scheduler io,
@Named(value = ThreadUtils.SCHEDULER_UI) Scheduler ui,
@Named(value = ThreadUtils.SCHEDULER_NEW_THREAD) Scheduler newThread) {
this.io = io;
this.ui = ui;
this.newThread = newThread;
}
}

View File

@ -4,6 +4,7 @@ import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.xml.XmlPullParser;
import org.jivesoftware.smackx.caps.cache.EntityCapsPersistentCache;
import org.jivesoftware.smackx.disco.packet.DiscoverInfo;
import org.mercury_im.messenger.SchedulersFacade;
import org.mercury_im.messenger.data.repository.EntityCapsRepository;
import org.mercury_im.messenger.entity.caps.EntityCapsRecord;
import org.mercury_im.messenger.logging.Tags;
@ -16,6 +17,7 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;
@Singleton
public class MercuryEntityCapsStore implements EntityCapsPersistentCache {
@ -43,10 +45,19 @@ public class MercuryEntityCapsStore implements EntityCapsPersistentCache {
@Override
public DiscoverInfo lookup(String nodeVer) {
LOGGER.log(Level.INFO, "MercuryEntityCapsStore: lookup: " + nodeVer);
return repository.maybeGetEntityCapsRecord(nodeVer)
.map(this::parseDiscoverInfo)
.onErrorComplete()
EntityCapsRecord defaultIfEmpty = new EntityCapsRecord();
EntityCapsRecord record = repository.maybeGetEntityCapsRecord(nodeVer)
.defaultIfEmpty(defaultIfEmpty)
.blockingGet();
if (record == defaultIfEmpty) {
return null;
}
try {
return parseDiscoverInfo(record);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
private DiscoverInfo parseDiscoverInfo(EntityCapsRecord record) throws Exception {

View File

@ -1,74 +0,0 @@
package org.mercury_im.messenger.usecase;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.sasl.SASLErrorException;
import org.mercury_im.messenger.xmpp.MercuryConnection;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.reactivex.Completable;
import io.reactivex.Single;
public class LogIntoAccount {
public enum ConnectionResult {
success,
credential_error,
server_error,
other_error
}
private static final Logger LOGGER = Logger.getLogger(LogIntoAccount.class.getName());
private final MercuryConnection connection;
private LogIntoAccount(MercuryConnection connection) {
this.connection = connection;
}
public static LogIntoAccount with(MercuryConnection connection) {
if (connection == null) {
throw new NullPointerException("MercuryConnection cannot be null.");
}
return new LogIntoAccount(connection);
}
public Completable executeAndPossiblyThrow() {
return Completable.fromAction(this::doAuthenticateIfNecessary);
}
public Single<ConnectionResult> execute() {
return Single.fromCallable(this::authenticateIfNecessary);
}
private ConnectionResult authenticateIfNecessary() {
try {
doAuthenticateIfNecessary();
return ConnectionResult.success;
} catch (SASLErrorException e) {
LOGGER.log(Level.WARNING, "SASL Error while connecting to account " + connection.getAccount().getId(), e);
return ConnectionResult.credential_error;
} catch (SmackException.ConnectionException e) {
LOGGER.log(Level.WARNING, "Connectivity error while connecting to account " + connection.getAccount().getId(), e);
return ConnectionResult.server_error;
}
catch (IOException | XMPPException | SmackException | InterruptedException e) {
LOGGER.log(Level.WARNING, "Error connecting to account " + connection.getAccount().getId(), e);
return ConnectionResult.other_error;
}
}
private void doAuthenticateIfNecessary()
throws InterruptedException, XMPPException, SmackException, IOException {
if (!connection.getConnection().isAuthenticated()) {
LOGGER.log(Level.INFO, "Logging in");
((AbstractXMPPConnection) connection.getConnection()).connect().login();
LOGGER.log(Level.INFO, "Login complete");
}
}
}

View File

@ -13,4 +13,6 @@ public class ThreadUtils {
* Name for the UI / main thread.
*/
public static final String SCHEDULER_UI = "UIThread";
public static final String SCHEDULER_NEW_THREAD = "NewThread";
}

View File

@ -118,7 +118,11 @@ public class MercuryConnection {
}
};
public void shutdown() {
public Completable shutdown() {
return Completable.fromAction(this::doShutdown);
}
public void doShutdown() {
if (connection.isConnected()) {
((AbstractXMPPConnection) getConnection()).disconnect();
} else {

View File

@ -3,12 +3,10 @@ package org.mercury_im.messenger.xmpp;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ReconnectionListener;
import org.jivesoftware.smack.ReconnectionManager;
import org.jivesoftware.smackx.caps.EntityCapsManager;
import org.mercury_im.messenger.data.repository.AccountRepository;
import org.mercury_im.messenger.data.repository.Repositories;
import org.mercury_im.messenger.entity.Account;
import org.mercury_im.messenger.store.MercuryEntityCapsStore;
import org.mercury_im.messenger.usecase.LogIntoAccount;
import org.mercury_im.messenger.usecase.RosterStoreBinder;
import org.mercury_im.messenger.util.Optional;
import org.mercury_im.messenger.xmpp.state.ConnectionPoolState;
@ -60,7 +58,7 @@ public class MercuryConnectionManager {
this.rosterStoreBinder = rosterStoreBinder;
this.entityCapsStore = entityCapsStore;
EntityCapsManager.setPersistentCache(entityCapsStore);
//EntityCapsManager.setPersistentCache(entityCapsStore);
start();
}
@ -124,7 +122,7 @@ public class MercuryConnectionManager {
}
public void bindConnection(MercuryConnection connection) {
rosterStoreBinder.setRosterStoreOn(connection);
//rosterStoreBinder.setRosterStoreOn(connection);
ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection.getConnection())
.addReconnectionListener(new ReconnectionListener() {
@Override
@ -157,7 +155,7 @@ public class MercuryConnectionManager {
private void handleAccountDisabled(MercuryConnection connection) {
LOGGER.log(Level.FINER, "HandleAccountDisabled: " + connection.getAccountId());
connectionDisconnect(connection);
disposable.add(connection.shutdown().subscribeOn(Schedulers.newThread()).subscribe());
}
private void handleAccountEnabled(MercuryConnection connection) {
@ -166,25 +164,22 @@ public class MercuryConnectionManager {
}
private void connectionLogin(MercuryConnection connection) {
disposable.add(LogIntoAccount.with(connection).executeAndPossiblyThrow()
disposable.add(connection.connect().andThen(connection.login())
.subscribeOn(Schedulers.newThread())
.subscribe(() -> LOGGER.log(Level.FINER, "Logged in."),
error -> LOGGER.log(Level.SEVERE, "Connection error!", error)));
}
private void handleAccountRemoved(MercuryConnection connection) {
LOGGER.log(Level.FINER, "HandleAccountRemove: " + connection.getAccountId());
disconnectAndRemoveConnection(connection);
}
private void disconnectAndRemoveConnection(MercuryConnection connection) {
connectionDisconnect(connection);
disposable.add(connection.shutdown().subscribeOn(Schedulers.newThread()).subscribe());
removeConnection(connection);
}
private void connectionDisconnect(MercuryConnection connection) {
((AbstractXMPPConnection) connection.getConnection()).disconnect();
}
private void removeConnection(MercuryConnection connection) {
LOGGER.log(Level.FINER, "Remove Connection: " + connection.getAccountId());
connectionsMap.remove(connection.getAccountId());

View File

@ -7,29 +7,29 @@ import org.jxmpp.jid.impl.JidCreate;
import org.jxmpp.stringprep.XmppStringprepException;
import org.mercury_im.messenger.entity.Account;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
public class XmppConnectionFactory {
private static final int DEFAULT_PORT = 5222;
private static final int CONNECTION_TIMEOUT = 30 * 1000;
public AbstractXMPPConnection createConnection(Account account) {
try {
return new XMPPTCPConnection(XMPPTCPConnectionConfiguration.builder()
XMPPTCPConnectionConfiguration.Builder configBuilder =
XMPPTCPConnectionConfiguration.builder()
.setConnectTimeout(CONNECTION_TIMEOUT)
.setXmppAddressAndPassword(account.getAddress(), account.getPassword())
.setHost(determineHost(account))
.setPort(determinePort(account))
.build());
.setXmppAddressAndPassword(account.getAddress(), account.getPassword());
if (account.getHost() != null) {
configBuilder.setHost(account.getHost());
}
if (account.getPort() != 0) {
configBuilder.setPort(account.getPort());
}
return new XMPPTCPConnection(configBuilder.build());
} catch (XmppStringprepException e) {
throw new AssertionError("Account has invalid address: " + account.getAddress(), e);
}
}
private String determineHost(Account account) throws XmppStringprepException {
return account.getHost() != null ? account.getHost() : JidCreate.domainBareFrom(account.getAddress()).toString();
}
private int determinePort(Account account) {
return account.getPort() != 0 ? account.getPort() : DEFAULT_PORT;
}
}

View File

@ -79,7 +79,7 @@ public class XmppDirectMessageCenter
.to(peerAddress)
.ofType(org.jivesoftware.smack.packet.Message.Type.chat);
String originId = OriginIdElement.addOriginId(messageBuilder).getId();
String originId = OriginIdElement.addTo(messageBuilder).getId();
String legacyStanzaId = messageBuilder.getStanzaId();
MessageMetadata metadata = new MessageMetadata();

@ -1 +1 @@
Subproject commit 5bfe789e08ebb251b3c4302cb653c715eee363ea
Subproject commit ccc785062e9b04513302c898e3015c2ff2d7fb91