From d65f2c932e38ea63f8b2600feb2a9fdd22b686fa Mon Sep 17 00:00:00 2001 From: Florian Schmaus Date: Sun, 24 May 2020 21:07:56 +0200 Subject: [PATCH] Bump Error Prone version to 2.3.4 and fix new bug patterns --- build.gradle | 2 +- .../smack/AbstractXMPPConnection.java | 13 +- .../org/jivesoftware/smack/util/TLSUtils.java | 13 +- .../jivesoftware/smack/util/DnsUtilTest.java | 1 + .../FallbackIndicationManager.java | 26 +- .../httpfileupload/HttpFileUploadManager.java | 4 +- .../sid/StableUniqueStanzaIdManager.java | 16 +- .../smackx/sid/element/OriginIdElement.java | 4 +- .../smackx/sid/StableUniqueStanzaIdTest.java | 2 +- .../smackx/caps/EntityCapsManager.java | 9 +- .../smackx/commands/AdHocCommandManager.java | 4 +- .../jivesoftware/smackx/ping/PingManager.java | 11 +- .../receipts/DeliveryReceiptManager.java | 10 +- .../rosterstore/DirectoryRosterStore.java | 20 +- .../chatstate/ChatStateIntegrationTest.java | 31 +- .../smackx/omemo/OmemoManager.java | 194 +++--- .../smackx/ox/OpenPgpManager.java | 148 +++-- .../ox_im/OXInstantMessagingManager.java | 14 +- .../smack/tcp/XmppTcpTransportModule.java | 562 +++++++++--------- 19 files changed, 499 insertions(+), 585 deletions(-) diff --git a/build.gradle b/build.gradle index b9de6f288..96dcab393 100644 --- a/build.gradle +++ b/build.gradle @@ -283,7 +283,7 @@ tasks.withType(Javadoc) { testFixturesApi "org.mockito:mockito-core:3.3.3" testImplementation 'com.jamesmurty.utils:java-xmlbuilder:1.2' - errorprone 'com.google.errorprone:error_prone_core:2.3.3' + errorprone 'com.google.errorprone:error_prone_core:2.3.4' errorproneJavac('com.google.errorprone:javac:9+181-r4173-1') } diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java index f93c48408..eed428deb 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java @@ -373,17 +373,6 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { protected final AsyncButOrdered inOrderListeners = new AsyncButOrdered<>(); - /** - * An executor which uses {@link #asyncGoLimited(Runnable)} to limit the number of asynchronously processed runnables - * per connection. - */ - private final Executor limitedExcutor = new Executor() { - @Override - public void execute(Runnable runnable) { - asyncGoLimited(runnable); - } - }; - /** * The used host to establish the connection to */ @@ -1524,7 +1513,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection { executorService = ASYNC_BUT_ORDERED.asExecutorFor(this); break; case async: - executorService = limitedExcutor; + executorService = this::asyncGoLimited; break; } final IQRequestHandler finalIqRequestHandler = iqRequestHandler; diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/TLSUtils.java b/smack-core/src/main/java/org/jivesoftware/smack/util/TLSUtils.java index 60c644320..69b817566 100644 --- a/smack-core/src/main/java/org/jivesoftware/smack/util/TLSUtils.java +++ b/smack-core/src/main/java/org/jivesoftware/smack/util/TLSUtils.java @@ -38,7 +38,6 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; -import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; @@ -143,14 +142,6 @@ public class TLSUtils { return builder; } - private static final HostnameVerifier DOES_NOT_VERIFY_VERIFIER = new HostnameVerifier() { - @Override - public boolean verify(String hostname, SSLSession session) { - // This verifier doesn't verify the hostname, it always returns true. - return true; - } - }; - /** * Disable the hostname verification of TLS certificates. *

@@ -164,7 +155,9 @@ public class TLSUtils { * @return the given builder. */ public static > B disableHostnameVerificationForTlsCertificates(B builder) { - builder.setHostnameVerifier(DOES_NOT_VERIFY_VERIFIER); + builder.setHostnameVerifier((hostname, session) -> { + return true; + }); return builder; } diff --git a/smack-core/src/test/java/org/jivesoftware/smack/util/DnsUtilTest.java b/smack-core/src/test/java/org/jivesoftware/smack/util/DnsUtilTest.java index eef18918f..6a7fe8e54 100644 --- a/smack-core/src/test/java/org/jivesoftware/smack/util/DnsUtilTest.java +++ b/smack-core/src/test/java/org/jivesoftware/smack/util/DnsUtilTest.java @@ -25,6 +25,7 @@ import org.junit.Test; public class DnsUtilTest { + @SuppressWarnings("UnnecessaryAnonymousClass") private static final SmackDaneProvider DNS_UTIL_TEST_DANE_PROVIDER = new SmackDaneProvider() { @Override public SmackDaneVerifier newInstance() { diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/fallback_indication/FallbackIndicationManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/fallback_indication/FallbackIndicationManager.java index 25829bd63..e0859dd26 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/fallback_indication/FallbackIndicationManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/fallback_indication/FallbackIndicationManager.java @@ -25,7 +25,6 @@ import org.jivesoftware.smack.AsyncButOrdered; import org.jivesoftware.smack.ConnectionCreationListener; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPConnectionRegistry; import org.jivesoftware.smack.XMPPException; @@ -68,23 +67,20 @@ public final class FallbackIndicationManager extends Manager { private final StanzaFilter fallbackIndicationElementFilter = new AndFilter(StanzaTypeFilter.MESSAGE, new StanzaExtensionFilter(FallbackIndicationElement.ELEMENT, FallbackIndicationElement.NAMESPACE)); - private final StanzaListener fallbackIndicationElementListener = new StanzaListener() { - @Override - public void processStanza(Stanza packet) { - Message message = (Message) packet; - FallbackIndicationElement indicator = FallbackIndicationElement.fromMessage(message); - String body = message.getBody(); - asyncButOrdered.performAsyncButOrdered(message.getFrom().asBareJid(), () -> { - for (FallbackIndicationListener l : listeners) { - l.onFallbackIndicationReceived(message, indicator, body); - } - }); - } - }; + private void fallbackIndicationElementListener(Stanza packet) { + Message message = (Message) packet; + FallbackIndicationElement indicator = FallbackIndicationElement.fromMessage(message); + String body = message.getBody(); + asyncButOrdered.performAsyncButOrdered(message.getFrom().asBareJid(), () -> { + for (FallbackIndicationListener l : listeners) { + l.onFallbackIndicationReceived(message, indicator, body); + } + }); + } private FallbackIndicationManager(XMPPConnection connection) { super(connection); - connection.addAsyncStanzaListener(fallbackIndicationElementListener, fallbackIndicationElementFilter); + connection.addAsyncStanzaListener(this::fallbackIndicationElementListener, fallbackIndicationElementFilter); ServiceDiscoveryManager.getInstanceFor(connection).addFeature(FallbackIndicationElement.NAMESPACE); } diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/httpfileupload/HttpFileUploadManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/httpfileupload/HttpFileUploadManager.java index 9a790dbc8..8a12f164c 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/httpfileupload/HttpFileUploadManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/httpfileupload/HttpFileUploadManager.java @@ -307,7 +307,9 @@ public final class HttpFileUploadManager extends Manager { public URL uploadFile(InputStream inputStream, String fileName, long fileSize, UploadProgressListener listener) throws XMPPErrorException, InterruptedException, SmackException, IOException { Objects.requireNonNull(inputStream, "Input Stream cannot be null"); Objects.requireNonNull(fileName, "Filename Stream cannot be null"); - Objects.requireNonNull(fileSize, "Filesize Stream cannot be null"); + if (fileSize < 0) { + throw new IllegalArgumentException("File size cannot be negative"); + } final Slot slot = requestSlot(fileName, fileSize, "application/octet-stream"); upload(inputStream, fileSize, slot, listener); return slot.getGetUrl(); diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java index d8911aa3e..35242eba9 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2018 Paul Schaub + * Copyright 2018 Paul Schaub, 2020 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,10 +29,6 @@ import org.jivesoftware.smack.filter.NotFilter; import org.jivesoftware.smack.filter.StanzaExtensionFilter; import org.jivesoftware.smack.filter.StanzaFilter; import org.jivesoftware.smack.filter.ToTypeFilter; -import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.MessageBuilder; -import org.jivesoftware.smack.util.Consumer; -import org.jivesoftware.smack.util.Predicate; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; import org.jivesoftware.smackx.sid.element.OriginIdElement; @@ -62,14 +58,8 @@ public final class StableUniqueStanzaIdManager extends Manager { // Filter that filters for messages with an origin id private static final StanzaFilter ORIGIN_ID_FILTER = new StanzaExtensionFilter(OriginIdElement.ELEMENT, NAMESPACE); - // Listener for outgoing stanzas that adds origin-ids to outgoing stanzas. - private static final Consumer ADD_ORIGIN_ID_INTERCEPTOR = mb -> OriginIdElement.addOriginId(mb); - // We need a filter for outgoing messages that do not carry an origin-id already. private static final StanzaFilter ADD_ORIGIN_ID_FILTER = new AndFilter(OUTGOING_FILTER, new NotFilter(ORIGIN_ID_FILTER)); - private static final Predicate ADD_ORIGIN_ID_PREDICATE = m -> { - return ADD_ORIGIN_ID_FILTER.accept(m); - }; static { XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() { @@ -113,7 +103,7 @@ public final class StableUniqueStanzaIdManager extends Manager { * Start appending origin-id elements to outgoing stanzas and add the feature to disco. */ public synchronized void enable() { - connection().addMessageInterceptor(ADD_ORIGIN_ID_INTERCEPTOR, ADD_ORIGIN_ID_PREDICATE); + connection().addMessageInterceptor(OriginIdElement::addTo, ADD_ORIGIN_ID_FILTER::accept); ServiceDiscoveryManager.getInstanceFor(connection()).addFeature(NAMESPACE); } @@ -122,7 +112,7 @@ public final class StableUniqueStanzaIdManager extends Manager { */ public synchronized void disable() { ServiceDiscoveryManager.getInstanceFor(connection()).removeFeature(NAMESPACE); - connection().removeMessageInterceptor(ADD_ORIGIN_ID_INTERCEPTOR); + connection().removeMessageInterceptor(OriginIdElement::addTo); } /** diff --git a/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/element/OriginIdElement.java b/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/element/OriginIdElement.java index b157b40a0..1a448b516 100644 --- a/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/element/OriginIdElement.java +++ b/smack-experimental/src/main/java/org/jivesoftware/smackx/sid/element/OriginIdElement.java @@ -39,7 +39,7 @@ public class OriginIdElement extends StableAndUniqueIdElement { * * @param message message. * @return the added origin-id element. - * @deprecated use {@link #addOriginId(MessageBuilder)} instead. + * @deprecated use {@link #addTo(MessageBuilder)} instead. */ @Deprecated // TODO: Remove in Smack 4.5. @@ -57,7 +57,7 @@ public class OriginIdElement extends StableAndUniqueIdElement { * @param messageBuilder the message builder to add an origin ID to. * @return the added origin-id element. */ - public static OriginIdElement addOriginId(MessageBuilder messageBuilder) { + public static OriginIdElement addTo(MessageBuilder messageBuilder) { OriginIdElement originId = new OriginIdElement(); messageBuilder.addExtension(originId); // TODO: Find solution to have both the originIds stanzaId and a nice to look at incremental stanzaID. diff --git a/smack-experimental/src/test/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdTest.java b/smack-experimental/src/test/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdTest.java index c9c4367a8..a95e5caac 100644 --- a/smack-experimental/src/test/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdTest.java +++ b/smack-experimental/src/test/java/org/jivesoftware/smackx/sid/StableUniqueStanzaIdTest.java @@ -78,7 +78,7 @@ public class StableUniqueStanzaIdTest extends SmackTestSuite { assertFalse(OriginIdElement.hasOriginId(message)); assertFalse(StanzaIdElement.hasStanzaId(message)); - OriginIdElement.addOriginId(messageBuilder); + OriginIdElement.addTo(messageBuilder); message = messageBuilder.build(); assertTrue(OriginIdElement.hasOriginId(message)); diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/caps/EntityCapsManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/caps/EntityCapsManager.java index a83c24875..c63471bcb 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/caps/EntityCapsManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/caps/EntityCapsManager.java @@ -54,7 +54,6 @@ import org.jivesoftware.smack.packet.PresenceBuilder; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.roster.AbstractPresenceEventListener; import org.jivesoftware.smack.roster.Roster; -import org.jivesoftware.smack.util.Consumer; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smack.util.stringencoder.Base64; @@ -312,11 +311,11 @@ public final class EntityCapsManager extends Manager { // Intercept presence packages and add caps data when intended. // XEP-0115 specifies that a client SHOULD include entity capabilities // with every presence notification it sends. - private final Consumer presenceInterceptor = presenceBuilder -> { + private void addCapsExtension(PresenceBuilder presenceBuilder) { CapsVersionAndHash capsVersionAndHash = getCapsVersionAndHash(); CapsExtension caps = new CapsExtension(entityNode, capsVersionAndHash.version, capsVersionAndHash.hash); presenceBuilder.overrideExtension(caps); - }; + } private EntityCapsManager(XMPPConnection connection) { super(connection); @@ -402,7 +401,7 @@ public final class EntityCapsManager extends Manager { } public synchronized void enableEntityCaps() { - connection().addPresenceInterceptor(presenceInterceptor, p -> { + connection().addPresenceInterceptor(this::addCapsExtension, p -> { return PresenceTypeFilter.AVAILABLE.accept(p); }); @@ -415,7 +414,7 @@ public final class EntityCapsManager extends Manager { entityCapsEnabled = false; sdm.removeFeature(NAMESPACE); - connection().removePresenceInterceptor(presenceInterceptor); + connection().removePresenceInterceptor(this::addCapsExtension); } public boolean entityCapsEnabled() { diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/commands/AdHocCommandManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/commands/AdHocCommandManager.java index 0f4614181..c2627dd5b 100755 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/commands/AdHocCommandManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/commands/AdHocCommandManager.java @@ -517,7 +517,7 @@ public final class AdHocCommandManager extends Manager { private boolean sessionSweeperScheduled; - private final Runnable sessionSweeper = () -> { + private void sessionSweeper() { final long currentTime = System.currentTimeMillis(); synchronized (this) { for (Iterator> it = executingCommands.entrySet().iterator(); it.hasNext();) { @@ -553,7 +553,7 @@ public final class AdHocCommandManager extends Manager { } sessionSweeperScheduled = true; - schedule(sessionSweeper, 10, TimeUnit.SECONDS); + schedule(this::sessionSweeper, 10, TimeUnit.SECONDS); } /** diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java index f30e58a9e..7f513802c 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/ping/PingManager.java @@ -390,7 +390,7 @@ public final class PingManager extends Manager { int nextPingIn = pingInterval - delta; LOGGER.fine("Scheduling ServerPingTask in " + nextPingIn + " seconds (pingInterval=" + pingInterval + ", delta=" + delta + ")"); - nextAutomaticPing = schedule(pingServerRunnable, nextPingIn, TimeUnit.SECONDS); + nextAutomaticPing = schedule(this::pingServerIfNecessary, nextPingIn, TimeUnit.SECONDS); } } @@ -467,13 +467,4 @@ public final class PingManager extends Manager { } }); } - - private final Runnable pingServerRunnable = new Runnable() { - @Override - public void run() { - LOGGER.fine("ServerPingTask run()"); - pingServerIfNecessary(); - } - }; - } diff --git a/smack-extensions/src/main/java/org/jivesoftware/smackx/receipts/DeliveryReceiptManager.java b/smack-extensions/src/main/java/org/jivesoftware/smackx/receipts/DeliveryReceiptManager.java index b069709e1..972c5d81e 100644 --- a/smack-extensions/src/main/java/org/jivesoftware/smackx/receipts/DeliveryReceiptManager.java +++ b/smack-extensions/src/main/java/org/jivesoftware/smackx/receipts/DeliveryReceiptManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2013-2014 Georg Lukas, 2015-2019 Florian Schmaus + * Copyright 2013-2014 Georg Lukas, 2015-2020 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,11 +38,9 @@ import org.jivesoftware.smack.filter.StanzaExtensionFilter; import org.jivesoftware.smack.filter.StanzaFilter; import org.jivesoftware.smack.filter.StanzaTypeFilter; import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.MessageBuilder; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.StanzaBuilder; import org.jivesoftware.smack.roster.Roster; -import org.jivesoftware.smack.util.Consumer; import org.jivesoftware.smack.util.StringUtils; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; @@ -273,8 +271,6 @@ public final class DeliveryReceiptManager extends Manager { ); // @formatter:on - private static final Consumer AUTO_ADD_DELIVERY_RECEIPT_REQUESTS_LISTENER = mb -> DeliveryReceiptRequest.addTo(mb); - /** * Enables automatic requests of delivery receipts for outgoing messages of * {@link org.jivesoftware.smack.packet.Message.Type#normal}, {@link org.jivesoftware.smack.packet.Message.Type#chat} or {@link org.jivesoftware.smack.packet.Message.Type#headline}, and @@ -284,7 +280,7 @@ public final class DeliveryReceiptManager extends Manager { * @see #dontAutoAddDeliveryReceiptRequests() */ public void autoAddDeliveryReceiptRequests() { - connection().addMessageInterceptor(AUTO_ADD_DELIVERY_RECEIPT_REQUESTS_LISTENER, m -> { + connection().addMessageInterceptor(DeliveryReceiptRequest::addTo, m -> { return MESSAGES_TO_REQUEST_RECEIPTS_FOR.accept(m); }); } @@ -296,7 +292,7 @@ public final class DeliveryReceiptManager extends Manager { * @see #autoAddDeliveryReceiptRequests() */ public void dontAutoAddDeliveryReceiptRequests() { - connection().removeMessageInterceptor(AUTO_ADD_DELIVERY_RECEIPT_REQUESTS_LISTENER); + connection().removeMessageInterceptor(DeliveryReceiptRequest::addTo); } /** diff --git a/smack-im/src/main/java/org/jivesoftware/smack/roster/rosterstore/DirectoryRosterStore.java b/smack-im/src/main/java/org/jivesoftware/smack/roster/rosterstore/DirectoryRosterStore.java index dec89bc68..a78a44c28 100644 --- a/smack-im/src/main/java/org/jivesoftware/smack/roster/rosterstore/DirectoryRosterStore.java +++ b/smack-im/src/main/java/org/jivesoftware/smack/roster/rosterstore/DirectoryRosterStore.java @@ -1,6 +1,6 @@ /** * - * Copyright 2013-2015 the original author or authors + * Copyright 2013-2015 the original author or authors, 2020 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.jivesoftware.smack.roster.rosterstore; import java.io.File; -import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; @@ -56,15 +55,10 @@ public final class DirectoryRosterStore implements RosterStore { private static final String STORE_ID = "DEFAULT_ROSTER_STORE"; private static final Logger LOGGER = Logger.getLogger(DirectoryRosterStore.class.getName()); - private static final FileFilter rosterDirFilter = new FileFilter() { - - @Override - public boolean accept(File file) { - String name = file.getName(); - return name.startsWith(ENTRY_PREFIX); - } - - }; + private static boolean rosterDirFilter(File file) { + String name = file.getName(); + return name.startsWith(ENTRY_PREFIX); + } /** * @param baseDir TODO javadoc me please @@ -122,7 +116,7 @@ public final class DirectoryRosterStore implements RosterStore { public List getEntries() { List entries = new ArrayList<>(); - for (File file : fileDir.listFiles(rosterDirFilter)) { + for (File file : fileDir.listFiles(DirectoryRosterStore::rosterDirFilter)) { Item entry = readEntry(file); if (entry == null) { // Roster directory store corrupt. Abort and signal this by returning null. @@ -168,7 +162,7 @@ public final class DirectoryRosterStore implements RosterStore { @Override public boolean resetEntries(Collection items, String version) { - for (File file : fileDir.listFiles(rosterDirFilter)) { + for (File file : fileDir.listFiles(DirectoryRosterStore::rosterDirFilter)) { file.delete(); } for (Item item : items) { diff --git a/smack-integration-test/src/main/java/org/jivesoftware/smackx/chatstate/ChatStateIntegrationTest.java b/smack-integration-test/src/main/java/org/jivesoftware/smackx/chatstate/ChatStateIntegrationTest.java index 6b153b3a2..11e1340f4 100644 --- a/smack-integration-test/src/main/java/org/jivesoftware/smackx/chatstate/ChatStateIntegrationTest.java +++ b/smack-integration-test/src/main/java/org/jivesoftware/smackx/chatstate/ChatStateIntegrationTest.java @@ -21,7 +21,6 @@ import org.jivesoftware.smack.chat2.ChatManager; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smackx.chatstates.ChatState; -import org.jivesoftware.smackx.chatstates.ChatStateListener; import org.jivesoftware.smackx.chatstates.ChatStateManager; import org.igniterealtime.smack.inttest.AbstractSmackIntegrationTest; @@ -34,25 +33,19 @@ public class ChatStateIntegrationTest extends AbstractSmackIntegrationTest { // Listener for composing chat state private final SimpleResultSyncPoint composingSyncPoint = new SimpleResultSyncPoint(); - private final ChatStateListener composingListener = new ChatStateListener() { - @Override - public void stateChanged(Chat chat, ChatState state, Message message) { - if (state.equals(ChatState.composing)) { - composingSyncPoint.signal(); - } + private void composingListener(Chat chat, ChatState state, Message message) { + if (state.equals(ChatState.composing)) { + composingSyncPoint.signal(); } - }; + } // Listener for active chat state private final SimpleResultSyncPoint activeSyncPoint = new SimpleResultSyncPoint(); - private final ChatStateListener activeListener = new ChatStateListener() { - @Override - public void stateChanged(Chat chat, ChatState state, Message message) { - if (state.equals(ChatState.active)) { - activeSyncPoint.signal(); - } + private void activeListener(Chat chat, ChatState state, Message message) { + if (state.equals(ChatState.active)) { + activeSyncPoint.signal(); } - }; + } public ChatStateIntegrationTest(SmackIntegrationTestEnvironment environment) { @@ -65,8 +58,8 @@ public class ChatStateIntegrationTest extends AbstractSmackIntegrationTest { ChatStateManager manTwo = ChatStateManager.getInstance(conTwo); // Add chatState listeners. - manTwo.addChatStateListener(composingListener); - manTwo.addChatStateListener(activeListener); + manTwo.addChatStateListener(this::composingListener); + manTwo.addChatStateListener(this::activeListener); Chat chatOne = ChatManager.getInstanceFor(conOne) .chatWith(conTwo.getUser().asEntityBareJid()); @@ -86,7 +79,7 @@ public class ChatStateIntegrationTest extends AbstractSmackIntegrationTest { @AfterClass public void cleanup() { ChatStateManager manTwo = ChatStateManager.getInstance(conTwo); - manTwo.removeChatStateListener(composingListener); - manTwo.removeChatStateListener(activeListener); + manTwo.removeChatStateListener(this::composingListener); + manTwo.removeChatStateListener(this::activeListener); } } diff --git a/smack-omemo/src/main/java/org/jivesoftware/smackx/omemo/OmemoManager.java b/smack-omemo/src/main/java/org/jivesoftware/smackx/omemo/OmemoManager.java index 744bd6b89..88971cae8 100644 --- a/smack-omemo/src/main/java/org/jivesoftware/smackx/omemo/OmemoManager.java +++ b/smack-omemo/src/main/java/org/jivesoftware/smackx/omemo/OmemoManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2017 Paul Schaub + * Copyright 2017 Paul Schaub, 2020 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,17 +37,14 @@ import java.util.logging.Logger; import org.jivesoftware.smack.ConnectionListener; import org.jivesoftware.smack.Manager; import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.StanzaListener; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; -import org.jivesoftware.smack.filter.StanzaFilter; import org.jivesoftware.smack.packet.ExtensionElement; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.MessageBuilder; import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.util.Async; -import org.jivesoftware.smackx.carbons.CarbonCopyReceivedListener; import org.jivesoftware.smackx.carbons.CarbonManager; import org.jivesoftware.smackx.carbons.packet.CarbonExtension; import org.jivesoftware.smackx.disco.ServiceDiscoveryManager; @@ -74,7 +71,6 @@ import org.jivesoftware.smackx.omemo.trust.OmemoFingerprint; import org.jivesoftware.smackx.omemo.trust.OmemoTrustCallback; import org.jivesoftware.smackx.omemo.trust.TrustState; import org.jivesoftware.smackx.omemo.util.MessageOrOmemoMessage; -import org.jivesoftware.smackx.pep.PepListener; import org.jivesoftware.smackx.pep.PepManager; import org.jivesoftware.smackx.pubsub.EventElement; import org.jivesoftware.smackx.pubsub.ItemsExtension; @@ -897,23 +893,23 @@ public final class OmemoManager extends Manager { CarbonManager carbonManager = CarbonManager.getInstanceFor(connection()); // Remove listeners to avoid them getting added twice - connection().removeAsyncStanzaListener(internalOmemoMessageStanzaListener); - carbonManager.removeCarbonCopyReceivedListener(internalOmemoCarbonCopyListener); - pepManager.removePepListener(deviceListUpdateListener); + connection().removeAsyncStanzaListener(this::internalOmemoMessageStanzaListener); + carbonManager.removeCarbonCopyReceivedListener(this::internalOmemoCarbonCopyListener); + pepManager.removePepListener(this::deviceListUpdateListener); // Add listeners - pepManager.addPepListener(deviceListUpdateListener); - connection().addAsyncStanzaListener(internalOmemoMessageStanzaListener, omemoMessageStanzaFilter); - carbonManager.addCarbonCopyReceivedListener(internalOmemoCarbonCopyListener); + pepManager.addPepListener(this::deviceListUpdateListener); + connection().addAsyncStanzaListener(this::internalOmemoMessageStanzaListener, OmemoManager::isOmemoMessage); + carbonManager.addCarbonCopyReceivedListener(this::internalOmemoCarbonCopyListener); } /** * Remove active stanza listeners needed for OMEMO. */ public void stopStanzaAndPEPListeners() { - PepManager.getInstanceFor(connection()).removePepListener(deviceListUpdateListener); - connection().removeAsyncStanzaListener(internalOmemoMessageStanzaListener); - CarbonManager.getInstanceFor(connection()).removeCarbonCopyReceivedListener(internalOmemoCarbonCopyListener); + PepManager.getInstanceFor(connection()).removePepListener(this::deviceListUpdateListener); + connection().removeAsyncStanzaListener(this::internalOmemoMessageStanzaListener); + CarbonManager.getInstanceFor(connection()).removeCarbonCopyReceivedListener(this::internalOmemoCarbonCopyListener); } /** @@ -961,127 +957,113 @@ public final class OmemoManager extends Manager { /** * StanzaListener that listens for incoming Stanzas which contain OMEMO elements. */ - private final StanzaListener internalOmemoMessageStanzaListener = new StanzaListener() { + private void internalOmemoMessageStanzaListener(final Stanza packet) { + Async.go(new Runnable() { + @Override + public void run() { + try { + getOmemoService().onOmemoMessageStanzaReceived(packet, + new LoggedInOmemoManager(OmemoManager.this)); + } catch (SmackException.NotLoggedInException | IOException e) { + LOGGER.log(Level.SEVERE, "Exception while processing OMEMO stanza", e); + } + } + }); + } - @Override - public void processStanza(final Stanza packet) { - Async.go(new Runnable() { - @Override - public void run() { + /** + * CarbonCopyListener that listens for incoming carbon copies which contain OMEMO elements. + */ + private void internalOmemoCarbonCopyListener(final CarbonExtension.Direction direction, + final Message carbonCopy, + final Message wrappingMessage) { + Async.go(new Runnable() { + @Override + public void run() { + if (isOmemoMessage(carbonCopy)) { try { - getOmemoService().onOmemoMessageStanzaReceived(packet, + getOmemoService().onOmemoCarbonCopyReceived(direction, carbonCopy, wrappingMessage, new LoggedInOmemoManager(OmemoManager.this)); } catch (SmackException.NotLoggedInException | IOException e) { LOGGER.log(Level.SEVERE, "Exception while processing OMEMO stanza", e); } } - }); - } - }; - - /** - * CarbonCopyListener that listens for incoming carbon copies which contain OMEMO elements. - */ - private final CarbonCopyReceivedListener internalOmemoCarbonCopyListener = new CarbonCopyReceivedListener() { - @Override - public void onCarbonCopyReceived(final CarbonExtension.Direction direction, - final Message carbonCopy, - final Message wrappingMessage) { - Async.go(new Runnable() { - @Override - public void run() { - if (omemoMessageStanzaFilter.accept(carbonCopy)) { - try { - getOmemoService().onOmemoCarbonCopyReceived(direction, carbonCopy, wrappingMessage, - new LoggedInOmemoManager(OmemoManager.this)); - } catch (SmackException.NotLoggedInException | IOException e) { - LOGGER.log(Level.SEVERE, "Exception while processing OMEMO stanza", e); - } - } - } - }); - } - }; + } + }); + } /** * PEPListener that listens for OMEMO deviceList updates. */ - private final PepListener deviceListUpdateListener = new PepListener() { - @Override - public void eventReceived(EntityBareJid from, EventElement event, Message message) { + private void deviceListUpdateListener(EntityBareJid from, EventElement event, Message message) { + // Unknown sender, no more work to do. + if (from == null) { + // TODO: This DOES happen for some reason. Figure out when... + return; + } - // Unknown sender, no more work to do. - if (from == null) { - // TODO: This DOES happen for some reason. Figure out when... - return; + for (ExtensionElement items : event.getExtensions()) { + if (!(items instanceof ItemsExtension)) { + continue; } - for (ExtensionElement items : event.getExtensions()) { - if (!(items instanceof ItemsExtension)) { + for (ExtensionElement item : ((ItemsExtension) items).getExtensions()) { + if (!(item instanceof PayloadItem)) { continue; } - for (ExtensionElement item : ((ItemsExtension) items).getExtensions()) { - if (!(item instanceof PayloadItem)) { + PayloadItem payloadItem = (PayloadItem) item; + + if (!(payloadItem.getPayload() instanceof OmemoDeviceListElement)) { + continue; + } + + // Device List + OmemoCachedDeviceList deviceList; + OmemoDeviceListElement receivedDeviceList = (OmemoDeviceListElement) payloadItem.getPayload(); + try { + getOmemoService().getOmemoStoreBackend().mergeCachedDeviceList(getOwnDevice(), from, + receivedDeviceList); + + if (!from.asBareJid().equals(getOwnJid())) { continue; } - PayloadItem payloadItem = (PayloadItem) item; + deviceList = getOmemoService().cleanUpDeviceList(getOwnDevice()); + } catch (IOException e) { + LOGGER.log(Level.SEVERE, + "IOException while processing OMEMO PEP device updates. Message: " + message, + e); + continue; + } + final OmemoDeviceListElement_VAxolotl newDeviceList = new OmemoDeviceListElement_VAxolotl(deviceList); - if (!(payloadItem.getPayload() instanceof OmemoDeviceListElement)) { - continue; - } - - // Device List - OmemoCachedDeviceList deviceList; - OmemoDeviceListElement receivedDeviceList = (OmemoDeviceListElement) payloadItem.getPayload(); - try { - getOmemoService().getOmemoStoreBackend().mergeCachedDeviceList(getOwnDevice(), from, - receivedDeviceList); - - if (!from.asBareJid().equals(getOwnJid())) { - continue; - } - - deviceList = getOmemoService().cleanUpDeviceList(getOwnDevice()); - } catch (IOException e) { - LOGGER.log(Level.SEVERE, - "IOException while processing OMEMO PEP device updates. Message: " + message, - e); - continue; - } - final OmemoDeviceListElement_VAxolotl newDeviceList = new OmemoDeviceListElement_VAxolotl(deviceList); - - if (!newDeviceList.copyDeviceIds().equals(receivedDeviceList.copyDeviceIds())) { - LOGGER.log(Level.FINE, "Republish deviceList due to changes:" + - " Received: " + Arrays.toString(receivedDeviceList.copyDeviceIds().toArray()) + - " Published: " + Arrays.toString(newDeviceList.copyDeviceIds().toArray())); - Async.go(new Runnable() { - @Override - public void run() { - try { - OmemoService.publishDeviceList(connection(), newDeviceList); - } catch (InterruptedException | XMPPException.XMPPErrorException | - SmackException.NotConnectedException | SmackException.NoResponseException | PubSubException.NotALeafNodeException e) { - LOGGER.log(Level.WARNING, "Could not publish our deviceList upon an received update.", e); - } + if (!newDeviceList.copyDeviceIds().equals(receivedDeviceList.copyDeviceIds())) { + LOGGER.log(Level.FINE, "Republish deviceList due to changes:" + + " Received: " + Arrays.toString(receivedDeviceList.copyDeviceIds().toArray()) + + " Published: " + Arrays.toString(newDeviceList.copyDeviceIds().toArray())); + Async.go(new Runnable() { + @Override + public void run() { + try { + OmemoService.publishDeviceList(connection(), newDeviceList); + } catch (InterruptedException | XMPPException.XMPPErrorException | + SmackException.NotConnectedException | SmackException.NoResponseException | PubSubException.NotALeafNodeException e) { + LOGGER.log(Level.WARNING, "Could not publish our deviceList upon an received update.", e); } - }); - } + } + }); } } } - }; + } /** * StanzaFilter that filters messages containing a OMEMO element. */ - private final StanzaFilter omemoMessageStanzaFilter = new StanzaFilter() { - @Override - public boolean accept(Stanza stanza) { - return stanza instanceof Message && OmemoManager.stanzaContainsOmemoElement(stanza); - } - }; + private static boolean isOmemoMessage(Stanza stanza) { + return stanza instanceof Message && OmemoManager.stanzaContainsOmemoElement(stanza); + } /** * Guard class which ensures that the wrapped OmemoManager knows its BareJid. diff --git a/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox/OpenPgpManager.java b/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox/OpenPgpManager.java index d16296a05..25be6ee16 100644 --- a/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox/OpenPgpManager.java +++ b/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox/OpenPgpManager.java @@ -1,6 +1,6 @@ /** * - * Copyright 2017-2019 Florian Schmaus, 2018 Paul Schaub. + * Copyright 2017-2020 Florian Schmaus, 2018 Paul Schaub. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.chat2.Chat; import org.jivesoftware.smack.chat2.ChatManager; -import org.jivesoftware.smack.chat2.IncomingChatMessageListener; import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.util.Async; import org.jivesoftware.smack.util.stringencoder.Base64; @@ -180,7 +179,7 @@ public final class OpenPgpManager extends Manager { */ private OpenPgpManager(XMPPConnection connection) { super(connection); - ChatManager.getInstanceFor(connection).addIncomingListener(incomingOpenPgpMessageListener); + ChatManager.getInstanceFor(connection).addIncomingListener(this::incomingChatMessageListener); pepManager = PepManager.getInstanceFor(connection); } @@ -279,7 +278,7 @@ public final class OpenPgpManager extends Manager { publishPublicKey(pepManager, pubkeyElement, primaryFingerprint); // Subscribe to public key changes - PepManager.getInstanceFor(connection()).addPepListener(metadataListener); + PepManager.getInstanceFor(connection()).addPepListener(this::metadataListener); ServiceDiscoveryManager.getInstanceFor(connection()) .addFeature(PEP_NODE_PUBLIC_KEYS_NOTIFY); } @@ -381,7 +380,7 @@ public final class OpenPgpManager extends Manager { * Remove the metadata listener. This method is mainly used in tests. */ public void stopMetadataListener() { - PepManager.getInstanceFor(connection()).removePepListener(metadataListener); + PepManager.getInstanceFor(connection()).removePepListener(this::metadataListener); } /** @@ -502,25 +501,22 @@ public final class OpenPgpManager extends Manager { * * @see XEP-0373 ยง4.4 */ - private final PepListener metadataListener = new PepListener() { - @Override - public void eventReceived(final EntityBareJid from, final EventElement event, final Message message) { - if (PEP_NODE_PUBLIC_KEYS.equals(event.getEvent().getNode())) { - final BareJid contact = from.asBareJid(); - LOGGER.log(Level.INFO, "Received OpenPGP metadata update from " + contact); - Async.go(new Runnable() { - @Override - public void run() { - ItemsExtension items = (ItemsExtension) event.getExtensions().get(0); - PayloadItem payload = (PayloadItem) items.getItems().get(0); - PublicKeysListElement listElement = (PublicKeysListElement) payload.getPayload(); + private void metadataListener(final EntityBareJid from, final EventElement event, final Message message) { + if (PEP_NODE_PUBLIC_KEYS.equals(event.getEvent().getNode())) { + final BareJid contact = from.asBareJid(); + LOGGER.log(Level.INFO, "Received OpenPGP metadata update from " + contact); + Async.go(new Runnable() { + @Override + public void run() { + ItemsExtension items = (ItemsExtension) event.getExtensions().get(0); + PayloadItem payload = (PayloadItem) items.getItems().get(0); + PublicKeysListElement listElement = (PublicKeysListElement) payload.getPayload(); - processPublicKeysListElement(from, listElement); - } - }, "ProcessOXMetadata"); - } + processPublicKeysListElement(from, listElement); + } + }, "ProcessOXMetadata"); } - }; + } private void processPublicKeysListElement(BareJid contact, PublicKeysListElement listElement) { OpenPgpContact openPgpContact = getOpenPgpContact(contact.asEntityBareJidIfPossible()); @@ -548,62 +544,60 @@ public final class OpenPgpManager extends Manager { return provider.decryptAndOrVerify(element, getOpenPgpSelf(), sender); } - private final IncomingChatMessageListener incomingOpenPgpMessageListener = - new IncomingChatMessageListener() { - @Override - public void newIncomingMessage(final EntityBareJid from, final Message message, Chat chat) { - Async.go(new Runnable() { - @Override - public void run() { - OpenPgpElement element = message.getExtension(OpenPgpElement.class); - if (element == null) { - // Message does not contain an OpenPgpElement -> discard - return; - } - - OpenPgpContact contact = getOpenPgpContact(from); - - OpenPgpMessage decrypted = null; - OpenPgpContentElement contentElement = null; - try { - decrypted = decryptOpenPgpElement(element, contact); - contentElement = decrypted.getOpenPgpContentElement(); - } catch (PGPException e) { - LOGGER.log(Level.WARNING, "Could not decrypt incoming OpenPGP encrypted message", e); - } catch (XmlPullParserException | IOException e) { - LOGGER.log(Level.WARNING, "Invalid XML content of incoming OpenPGP encrypted message", e); - } catch (SmackException.NotLoggedInException e) { - LOGGER.log(Level.WARNING, "Cannot determine our JID, since we are not logged in.", e); - } - - if (contentElement instanceof SigncryptElement) { - for (SigncryptElementReceivedListener l : signcryptElementReceivedListeners) { - l.signcryptElementReceived(contact, message, (SigncryptElement) contentElement, decrypted.getMetadata()); - } - return; - } - - if (contentElement instanceof SignElement) { - for (SignElementReceivedListener l : signElementReceivedListeners) { - l.signElementReceived(contact, message, (SignElement) contentElement, decrypted.getMetadata()); - } - return; - } - - if (contentElement instanceof CryptElement) { - for (CryptElementReceivedListener l : cryptElementReceivedListeners) { - l.cryptElementReceived(contact, message, (CryptElement) contentElement, decrypted.getMetadata()); - } - return; - } - - else { - throw new AssertionError("Invalid element received: " + contentElement.getClass().getName()); - } - } - }); + private void incomingChatMessageListener(final EntityBareJid from, final Message message, Chat chat) { + Async.go(new Runnable() { + @Override + public void run() { + OpenPgpElement element = message.getExtension(OpenPgpElement.class); + if (element == null) { + // Message does not contain an OpenPgpElement -> discard + return; } - }; + + OpenPgpContact contact = getOpenPgpContact(from); + + OpenPgpMessage decrypted = null; + OpenPgpContentElement contentElement = null; + try { + decrypted = decryptOpenPgpElement(element, contact); + contentElement = decrypted.getOpenPgpContentElement(); + } catch (PGPException e) { + LOGGER.log(Level.WARNING, "Could not decrypt incoming OpenPGP encrypted message", e); + } catch (XmlPullParserException | IOException e) { + LOGGER.log(Level.WARNING, "Invalid XML content of incoming OpenPGP encrypted message", e); + } catch (SmackException.NotLoggedInException e) { + LOGGER.log(Level.WARNING, "Cannot determine our JID, since we are not logged in.", e); + } + + if (contentElement instanceof SigncryptElement) { + for (SigncryptElementReceivedListener l : signcryptElementReceivedListeners) { + l.signcryptElementReceived(contact, message, (SigncryptElement) contentElement, + decrypted.getMetadata()); + } + return; + } + + if (contentElement instanceof SignElement) { + for (SignElementReceivedListener l : signElementReceivedListeners) { + l.signElementReceived(contact, message, (SignElement) contentElement, decrypted.getMetadata()); + } + return; + } + + if (contentElement instanceof CryptElement) { + for (CryptElementReceivedListener l : cryptElementReceivedListeners) { + l.cryptElementReceived(contact, message, (CryptElement) contentElement, + decrypted.getMetadata()); + } + return; + } + + else { + throw new AssertionError("Invalid element received: " + contentElement.getClass().getName()); + } + } + }); + } /** * Create a {@link PubkeyElement} which contains the OpenPGP public key of {@code owner} which belongs to diff --git a/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox_im/OXInstantMessagingManager.java b/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox_im/OXInstantMessagingManager.java index 1ccdabe0f..4383a02b3 100644 --- a/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox_im/OXInstantMessagingManager.java +++ b/smack-openpgp/src/main/java/org/jivesoftware/smackx/ox_im/OXInstantMessagingManager.java @@ -44,7 +44,6 @@ import org.jivesoftware.smackx.ox.crypto.OpenPgpElementAndMetadata; import org.jivesoftware.smackx.ox.element.OpenPgpContentElement; import org.jivesoftware.smackx.ox.element.OpenPgpElement; import org.jivesoftware.smackx.ox.element.SigncryptElement; -import org.jivesoftware.smackx.ox.listener.SigncryptElementReceivedListener; import org.bouncycastle.openpgp.PGPException; import org.jxmpp.jid.BareJid; @@ -127,7 +126,7 @@ public final class OXInstantMessagingManager extends Manager { private OXInstantMessagingManager(final XMPPConnection connection) { super(connection); openPgpManager = OpenPgpManager.getInstanceFor(connection); - openPgpManager.registerSigncryptReceivedListener(signcryptElementReceivedListener); + openPgpManager.registerSigncryptReceivedListener(this::signcryptElementReceivedListener); announceSupportForOxInstantMessaging(); } @@ -358,12 +357,9 @@ public final class OXInstantMessagingManager extends Manager { message.setBody("This message is encrypted using XEP-0374: OpenPGP for XMPP: Instant Messaging."); } - private final SigncryptElementReceivedListener signcryptElementReceivedListener = new SigncryptElementReceivedListener() { - @Override - public void signcryptElementReceived(OpenPgpContact contact, Message originalMessage, SigncryptElement signcryptElement, OpenPgpMetadata metadata) { - for (OxMessageListener listener : oxMessageListeners) { - listener.newIncomingOxMessage(contact, originalMessage, signcryptElement, metadata); - } + private void signcryptElementReceivedListener(OpenPgpContact contact, Message originalMessage, SigncryptElement signcryptElement, OpenPgpMetadata metadata) { + for (OxMessageListener listener : oxMessageListeners) { + listener.newIncomingOxMessage(contact, originalMessage, signcryptElement, metadata); } - }; + } } diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java index ead3a6438..3df54dc1a 100644 --- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java +++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XmppTcpTransportModule.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.KeyManagementException; @@ -61,7 +62,6 @@ import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; import org.jivesoftware.smack.SmackException.SmackWrappedException; import org.jivesoftware.smack.SmackFuture; import org.jivesoftware.smack.SmackFuture.InternalSmackFuture; -import org.jivesoftware.smack.SmackReactor.ChannelSelectedCallback; import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment; import org.jivesoftware.smack.XMPPException.FailedNonzaException; import org.jivesoftware.smack.XmppInputOutputFilter; @@ -280,294 +280,292 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM } }; - private final ChannelSelectedCallback channelSelectedCallback = - (selectedChannel, selectedSelectionKey) -> { - assert selectionKey == null || selectionKey == selectedSelectionKey; - SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel; - // We are *always* interested in OP_READ. - int newInterestedOps = SelectionKey.OP_READ; - boolean newPendingOutputFilterData = false; + private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) { + assert selectionKey == null || selectionKey == selectedSelectionKey; + SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel; + // We are *always* interested in OP_READ. + int newInterestedOps = SelectionKey.OP_READ; + boolean newPendingOutputFilterData = false; - if (!channelSelectedCallbackLock.tryLock()) { - rejectedChannelSelectedCallbacks.incrementAndGet(); + if (!channelSelectedCallbackLock.tryLock()) { + rejectedChannelSelectedCallbacks.incrementAndGet(); + return; + } + + handledChannelSelectedCallbacks++; + + long callbackBytesRead = 0; + long callbackBytesWritten = 0; + + try { + boolean destinationAddressChanged = false; + boolean isLastPartOfElement = false; + TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null; + StringBuilder outgoingStreamForDebugger = null; + + writeLoop: while (true) { + final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty(); + + if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) { + if (filteredOutgoingBuffer != null) { + networkOutgoingBuffers.add(filteredOutgoingBuffer); + networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining(); + + filteredOutgoingBuffer = null; + if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) { + continue; + } + } + + ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]); + long bytesWritten; + try { + bytesWritten = selectedSocketChannel.write(output); + } catch (IOException e) { + // We have seen here so far + // - IOException "Broken pipe" + handleReadWriteIoException(e); + break; + } + + if (bytesWritten == 0) { + newInterestedOps |= SelectionKey.OP_WRITE; + break; + } + + callbackBytesWritten += bytesWritten; + + networkOutgoingBuffersBytes -= bytesWritten; + + List prunedBuffers = pruneBufferList(networkOutgoingBuffers); + + for (Buffer prunedBuffer : prunedBuffers) { + List sendElements = bufferToElementMap.remove(prunedBuffer); + if (sendElements == null) { + continue; + } + for (TopLevelStreamElement elementJustSend : sendElements) { + connectionInternal.fireFirstLevelElementSendListeners(elementJustSend); + } + } + + // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have + // written a certain amount. + if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) { + newInterestedOps |= SelectionKey.OP_WRITE; + callbackPreemtBecauseBytesWritten++; + break; + } + } else if (outgoingBuffer != null || pendingOutputFilterData) { + pendingOutputFilterData = false; + + if (outgoingBuffer != null) { + totalBytesWrittenBeforeFilter += outgoingBuffer.remaining(); + if (isLastPartOfElement) { + assert currentlyOutgonigTopLevelStreamElement != null; + currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement); + } + } + + ByteBuffer outputFilterInputData = outgoingBuffer; + // We can now null the outgoingBuffer since the filter step will take care of it from now on. + outgoingBuffer = null; + + for (ListIterator it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) { + XmppInputOutputFilter inputOutputFilter = it.next(); + XmppInputOutputFilter.OutputResult outputResult; + try { + outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement, + destinationAddressChanged, moreDataAvailable); + } catch (IOException e) { + connectionInternal.notifyConnectionError(e); + break writeLoop; + } + newPendingOutputFilterData |= outputResult.pendingFilterData; + outputFilterInputData = outputResult.filteredOutputData; + if (outputFilterInputData != null) { + outputFilterInputData.flip(); + } + } + + // It is ok if outpuFilterInputData is 'null' here, this is expected behavior. + if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) { + filteredOutgoingBuffer = outputFilterInputData; + } else { + filteredOutgoingBuffer = null; + } + + // If the filters did eventually not produce any output data but if there is + // pending output data then we have a pending write request after read. + if (filteredOutgoingBuffer == null && newPendingOutputFilterData) { + pendingWriteInterestAfterRead = true; + } + + if (filteredOutgoingBuffer != null && isLastPartOfElement) { + bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements)); + currentlyOutgoingElements.clear(); + } + + // Reset that the destination address has changed. + if (destinationAddressChanged) { + destinationAddressChanged = false; + } + } else if (outgoingCharSequenceIterator != null) { + CharSequence nextCharSequence = outgoingCharSequenceIterator.next(); + outgoingBuffer = UTF8.encode(nextCharSequence); + if (!outgoingCharSequenceIterator.hasNext()) { + outgoingCharSequenceIterator = null; + isLastPartOfElement = true; + } else { + isLastPartOfElement = false; + } + + final SmackDebugger debugger = connectionInternal.smackDebugger; + if (debugger != null) { + if (outgoingStreamForDebugger == null) { + outgoingStreamForDebugger = new StringBuilder(); + } + outgoingStreamForDebugger.append(nextCharSequence); + + if (isLastPartOfElement) { + try { + outputDebugSplitter.append(outgoingStreamForDebugger); + } catch (IOException e) { + throw new AssertionError(e); + } + debugger.onOutgoingElementCompleted(); + outgoingStreamForDebugger = null; + } + } + } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) { + currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll(); + if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) { + Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement; + Jid currentDestinationAddress = currentlyOutgoingStanza.getTo(); + destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress); + lastDestinationAddress = currentDestinationAddress; + } + CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE); + if (nextCharSequence instanceof XmlStringBuilder) { + XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence; + XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment(); + outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator(); + } else { + outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator(); + } + assert outgoingCharSequenceIterator != null; + } else { + // There is nothing more to write. + break; + } + } + + pendingOutputFilterData = newPendingOutputFilterData; + if (!pendingWriteInterestAfterRead && pendingOutputFilterData) { + newInterestedOps |= SelectionKey.OP_WRITE; + } + + readLoop: while (true) { + // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have + // read a certain amount. + if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) { + callbackPreemtBecauseBytesRead++; + break; + } + + int bytesRead; + incomingBuffer.clear(); + try { + bytesRead = selectedSocketChannel.read(incomingBuffer); + } catch (IOException e) { + handleReadWriteIoException(e); return; } - handledChannelSelectedCallbacks++; - - long callbackBytesRead = 0; - long callbackBytesWritten = 0; - - try { - boolean destinationAddressChanged = false; - boolean isLastPartOfElement = false; - TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null; - StringBuilder outgoingStreamForDebugger = null; - - writeLoop: while (true) { - final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty(); - - if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) { - if (filteredOutgoingBuffer != null) { - networkOutgoingBuffers.add(filteredOutgoingBuffer); - networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining(); - - filteredOutgoingBuffer = null; - if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) { - continue; - } - } - - ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]); - long bytesWritten; - try { - bytesWritten = selectedSocketChannel.write(output); - } catch (IOException e) { - // We have seen here so far - // - IOException "Broken pipe" - handleReadWriteIoException(e); - break; - } - - if (bytesWritten == 0) { - newInterestedOps |= SelectionKey.OP_WRITE; - break; - } - - callbackBytesWritten += bytesWritten; - - networkOutgoingBuffersBytes -= bytesWritten; - - List prunedBuffers = pruneBufferList(networkOutgoingBuffers); - - for (Buffer prunedBuffer : prunedBuffers) { - List sendElements = bufferToElementMap.remove(prunedBuffer); - if (sendElements == null) { - continue; - } - for (TopLevelStreamElement elementJustSend : sendElements) { - connectionInternal.fireFirstLevelElementSendListeners(elementJustSend); - } - } - - // Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have - // written a certain amount. - if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) { - newInterestedOps |= SelectionKey.OP_WRITE; - callbackPreemtBecauseBytesWritten++; - break; - } - } else if (outgoingBuffer != null || pendingOutputFilterData) { - pendingOutputFilterData = false; - - if (outgoingBuffer != null) { - totalBytesWrittenBeforeFilter += outgoingBuffer.remaining(); - if (isLastPartOfElement) { - assert currentlyOutgonigTopLevelStreamElement != null; - currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement); - } - } - - ByteBuffer outputFilterInputData = outgoingBuffer; - // We can now null the outgoingBuffer since the filter step will take care of it from now on. - outgoingBuffer = null; - - for (ListIterator it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) { - XmppInputOutputFilter inputOutputFilter = it.next(); - XmppInputOutputFilter.OutputResult outputResult; - try { - outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement, - destinationAddressChanged, moreDataAvailable); - } catch (IOException e) { - connectionInternal.notifyConnectionError(e); - break writeLoop; - } - newPendingOutputFilterData |= outputResult.pendingFilterData; - outputFilterInputData = outputResult.filteredOutputData; - if (outputFilterInputData != null) { - outputFilterInputData.flip(); - } - } - - // It is ok if outpuFilterInputData is 'null' here, this is expected behavior. - if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) { - filteredOutgoingBuffer = outputFilterInputData; - } else { - filteredOutgoingBuffer = null; - } - - // If the filters did eventually not produce any output data but if there is - // pending output data then we have a pending write request after read. - if (filteredOutgoingBuffer == null && newPendingOutputFilterData) { - pendingWriteInterestAfterRead = true; - } - - if (filteredOutgoingBuffer != null && isLastPartOfElement) { - bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements)); - currentlyOutgoingElements.clear(); - } - - // Reset that the destination address has changed. - if (destinationAddressChanged) { - destinationAddressChanged = false; - } - } else if (outgoingCharSequenceIterator != null) { - CharSequence nextCharSequence = outgoingCharSequenceIterator.next(); - outgoingBuffer = UTF8.encode(nextCharSequence); - if (!outgoingCharSequenceIterator.hasNext()) { - outgoingCharSequenceIterator = null; - isLastPartOfElement = true; - } else { - isLastPartOfElement = false; - } - - final SmackDebugger debugger = connectionInternal.smackDebugger; - if (debugger != null) { - if (outgoingStreamForDebugger == null) { - outgoingStreamForDebugger = new StringBuilder(); - } - outgoingStreamForDebugger.append(nextCharSequence); - - if (isLastPartOfElement) { - try { - outputDebugSplitter.append(outgoingStreamForDebugger); - } catch (IOException e) { - throw new AssertionError(e); - } - debugger.onOutgoingElementCompleted(); - outgoingStreamForDebugger = null; - } - } - } else if (!connectionInternal.outgoingElementsQueue.isEmpty()) { - currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll(); - if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) { - Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement; - Jid currentDestinationAddress = currentlyOutgoingStanza.getTo(); - destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress); - lastDestinationAddress = currentDestinationAddress; - } - CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE); - if (nextCharSequence instanceof XmlStringBuilder) { - XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence; - XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment(); - outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator(); - } else { - outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator(); - } - assert outgoingCharSequenceIterator != null; - } else { - // There is nothing more to write. - break; - } - } - - pendingOutputFilterData = newPendingOutputFilterData; - if (!pendingWriteInterestAfterRead && pendingOutputFilterData) { - newInterestedOps |= SelectionKey.OP_WRITE; - } - - readLoop: while (true) { - // Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have - // read a certain amount. - if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) { - callbackPreemtBecauseBytesRead++; - break; - } - - int bytesRead; - incomingBuffer.clear(); - try { - bytesRead = selectedSocketChannel.read(incomingBuffer); - } catch (IOException e) { - handleReadWriteIoException(e); - return; - } - - if (bytesRead < 0) { - LOGGER.finer("NIO read() returned " + bytesRead - + " for " + this + ". This probably means that the TCP connection was terminated."); - // According to the socket channel javadoc section about "asynchronous reads" a socket channel's - // read() may return -1 if the input side of a socket is shut down. - - // Note that we do not call notifyConnectionError() here because the connection may be - // cleanly shutdown which would also cause read() to return '-1. I assume that this socket - // will be selected again, on which read() would throw an IOException, which will be catched - // and invoke notifyConnectionError() (see a few lines above). - /* - IOException exception = new IOException("NIO read() returned " + bytesRead); - notifyConnectionError(exception); - */ - return; - } - - if (!pendingInputFilterData) { - if (bytesRead == 0) { - // Nothing more to read. - break; - } - } else { - pendingInputFilterData = false; - } - - // We have successfully read something. It is now possible that a filter is now also able to write - // additional data (for example SSLEngine). - if (pendingWriteInterestAfterRead) { - pendingWriteInterestAfterRead = false; - newInterestedOps |= SelectionKey.OP_WRITE; - } - - callbackBytesRead += bytesRead; - - ByteBuffer filteredIncomingBuffer = incomingBuffer; - for (ListIterator it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) { - filteredIncomingBuffer.flip(); - - ByteBuffer newFilteredIncomingBuffer; - try { - newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer); - } catch (IOException e) { - connectionInternal.notifyConnectionError(e); - return; - } - if (newFilteredIncomingBuffer == null) { - break readLoop; - } - filteredIncomingBuffer = newFilteredIncomingBuffer; - } - - final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining(); - - totalBytesReadAfterFilter += bytesReadAfterFilter; - - try { - splitter.write(filteredIncomingBuffer); - } catch (IOException e) { - connectionInternal.notifyConnectionError(e); - return; - } - } - } finally { - totalBytesWritten += callbackBytesWritten; - totalBytesRead += callbackBytesRead; - - channelSelectedCallbackLock.unlock(); + if (bytesRead < 0) { + LOGGER.finer("NIO read() returned " + bytesRead + + " for " + this + ". This probably means that the TCP connection was terminated."); + // According to the socket channel javadoc section about "asynchronous reads" a socket channel's + // read() may return -1 if the input side of a socket is shut down. + // Note that we do not call notifyConnectionError() here because the connection may be + // cleanly shutdown which would also cause read() to return '-1. I assume that this socket + // will be selected again, on which read() would throw an IOException, which will be catched + // and invoke notifyConnectionError() (see a few lines above). + /* + IOException exception = new IOException("NIO read() returned " + bytesRead); + notifyConnectionError(exception); + */ + return; } - // Indicate that there is no reactor thread racing towards handling this selection key. - final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment; - if (selectionKeyAttachment != null) { - selectionKeyAttachment.resetReactorThreadRacing(); + if (!pendingInputFilterData) { + if (bytesRead == 0) { + // Nothing more to read. + break; + } + } else { + pendingInputFilterData = false; } - // Check the queue again to prevent lost wakeups caused by elements inserted before we - // called resetReactorThreadRacing() a few lines above. - if (!connectionInternal.outgoingElementsQueue.isEmpty()) { - setWriteInterestAfterChannelSelectedCallback.incrementAndGet(); + // We have successfully read something. It is now possible that a filter is now also able to write + // additional data (for example SSLEngine). + if (pendingWriteInterestAfterRead) { + pendingWriteInterestAfterRead = false; newInterestedOps |= SelectionKey.OP_WRITE; } - connectionInternal.setInterestOps(selectionKey, newInterestedOps); - }; + callbackBytesRead += bytesRead; + + ByteBuffer filteredIncomingBuffer = incomingBuffer; + for (ListIterator it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) { + filteredIncomingBuffer.flip(); + + ByteBuffer newFilteredIncomingBuffer; + try { + newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer); + } catch (IOException e) { + connectionInternal.notifyConnectionError(e); + return; + } + if (newFilteredIncomingBuffer == null) { + break readLoop; + } + filteredIncomingBuffer = newFilteredIncomingBuffer; + } + + final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining(); + + totalBytesReadAfterFilter += bytesReadAfterFilter; + + try { + splitter.write(filteredIncomingBuffer); + } catch (IOException e) { + connectionInternal.notifyConnectionError(e); + return; + } + } + } finally { + totalBytesWritten += callbackBytesWritten; + totalBytesRead += callbackBytesRead; + + channelSelectedCallbackLock.unlock(); + } + + // Indicate that there is no reactor thread racing towards handling this selection key. + final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment; + if (selectionKeyAttachment != null) { + selectionKeyAttachment.resetReactorThreadRacing(); + } + + // Check the queue again to prevent lost wakeups caused by elements inserted before we + // called resetReactorThreadRacing() a few lines above. + if (!connectionInternal.outgoingElementsQueue.isEmpty()) { + setWriteInterestAfterChannelSelectedCallback.incrementAndGet(); + newInterestedOps |= SelectionKey.OP_WRITE; + } + + connectionInternal.setInterestOps(selectionKey, newInterestedOps); + } private void handleReadWriteIoException(IOException e) { if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) { @@ -677,7 +675,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM } @Override - public Stats getStats() { + public XmppTcpTransportModule.Stats getStats() { return XmppTcpTransportModule.this.getStats(); } @@ -774,7 +772,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress(); selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ, - channelSelectedCallback); + XmppTcpTransportModule.this::onChannelSelected); selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment(); connectionInternal.setTransport(tcpNioTransport); @@ -1316,7 +1314,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM pendingOutputFilterData = true; } - channelSelectedCallback.onChannelSelected(channel, key); + onChannelSelected(channel, key); } finally { channelSelectedCallbackLock.unlock(); } @@ -1347,7 +1345,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining()); } - public Stats getStats() { + public XmppTcpTransportModule.Stats getStats() { return new Stats(this); }