diff --git a/resources/releasedocs/changelog.html b/resources/releasedocs/changelog.html
index 688d8bd6f..7ffbe2f85 100644
--- a/resources/releasedocs/changelog.html
+++ b/resources/releasedocs/changelog.html
@@ -141,6 +141,18 @@ hr {
+
4.1.5 -- 2015-11-22
+
+
Bug
+
+
+- [SMACK-698] - Time creates invalid XML
+
+- [SMACK-700] - Duplicate stanzas in unacknowledgedStanzas queue when stream is resumed
+
+- [SMACK-702] - RejectedExecutionException in AbstractXMPPConnection.processPacket() causes connection Termination
+
+
4.1.4 -- 2015-09-14
diff --git a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java
index d99cbf563..98a38a83b 100644
--- a/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java
+++ b/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java
@@ -27,14 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -75,6 +73,7 @@ import org.jivesoftware.smack.parsing.ParsingExceptionCallback;
import org.jivesoftware.smack.provider.ExtensionElementProvider;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.sasl.core.SASLAnonymous;
+import org.jivesoftware.smack.util.BoundedThreadPoolExecutor;
import org.jivesoftware.smack.util.DNSUtil;
import org.jivesoftware.smack.util.Objects;
import org.jivesoftware.smack.util.PacketParserUtils;
@@ -233,8 +232,8 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
* important that we use a
single threaded ExecutorService in order to guarantee that the
* PacketListeners are invoked in the same order the stanzas arrived.
*/
- private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
- new ArrayBlockingQueue
(100), new SmackExecutorThreadFactory(this, "Incoming Processor"));
+ private final BoundedThreadPoolExecutor executorService = new BoundedThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
+ 100, new SmackExecutorThreadFactory(this, "Incoming Processor"));
/**
* This scheduled thread pool executor is used to remove pending callbacks.
@@ -1002,12 +1001,13 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
* they are a match with the filter.
*
* @param stanza the stanza to process.
+ * @throws InterruptedException
*/
- protected void processStanza(final Stanza stanza) {
+ protected void processStanza(final Stanza stanza) throws InterruptedException {
assert(stanza != null);
lastStanzaReceived = System.currentTimeMillis();
// Deliver the incoming packet to listeners.
- executorService.submit(new Runnable() {
+ executorService.executeBlocking(new Runnable() {
@Override
public void run() {
invokePacketCollectorsAndNotifyRecvListeners(stanza);
diff --git a/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java b/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java
new file mode 100644
index 000000000..c09ddd5d5
--- /dev/null
+++ b/smack-core/src/main/java/org/jivesoftware/smack/util/BoundedThreadPoolExecutor.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Copyright 2015 Florian Schmaus
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jivesoftware.smack.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private final Semaphore semaphore;
+
+ public BoundedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
+ TimeUnit unit, int bound, ThreadFactory threadFactory) {
+ // One could think that the array blocking queue bound should be "bound - 1" because the bound protected by the
+ // Semaphore also includes the "slot" in the worker thread executing the Runnable. But using that as bound could
+ // actually cause a RejectedExecutionException as the queue could fill up while the worker thread remains
+ // unscheduled and is thus not removing any entries.
+ super(corePoolSize, maximumPoolSize, keepAliveTime,
+ unit, new ArrayBlockingQueue(bound), threadFactory);
+ semaphore = new Semaphore(bound);
+ }
+
+ public void executeBlocking(final Runnable command) throws InterruptedException {
+ semaphore.acquire();
+ try {
+ execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ command.run();
+ } finally {
+ semaphore.release();
+ }
+ }
+ });
+ } catch (Exception e) {
+ semaphore.release();
+ if (e instanceof RejectedExecutionException) {
+ throw (RejectedExecutionException) e;
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
index bd179028a..62afe2520 100644
--- a/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
+++ b/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
@@ -1086,16 +1086,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
if (!smSessionId.equals(resumed.getPrevId())) {
throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
}
+ // Mark SM as enabled and resumption as successful.
+ smResumedSyncPoint.reportSuccess();
+ smEnabledSyncPoint.reportSuccess();
// First, drop the stanzas already handled by the server
processHandledCount(resumed.getHandledCount());
// Then re-send what is left in the unacknowledged queue
- List stanzasToResend = new LinkedList();
- stanzasToResend.addAll(unacknowledgedStanzas);
+ List stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
+ unacknowledgedStanzas.drainTo(stanzasToResend);
for (Stanza stanza : stanzasToResend) {
- packetWriter.sendStreamElement(stanza);
+ sendStanzaInternal(stanza);
}
- smResumedSyncPoint.reportSuccess();
- smEnabledSyncPoint.reportSuccess();
// If there where stanzas resent, then request a SM ack for them.
// Writer's sendStreamElement() won't do it automatically based on
// predicates.
@@ -1469,7 +1470,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*/
@Deprecated
public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
- setUseStreamManagementDefault(useSmResumptionDefault);
+ setUseStreamManagementResumptionDefault(useSmResumptionDefault);
}
/**
@@ -1765,7 +1766,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private void processHandledCount(long handledCount) throws StreamManagementCounterError {
long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
final List ackedStanzas = new ArrayList(
- handledCount <= Integer.MAX_VALUE ? (int) handledCount
+ ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
: Integer.MAX_VALUE);
for (long i = 0; i < ackedStanzasCount; i++) {
Stanza ackedStanza = unacknowledgedStanzas.poll();