/** * * Copyright 2014-2019 Florian Schmaus * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.jivesoftware.smack.util; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and * {@link #poll(long, TimeUnit)}. *
* Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
* domain).
*
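* @param <E> the type of elements held in this queue.
*
* Blocking operations such as {@link #put(Object)} and {@link #take()} may throw an
* {@link InterruptedException} in two cases: if the queue was shut down, or if the calling thread
* was interrupted.
*
* @return {@code true} if the queue was shut down before, {@code false} if not.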
*/
/**
 * Clears the shutdown flag so the queue counts as running again.
 *
 * @return the state of the shutdown flag before this call: {@code true} if the queue was shut
 *         down, {@code false} if it was not.
 */
public boolean start() {
    lock.lock();
    try {
        // Capture the old state before resetting it; both steps happen under the lock.
        final boolean wasShutdown = isShutdown;
        isShutdown = false;
        return wasShutdown;
    } finally {
        lock.unlock();
    }
}
/**
 * Reports whether the shutdown flag is currently set. The flag is read while holding the queue
 * lock.
 *
 * @return {@code true} if the queue is shut down, {@code false} otherwise.
 */
public boolean isShutdown() {
    final boolean shutdownState;
    lock.lock();
    try {
        shutdownState = isShutdown;
    } finally {
        lock.unlock();
    }
    return shutdownState;
}
/**
 * Removes and returns the head of the queue without waiting.
 *
 * @return the extracted element, or {@code null} if the queue holds no elements.
 */
@Override
public E poll() {
    lock.lock();
    try {
        return hasNoElements() ? null : extract();
    } finally {
        lock.unlock();
    }
}
/**
 * Returns the element at the take position without removing it.
 *
 * @return the element at {@code takeIndex}, or {@code null} if the queue holds no elements.
 */
@Override
public E peek() {
    lock.lock();
    try {
        if (hasNoElements()) {
            return null;
        }
        return items[takeIndex];
    } finally {
        lock.unlock();
    }
}
/**
 * Inserts the element without waiting, provided the queue is neither full nor shut down.
 *
 * @param e the element to add; passed through {@code checkNotNull} first.
 * @return {@code true} if the element was inserted, {@code false} if the queue was full or shut
 *         down.
 */
@Override
public boolean offer(E e) {
    checkNotNull(e);
    lock.lock();
    try {
        // Same evaluation order as "full or shut down": the capacity check comes first.
        final boolean accepted = !isFull() && !isShutdown;
        if (accepted) {
            insert(e);
        }
        return accepted;
    } finally {
        lock.unlock();
    }
}
/**
 * Offers the element and then shuts the queue down, holding the queue lock across both steps.
 * {@link #shutdown()} is invoked regardless of whether the offer succeeded.
 *
 * @param e the element to offer; passed through {@code checkNotNull} first.
 * @return the result of {@link #offer(Object)} for the element.
 */
public boolean offerAndShutdown(E e) {
    checkNotNull(e);
    lock.lock();
    try {
        // offer(E) acquires the lock again while this thread already holds it.
        final boolean offered = offer(e);
        shutdown();
        return offered;
    } finally {
        lock.unlock();
    }
}
/**
 * Waits on {@code notFull} until the queue has room and then inserts the element. The caller
 * must already hold the queue lock (asserted below).
 *
 * @param e the element to insert.
 * @param signalNotEmpty forwarded unchanged to {@code insert(e, signalNotEmpty)}.
 * @throws InterruptedException if thrown by the wait or by {@code checkNotShutdown()}, which
 *         runs after every wake-up.
 */
private void putInternal(E e, boolean signalNotEmpty) throws InterruptedException {
    assert lock.isHeldByCurrentThread();

    try {
        while (isFull()) {
            notFull.await();
            checkNotShutdown();
        }
    } catch (InterruptedException interrupted) {
        // Hand the wake-up on to another waiter before propagating the exception.
        notFull.signal();
        throw interrupted;
    }

    insert(e, signalNotEmpty);
}
/**
 * Inserts the specified element into this queue, waiting if necessary for space to become
 * available.
 * <p>
 * An {@link InterruptedException} is thrown in two cases: the calling thread was interrupted
 * while waiting, or the queue was shut down. To tell the two apart, check e.g.
 * {@link #isShutdown()}.
 * </p>
 *
 * @param e the element to add.
 * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
 */
@Override
public void put(E e) throws InterruptedException {
    checkNotNull(e);

    // A single put always asks the insert to signal the not-empty condition.
    final boolean signalNotEmpty = true;

    lock.lockInterruptibly();
    try {
        putInternal(e, signalNotEmpty);
    } finally {
        lock.unlock();
    }
}
/**
 * Puts the element into the queue unless the queue has already been shut down. The shutdown
 * flag is checked once, after the lock has been acquired and before any waiting for space.
 *
 * @param e the element to put into the queue.
 * @return {@code true} if the element has been put into the queue, {@code false} if the queue
 *         was shut down.
 * @throws InterruptedException if the calling thread was interrupted.
 * @since 4.4
 */
public boolean putIfNotShutdown(E e) throws InterruptedException {
    checkNotNull(e);
    lock.lockInterruptibly();
    try {
        final boolean running = !isShutdown;
        if (running) {
            putInternal(e, true);
        }
        return running;
    } finally {
        lock.unlock();
    }
}
public void putAll(Collection extends E> elements) throws InterruptedException {
checkNotNull(elements);
lock.lockInterruptibly();
try {
for (E element : elements) {
putInternal(element, false);
}
} finally {
notEmpty.signalAll();
lock.unlock();
}
}
/**
* Outcome of a non-blocking {@code tryPut(E)} attempt. Exactly one of these constants is
* returned per call.
*/
public enum TryPutResult {
/**
* The queue lock could not be acquired immediately ({@code lock.tryLock()} returned false), so
* the queue state was not inspected at all.
*/
couldNotLock,
/**
* The lock was acquired but the queue was shut down; the element was not inserted.
*/
queueWasShutDown,
/**
* The lock was acquired and the queue was not shut down, but it was full; the element was not
* inserted.
*/
queueWasFull,
/**
* The element was successfully placed into the queue.
*/
putSuccessful,
}
/**
 * Attempts to insert the element without ever blocking: neither for the queue lock nor for
 * free space.
 *
 * @param e the element to insert; passed through {@code checkNotNull} first.
 * @return a {@link TryPutResult} telling whether the element was inserted and, if not, why.
 */
public TryPutResult tryPut(E e) {
    checkNotNull(e);

    if (!lock.tryLock()) {
        return TryPutResult.couldNotLock;
    }

    try {
        final TryPutResult outcome;
        if (isShutdown) {
            outcome = TryPutResult.queueWasShutDown;
        } else if (isFull()) {
            outcome = TryPutResult.queueWasFull;
        } else {
            insert(e);
            outcome = TryPutResult.putSuccessful;
        }
        return outcome;
    } finally {
        lock.unlock();
    }
}
/**
 * Inserts the element, waiting up to the given timeout for space to become available.
 * {@code checkNotShutdown()} is only invoked after a wait on {@code notFull}; if there is room
 * right away the element is inserted without that check.
 *
 * @param e the element to add; passed through {@code checkNotNull} first.
 * @param timeout how long to wait before giving up, in units of {@code unit}.
 * @param unit the time unit of {@code timeout}.
 * @return {@code true} if the element was inserted, {@code false} if the waiting time elapsed
 *         while the queue was still full.
 * @throws InterruptedException if acquiring the lock or waiting is interrupted, or if thrown by
 *         {@code checkNotShutdown()} after a wake-up.
 */
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    checkNotNull(e);
    long remainingNanos = unit.toNanos(timeout);

    lock.lockInterruptibly();
    try {
        while (!isNotFull()) {
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                remainingNanos = notFull.awaitNanos(remainingNanos);
                checkNotShutdown();
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to another waiter before propagating the exception.
                notFull.signal();
                throw interrupted;
            }
        }
        insert(e);
        return true;
    } finally {
        lock.unlock();
    }
}
/**
 * Removes and returns the head of the queue, waiting on {@code notEmpty} while the queue holds
 * no elements. {@code checkNotShutdown()} runs once before any waiting and again after every
 * wake-up.
 *
 * @return the extracted element.
 * @throws InterruptedException if acquiring the lock or waiting is interrupted, or if thrown by
 *         {@code checkNotShutdown()}.
 */
@Override
public E take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        // The initial check sits outside the try/catch below, so its exception does not
        // trigger a notEmpty signal.
        checkNotShutdown();

        while (hasNoElements()) {
            try {
                notEmpty.await();
                checkNotShutdown();
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to another waiter before propagating the exception.
                notEmpty.signal();
                throw interrupted;
            }
        }

        return extract();
    } finally {
        lock.unlock();
    }
}
/**
* Outcome codes for a non-blocking take attempt.
* NOTE(review): presumably carried by {@code TryTakeResult}, the counterpart of
* {@code TryPutResult}; the code that produces these values is not visible here, so confirm
* there.
*/
public enum TryTakeResultCode {
/**
* The method was unable to acquire the queue lock.
*/
couldNotLock,
/**
* The queue was shut down.
*/
queueWasShutDown,
/**
* The queue was empty, so there was no element to remove.
*/
queueWasEmpty,
/**
* An element was successfully removed from the queue.
*/
takeSuccessful,
}
public static final class TryTakeResult