Add AbstractBlockingQueueWithShutdown.putAll(Collection)

This commit is contained in:
Florian Schmaus 2018-09-11 21:53:48 +02:00
parent 98c7ba1aa9
commit 09279b8ac0
1 changed files with 38 additions and 12 deletions

View File

@ -58,10 +58,16 @@ public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implemen
} }
private void insert(E e) { private void insert(E e) {
insert(e, true);
}
private void insert(E e, boolean signalNotEmpty) {
items[putIndex] = e; items[putIndex] = e;
putIndex = inc(putIndex); putIndex = inc(putIndex);
count++; count++;
notEmpty.signal(); if (signalNotEmpty) {
notEmpty.signal();
}
} }
private E extract() { private E extract() {
@ -226,6 +232,22 @@ public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implemen
} }
} }
private void putInternal(E e, boolean signalNotEmpty) throws InterruptedException {
assert lock.isHeldByCurrentThread();
while (isFull()) {
try {
notFull.await();
checkNotShutdown();
}
catch (InterruptedException ie) {
notFull.signal();
throw ie;
}
}
insert(e, signalNotEmpty);
}
/** /**
* Inserts the specified element into this queue, waiting if necessary * Inserts the specified element into this queue, waiting if necessary
* for space to become available. * for space to become available.
@ -246,23 +268,27 @@ public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implemen
lock.lockInterruptibly(); lock.lockInterruptibly();
try { try {
while (isFull()) { putInternal(e, true);
try {
notFull.await();
checkNotShutdown();
}
catch (InterruptedException ie) {
notFull.signal();
throw ie;
}
}
insert(e);
} }
finally { finally {
lock.unlock(); lock.unlock();
} }
} }
public void putAll(Collection<? extends E> elements) throws InterruptedException {
checkNotNull(elements);
lock.lockInterruptibly();
try {
for (E element : elements) {
putInternal(element, false);
}
} finally {
notEmpty.signalAll();
lock.unlock();
}
}
public enum TryPutResult { public enum TryPutResult {
/** /**
* The method was unable to acquire the queue lock. * The method was unable to acquire the queue lock.