-
Notifications
You must be signed in to change notification settings - Fork 4
Open
Description
/*
* Copyright (c) 2013-2025 Cinchapi Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cinchapi.concourse.server.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import com.cinchapi.common.reflect.Reflection;
import com.cinchapi.concourse.util.Logger;
/**
* A {@link ReadWriteLock} that enforces a configurable priority for either
* readers or writers under contention.
* <p>
* A {@link BiasedLock} is designed for workloads with predictable access
* patterns where giving preference to either readers or writers can improve
* overall throughput. For example, in read-heavy workloads, prioritizing
* readers can increase concurrency, while in write-heavy workloads,
* prioritizing writers can reduce lock contention and improve write throughput.
* </p>
* <p>
* Unlike standard read-write locks, this implementation actively enforces the
* configured bias by allowing the preferred lock type to "jump ahead" of
* waiting threads of the non-preferred type. However, it also includes
* starvation prevention mechanisms to ensure that non-preferred lock requests
* will eventually be granted, even under continuous contention from preferred
* lock types.
* </p>
* <p>
* <strong>IMPORTANT:</strong> This lock is <em>not</em> reentrant. A thread
* that holds a read lock cannot acquire another read lock, and a thread that
* holds a write lock cannot acquire another write lock. Attempting to do so
* will result in deadlock. Additionally, there is no automatic upgrading from
* read to write locks - a thread must release its read lock before acquiring a
* write lock.
* </p>
* <p>
* This lock is particularly useful in scenarios where the application has
* specific performance requirements that can benefit from biased lock
* acquisition policies.
* </p>
*
* @author Jeff Nelson
*/
@SuppressWarnings("restriction")
public class BiasedLock implements ReadWriteLock {
/**
 * Return a {@link BiasedLock} with the default bias, which gives preference
 * to readers.
 * <p>
 * The result is the same as the one produced by {@link #favoringReads()}.
 * </p>
 *
 * @return a new read-favoring {@link BiasedLock}
 */
public static BiasedLock create() {
    final BiasedLock lock = favoringReads();
    return lock;
}
/**
 * Return a {@link BiasedLock} whose bias is {@link Bias#READS}, meaning
 * that read acquisitions are the preferred lock type under contention.
 *
 * @return a new read-favoring {@link BiasedLock}
 */
public static BiasedLock favoringReads() {
    final Bias preference = Bias.READS;
    return new BiasedLock(preference);
}
/**
 * Return a {@link BiasedLock} whose bias is {@link Bias#WRITES}, meaning
 * that write acquisitions are the preferred lock type under contention.
 *
 * @return a new write-favoring {@link BiasedLock}
 */
public static BiasedLock favoringWrites() {
    final Bias preference = Bias.WRITES;
    return new BiasedLock(preference);
}
/**
 * Extract the exclusive (writer) portion of a synchronizer state value by
 * masking off everything except the low-order bits.
 *
 * @param c the synchronizer state to decode
 * @return the low-order bits of {@code c} selected by the exclusive mask;
 *         non-zero when a writer holds the lock
 */
static int exclusiveCount(int c) {
    final int writers = c & EXCLUSIVE_MASK;
    return writers;
}
/**
 * Extract the shared (reader) portion of a synchronizer state value by
 * shifting the high-order bits down (unsigned).
 *
 * @param c the synchronizer state to decode
 * @return the number of read holds encoded in {@code c}
 */
static int sharedCount(int c) {
    final int readers = c >>> SHARED_SHIFT;
    return readers;
}
/**
* Number of bits the reader count is shifted left inside the synchronizer
* state. {@link #sharedCount(int)} recovers the reader count with an
* unsigned right shift by this amount, so the bits above this position hold
* readers and the bits below it hold the writer flag.
*/
private static final int SHARED_SHIFT = 16;
/**
* The amount added to (or subtracted from) the synchronizer state for each
* read acquisition (or release): a one in the lowest reader bit.
*/
private static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/**
* The largest reader count that fits in the bits above
* {@link #SHARED_SHIFT}. A read acquisition that finds the reader count
* already at this value fails with an {@link Error}.
*/
private static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
/**
* Mask that selects the bits below {@link #SHARED_SHIFT}; used by
* {@link #exclusiveCount(int)} to tell whether a writer holds the lock.
*/
private static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**
* Upper bound, in nanoseconds (one millisecond), for the back-off interval
* of a non-preferred actor. Once the doubling back-off reaches this value
* the actor stops backing off and enters the wait queue.
*/
private static final long MAX_BACKOFF_NANOS = 1_000_000;
/**
* First back-off interval, in nanoseconds (100 microseconds), used by a
* non-preferred actor; the interval doubles after each failed attempt.
*/
private static final long INITIAL_BACKOFF_NANOS = 100_000;
/**
* The {@link sun.misc.Unsafe} instance, obtained by reading the static
* field named {@code theUnsafe} through the project's {@code Reflection}
* helper. It is used to read private fields of
* {@link AbstractQueuedSynchronizer} and of its internal node class.
*/
private static final sun.misc.Unsafe unsafe = Reflection
.getStatic("theUnsafe", sun.misc.Unsafe.class);
/**
* Offset of the {@code head} field declared in
* {@link AbstractQueuedSynchronizer}; read with {@link #unsafe} to find the
* start of the wait queue.
*/
private static final long HEAD_OFFSET;
/**
* Offset of the {@code next} field declared in the synchronizer's internal
* {@code Node} class; read with {@link #unsafe} to walk the wait queue.
*/
private static final long NEXT_OFFSET;
/**
* Offset of the {@code nextWaiter} field declared in the synchronizer's
* internal {@code Node} class; its value is compared against
* {@link #SHARED} to classify a waiter as a reader or a writer.
*/
private static final long NEXT_WAIT_OFFSET;
/**
* The value of the static {@code SHARED} field of the synchronizer's
* internal {@code Node} class. A queued node whose {@code nextWaiter} is
* this object is treated as a shared (read) request; any other value is
* treated as an exclusive (write) request.
*/
private static final Object SHARED;
/**
* Offset of the {@code thread} field declared in the synchronizer's
* internal {@code Node} class; read with {@link #unsafe} to find out which
* thread owns a queued node.
*/
private static final long THREAD_OFFSET;
// Resolve every offset once, at class-load time. The field names ("head",
// "next", "nextWaiter", "thread", "SHARED") and the nested class name are
// looked up reflectively, so this initializer only works on a JDK whose
// AbstractQueuedSynchronizer still declares them.
// NOTE(review): confirm which JDK versions the project supports; newer AQS
// implementations may not have these fields.
static {
HEAD_OFFSET = unsafe.objectFieldOffset(Reflection
.getDeclaredField("head", AbstractQueuedSynchronizer.class));
Class<?> NODE_CLASS = Reflection.getClassCasted(
"java.util.concurrent.locks.AbstractQueuedSynchronizer$Node");
NEXT_OFFSET = unsafe.objectFieldOffset(
Reflection.getDeclaredField("next", NODE_CLASS));
NEXT_WAIT_OFFSET = unsafe.objectFieldOffset(
Reflection.getDeclaredField("nextWaiter", NODE_CLASS));
SHARED = Reflection.getStatic("SHARED", NODE_CLASS);
THREAD_OFFSET = unsafe.objectFieldOffset(
Reflection.getDeclaredField("thread", NODE_CLASS));
}
/**
* The preference of this lock instance. The read and write lock views
* compare it against {@link Bias#READS} / {@link Bias#WRITES} to decide
* whether an acquisition queues directly or backs off first.
*/
private final Bias bias;
/**
* The synchronizer that holds the lock state and the wait queue; both lock
* views delegate every acquisition and release to it.
*/
private final Sync sync;
/**
* The single read lock view handed out by {@link #readLock()}.
*/
private final Lock readLock = new ReadLock();
/**
* The single write lock view handed out by {@link #writeLock()}.
*/
private final Lock writeLock = new WriteLock();
/**
 * Construct a new instance with the given preference and a fresh
 * synchronizer.
 *
 * @param bias whether readers or writers are the preferred lock type when
 *            there is contention
 */
public BiasedLock(Bias bias) {
    this.sync = new Sync();
    this.bias = bias;
}
/**
 * Return the read lock view of this lock. The same instance is returned on
 * every call.
 *
 * @return the lock used for reading
 */
@Override
public Lock readLock() {
    return this.readLock;
}
/**
 * Return the write lock view of this lock. The same instance is returned
 * on every call.
 *
 * @return the lock used for writing
 */
@Override
public Lock writeLock() {
    return this.writeLock;
}
/**
* The preference of a {@link BiasedLock}: which kind of acquisition is
* treated as the preferred lock type when readers and writers compete.
*/
public enum Bias {
/**
* Readers are preferred. Read acquisitions go straight to the
* synchronizer, while write acquisitions back off before they queue.
*/
READS,
/**
* Writers are preferred. Write acquisitions go straight to the
* synchronizer, while read acquisitions back off before they queue.
*/
WRITES
}
/**
* Read lock implementation with adaptive back-off for non-preferred actors
*/
private class ReadLock implements Lock {
/**
 * Acquire the read lock, blocking without responding to interruption.
 * <p>
 * When reads are preferred the request queues on the synchronizer
 * immediately. Otherwise the caller first parks for an interval that starts
 * at {@code INITIAL_BACKOFF_NANOS} and doubles up to
 * {@code MAX_BACKOFF_NANOS}, making a non-queued acquisition attempt after
 * every park; only when the cap is reached does it queue like a preferred
 * request.
 * </p>
 */
@Override
public void lock() {
    if(bias != Bias.READS) {
        for (long pause = INITIAL_BACKOFF_NANOS; pause < MAX_BACKOFF_NANOS; pause = Math
                .min(pause * 2, MAX_BACKOFF_NANOS)) {
            // Defer first, then see whether the lock can be taken
            // without queuing
            LockSupport.parkNanos(pause);
            if(sync.tryAcquireShared(1) >= 0) {
                return;
            }
        }
    }
    // Preferred actor, or back-off budget exhausted: wait in the queue
    sync.acquireShared(1);
}
/**
 * Acquire the read lock unless the current thread is interrupted.
 * <p>
 * When reads are preferred the request queues on the synchronizer
 * immediately. Otherwise the caller yields once and then alternates between
 * a non-queued acquisition attempt and an exponentially growing park until
 * the back-off reaches {@code MAX_BACKOFF_NANOS}, after which it queues.
 * </p>
 *
 * @throws InterruptedException if the current thread is interrupted while
 *             acquiring; the thread's interrupted status is cleared before
 *             the exception is thrown, as required by
 *             {@link Lock#lockInterruptibly()}
 */
@Override
public void lockInterruptibly() throws InterruptedException {
    if(bias == Bias.READS) {
        sync.acquireSharedInterruptibly(1);
        return;
    }
    Thread.yield();
    long backoff = INITIAL_BACKOFF_NANOS;
    while (backoff < MAX_BACKOFF_NANOS) {
        // Thread.interrupted() (not isInterrupted()) so that the status is
        // cleared when the exception is raised
        if(Thread.interrupted()) {
            throw new InterruptedException();
        }
        if(sync.tryAcquireShared(1) >= 0) {
            // Acquired without queuing, so there was no contention that
            // required deferring to the preferred actor
            return;
        }
        // Contention: back off exponentially, but only up to the cap so
        // this request cannot be starved forever
        LockSupport.parkNanos(backoff);
        backoff = Math.min(backoff * 2, MAX_BACKOFF_NANOS);
    }
    // Back-off budget exhausted: wait in the queue
    sync.acquireSharedInterruptibly(1);
}
/**
 * Conditions are not available on the read lock.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Condition newCondition() {
    final String reason = "Conditions not supported on read lock";
    throw new UnsupportedOperationException(reason);
}
/**
 * Make a single, non-blocking attempt to acquire the read lock. When reads
 * are not the preferred lock type the caller yields once before the
 * attempt.
 *
 * @return {@code true} if the read lock was acquired
 */
@Override
public boolean tryLock() {
    final boolean preferred = bias == Bias.READS;
    if(!preferred) {
        Thread.yield();
    }
    final int outcome = sync.tryAcquireShared(1);
    return outcome >= 0;
}
/**
 * Try to acquire the read lock within the given waiting time.
 * <p>
 * When reads are preferred the request is delegated to the synchronizer's
 * timed acquisition. Otherwise the caller yields once and then alternates
 * between a non-queued acquisition attempt and an exponentially growing
 * park (never longer than the time remaining). If the back-off reaches
 * {@code MAX_BACKOFF_NANOS} before the deadline, the rest of the waiting
 * time is spent in the synchronizer's wait queue.
 * </p>
 * <p>
 * A non-positive {@code time} does not wait at all, but the lock is still
 * acquired if it is available at the moment of the call.
 * </p>
 *
 * @param time the maximum time to wait
 * @param unit the unit of {@code time}
 * @return {@code true} if the read lock was acquired, {@code false} if the
 *         waiting time elapsed first
 * @throws InterruptedException if the current thread is interrupted while
 *             acquiring; the interrupted status is cleared
 */
@Override
public boolean tryLock(long time, TimeUnit unit)
        throws InterruptedException {
    long timeout = unit.toNanos(time);
    if(bias == Bias.READS) {
        return sync.tryAcquireSharedNanos(1, timeout);
    }
    if(Thread.interrupted()) {
        throw new InterruptedException();
    }
    if(timeout <= 0) {
        // No waiting allowed, but an available lock must still be granted.
        // Handling this up front also keeps a saturated negative timeout
        // from wrapping around in the deadline arithmetic below.
        return sync.tryAcquireShared(1) >= 0;
    }
    long deadline = System.nanoTime() + timeout;
    Thread.yield();
    long backoff = INITIAL_BACKOFF_NANOS;
    for (;;) {
        if(sync.tryAcquireShared(1) >= 0) {
            // Acquired without queuing, so there was no contention that
            // required deferring to the preferred actor
            return true;
        }
        long remaining = deadline - System.nanoTime();
        if(remaining <= 0) {
            return false;
        }
        if(backoff >= MAX_BACKOFF_NANOS) {
            // Starvation threshold reached with time to spare: spend the
            // rest of the budget waiting in the queue
            return sync.tryAcquireSharedNanos(1, remaining);
        }
        // Contention: back off exponentially while staying within the
        // deadline
        LockSupport.parkNanos(Math.min(backoff, remaining));
        if(Thread.interrupted()) {
            throw new InterruptedException();
        }
        backoff = Math.min(backoff * 2, MAX_BACKOFF_NANOS);
    }
}
/**
 * Release one read hold by delegating to the synchronizer's shared
 * release.
 */
@Override
public void unlock() {
    final int holds = 1;
    sync.releaseShared(holds);
}
}
/**
* Core synchronizer using AQS state: high bits for readers, low bits for
* writer.
*/
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
/**
* Number of read acquisitions that may succeed since the last write
* acquisition before readers are forced to queue behind a waiting
* writer. The check is made for every read acquisition, regardless of
* the configured bias.
*/
private static final int WRITER_STARVATION_THRESHOLD = 5;
/**
* Count of successful read acquisitions since a writer last acquired
* the lock; reset to zero by every successful write acquisition.
* <p>
* NOTE(review): the field is volatile but is incremented with
* {@code ++}, which is not atomic, so concurrent readers may lose
* updates. That only delays reaching the threshold; confirm this is
* acceptable.
* </p>
*/
private volatile int consecutiveReaderCount = 0;
/**
 * Report whether the calling thread is the recorded exclusive owner.
 *
 * @return {@code true} if the current thread holds the write lock
 */
@Override
protected boolean isHeldExclusively() {
    final Thread owner = getExclusiveOwnerThread();
    return Thread.currentThread() == owner;
}
/**
 * Attempt a write (exclusive) acquisition without blocking.
 * <p>
 * The attempt is refused when any hold exists, when the lock favors reads
 * and other threads are queued ahead of the caller, or when the first
 * queued thread is another writer. Otherwise the state is swapped from zero
 * to {@code acquires}; on success the consecutive-reader counter is reset
 * and the caller is recorded as the exclusive owner.
 * </p>
 *
 * @param acquires the value to store in the state on success
 * @return {@code true} if the write lock was acquired
 */
@Override
protected boolean tryAcquire(int acquires) {
    final int state = getState();
    if(state != 0) {
        // Non-reentrant: any existing read or write hold blocks a writer
        return false;
    }
    if(bias == Bias.READS && hasQueuedPredecessors()) {
        // Under read bias a writer may never barge past queued threads;
        // when those threads are writers this simply enforces FIFO
        return false;
    }
    if(firstQueuedIsExclusive()) {
        // Another writer is first in line: let it go and queue behind it
        return false;
    }
    if(!compareAndSetState(state, acquires)) {
        // Lost the race for the state word
        return false;
    }
    // A writer got in, so the run of consecutive readers is over
    consecutiveReaderCount = 0;
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
}
/**
 * Attempt a read (shared) acquisition without blocking.
 * <p>
 * The attempt is refused when a writer holds the lock, when the lock favors
 * writes and a writer is queued, or when the starvation guard says readers
 * must queue. Otherwise one reader unit is added to the state, retrying if
 * the compare-and-set loses a race.
 * </p>
 *
 * @param $ ignored
 * @return {@code 1} if the read lock was acquired, {@code -1} if not
 * @throws Error if the reader count is already at its maximum
 */
@Override
protected int tryAcquireShared(int $) {
    for (;;) {
        final int state = getState();
        final boolean refused = exclusiveCount(state) != 0 // writer holds
                // write bias: queued writers go before this reader
                || (bias == Bias.WRITES
                        && hasQueuedExclusivePredecessors())
                // starvation guard: writers have waited long enough
                || readerShouldQueue();
        if(refused) {
            return -1;
        }
        if(sharedCount(state) == MAX_COUNT) {
            throw new Error("Maximum lock count exceeded");
        }
        if(compareAndSetState(state, state + SHARED_UNIT)) {
            consecutiveReaderCount++;
            return 1;
        }
        // State changed underneath us; re-evaluate from the top
    }
}
/**
 * Release the write lock held by the calling thread. Because the lock is
 * not reentrant the state is cleared completely.
 *
 * @param $ ignored
 * @return always {@code true}
 * @throws IllegalMonitorStateException if the calling thread is not the
 *             exclusive owner
 */
@Override
protected boolean tryRelease(int $) {
    final Thread caller = Thread.currentThread();
    if(caller != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    }
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}
/**
 * Release one read hold, retrying until the state update wins.
 *
 * @param $ ignored
 * @return {@code true} if no read holds remain after this release
 * @throws IllegalMonitorStateException if the state records no read hold
 */
@Override
protected boolean tryReleaseShared(int $) {
    int observed;
    int updated;
    do {
        observed = getState();
        if(sharedCount(observed) == 0) {
            throw new IllegalMonitorStateException("Read lock not held");
        }
        updated = observed - SHARED_UNIT;
    }
    while (!compareAndSetState(observed, updated));
    return sharedCount(updated) == 0;
}
/**
 * Create a condition bound to this synchronizer.
 *
 * @return a new {@link ConditionObject}
 */
Condition newCondition() {
    final ConditionObject condition = new ConditionObject();
    return condition;
}
/**
 * Report whether the thread at the front of the wait queue is some
 * <em>other</em> thread waiting for exclusive (write) access.
 * <p>
 * The queue is inspected by reading the synchronizer's private
 * {@code head} field and the {@code next}, {@code thread} and
 * {@code nextWaiter} fields of the node behind it through
 * {@link sun.misc.Unsafe}. An empty queue, or a front node that belongs to
 * the calling thread, yields {@code false}.
 * </p>
 *
 * @return {@code true} if the front waiter is another thread in exclusive
 *         mode; {@code false} otherwise, including when the inspection
 *         itself fails
 */
private boolean firstQueuedIsExclusive() {
    try {
        final Object sentinel = unsafe.getObject(this, HEAD_OFFSET);
        // No head means nothing was ever queued; no successor means no
        // real waiter is enqueued right now
        final Object front = sentinel == null ? null
                : unsafe.getObject(sentinel, NEXT_OFFSET);
        if(front == null) {
            return false;
        }
        final Thread waiter = (Thread) unsafe.getObject(front,
                THREAD_OFFSET);
        if(waiter == Thread.currentThread()) {
            // The caller is itself first in line, so nothing is ahead
            return false;
        }
        // Readers are tagged with the SHARED marker; anything else is a
        // writer
        return unsafe.getObject(front, NEXT_WAIT_OFFSET) != SHARED;
    }
    catch (Exception e1) {
        Logger.error("", e1);
        return false;
    }
}
/**
 * Report whether any exclusive (write) mode waiter is queued <em>ahead
 * of</em> the calling thread.
 * <p>
 * The wait queue is walked from the front. If the walk reaches the node
 * owned by the calling thread it stops: everything examined so far was a
 * reader, and whatever follows is behind the caller and therefore not a
 * predecessor. A caller that is not queued at all sees the entire queue as
 * being ahead of it.
 * </p>
 * <p>
 * Stopping at the caller's own node matters for liveness. The synchronizer
 * only lets the front waiter retry, so a reader at the front of the queue
 * with a writer queued behind it must not be refused because of that
 * writer; otherwise neither thread can ever make progress.
 * </p>
 * <p>
 * IMPORTANT: This method uses {@link sun.misc.Unsafe} to read the private
 * queue structure of {@link AbstractQueuedSynchronizer}. It depends on
 * internal implementation details and may break with a different JDK.
 * </p>
 *
 * @return {@code true} if an exclusive mode waiter is ahead of the calling
 *         thread in the queue
 */
private boolean hasQueuedExclusivePredecessors() {
    try {
        Object head = unsafe.getObject(this, HEAD_OFFSET);
        if(head == null) {
            // Nothing has ever been queued
            return false;
        }
        Thread current = Thread.currentThread();
        for (Object node = unsafe.getObject(head,
                NEXT_OFFSET); node != null; node = unsafe.getObject(node,
                        NEXT_OFFSET)) {
            if(unsafe.getObject(node, THREAD_OFFSET) == current) {
                // Reached our own node without meeting a writer; the rest
                // of the queue is behind us
                return false;
            }
            // Shared mode nodes carry the SHARED marker in nextWaiter;
            // anything else is a writer
            if(unsafe.getObject(node, NEXT_WAIT_OFFSET) != SHARED) {
                return true;
            }
        }
        return false;
    }
    catch (Exception e) {
        // Fall back to the public API if the direct access fails. This
        // reports writers anywhere in the queue, so it is conservative.
        Logger.error("", e);
        return !getExclusiveQueuedThreads().isEmpty();
    }
}
/**
 * Decide whether a reader has to queue rather than acquire right away.
 * <p>
 * This is the writer-starvation guard: a reader is sent to the queue only
 * when the number of read acquisitions since the last write acquisition has
 * reached {@code WRITER_STARVATION_THRESHOLD} <em>and</em> a writer is
 * waiting in the queue.
 * </p>
 *
 * @return {@code true} if the reader should queue, {@code false} if it may
 *         proceed with the acquisition
 */
private boolean readerShouldQueue() {
    if(consecutiveReaderCount < WRITER_STARVATION_THRESHOLD) {
        // Readers have not yet used up their allowance
        return false;
    }
    return hasQueuedExclusivePredecessors();
}
}
/**
* Write lock with adaptive back-off for non-preferred actors
*/
private class WriteLock implements Lock {
/**
 * Acquire the write lock, blocking without responding to interruption.
 * <p>
 * When writes are preferred the request queues on the synchronizer
 * immediately. Otherwise the caller first parks for an interval that starts
 * at {@code INITIAL_BACKOFF_NANOS} and doubles up to
 * {@code MAX_BACKOFF_NANOS}, making a non-queued acquisition attempt after
 * every park; only when the cap is reached does it queue like a preferred
 * request.
 * </p>
 */
@Override
public void lock() {
    if(bias != Bias.WRITES) {
        for (long pause = INITIAL_BACKOFF_NANOS; pause < MAX_BACKOFF_NANOS; pause = Math
                .min(pause * 2, MAX_BACKOFF_NANOS)) {
            // Defer first, then see whether the lock can be taken
            // without queuing. TODO: interrupt?
            LockSupport.parkNanos(pause);
            if(sync.tryAcquire(1)) {
                return;
            }
        }
    }
    // Preferred actor, or back-off budget exhausted: wait in the queue
    sync.acquire(1);
}
/**
 * Acquire the write lock unless the current thread is interrupted.
 * <p>
 * When writes are preferred the request queues on the synchronizer
 * immediately. Otherwise the caller yields once and then alternates between
 * a non-queued acquisition attempt and an exponentially growing park until
 * the back-off reaches {@code MAX_BACKOFF_NANOS}, after which it queues.
 * </p>
 *
 * @throws InterruptedException if the current thread is interrupted while
 *             acquiring; the thread's interrupted status is cleared before
 *             the exception is thrown, as required by
 *             {@link Lock#lockInterruptibly()}
 */
@Override
public void lockInterruptibly() throws InterruptedException {
    if(bias == Bias.WRITES) {
        sync.acquireInterruptibly(1);
        return;
    }
    Thread.yield();
    long backoff = INITIAL_BACKOFF_NANOS;
    while (backoff < MAX_BACKOFF_NANOS) {
        // Thread.interrupted() (not isInterrupted()) so that the status is
        // cleared when the exception is raised
        if(Thread.interrupted()) {
            throw new InterruptedException();
        }
        if(sync.tryAcquire(1)) {
            // Acquired without queuing, so there was no contention that
            // required deferring to the preferred actor
            return;
        }
        // Contention: back off exponentially, but only up to the cap so
        // this request cannot be starved forever
        LockSupport.parkNanos(backoff);
        backoff = Math.min(backoff * 2, MAX_BACKOFF_NANOS);
    }
    // Back-off budget exhausted: wait in the queue
    sync.acquireInterruptibly(1);
}
/**
 * Create a condition for use with the write lock, backed by the
 * synchronizer.
 *
 * @return a new {@link Condition}
 */
@Override
public Condition newCondition() {
    final Condition condition = sync.newCondition();
    return condition;
}
/**
 * Make a single, non-blocking attempt to acquire the write lock. When
 * writes are not the preferred lock type the caller yields once before the
 * attempt.
 *
 * @return {@code true} if the write lock was acquired
 */
@Override
public boolean tryLock() {
    final boolean preferred = bias == Bias.WRITES;
    if(!preferred) {
        Thread.yield();
    }
    final boolean acquired = sync.tryAcquire(1);
    return acquired;
}
/**
 * Try to acquire the write lock within the given waiting time.
 * <p>
 * When writes are preferred the request is delegated to the synchronizer's
 * timed acquisition. Otherwise the caller yields once and then alternates
 * between a non-queued acquisition attempt and an exponentially growing
 * park (never longer than the time remaining). If the back-off reaches
 * {@code MAX_BACKOFF_NANOS} before the deadline, the rest of the waiting
 * time is spent in the synchronizer's wait queue.
 * </p>
 * <p>
 * A non-positive {@code time} does not wait at all, but the lock is still
 * acquired if it is available at the moment of the call.
 * </p>
 *
 * @param time the maximum time to wait
 * @param unit the unit of {@code time}
 * @return {@code true} if the write lock was acquired, {@code false} if
 *         the waiting time elapsed first
 * @throws InterruptedException if the current thread is interrupted while
 *             acquiring; the interrupted status is cleared
 */
@Override
public boolean tryLock(long time, TimeUnit unit)
        throws InterruptedException {
    long timeout = unit.toNanos(time);
    if(bias == Bias.WRITES) {
        return sync.tryAcquireNanos(1, timeout);
    }
    if(Thread.interrupted()) {
        throw new InterruptedException();
    }
    if(timeout <= 0) {
        // No waiting allowed, but an available lock must still be granted.
        // Handling this up front also keeps a saturated negative timeout
        // from wrapping around in the deadline arithmetic below.
        return sync.tryAcquire(1);
    }
    long deadline = System.nanoTime() + timeout;
    Thread.yield();
    long backoff = INITIAL_BACKOFF_NANOS;
    for (;;) {
        if(sync.tryAcquire(1)) {
            // Acquired without queuing, so there was no contention that
            // required deferring to the preferred actor
            return true;
        }
        long remaining = deadline - System.nanoTime();
        if(remaining <= 0) {
            return false;
        }
        if(backoff >= MAX_BACKOFF_NANOS) {
            // Starvation threshold reached with time to spare: spend the
            // rest of the budget waiting in the queue
            return sync.tryAcquireNanos(1, remaining);
        }
        // Contention: back off exponentially while staying within the
        // deadline
        LockSupport.parkNanos(Math.min(backoff, remaining));
        if(Thread.interrupted()) {
            throw new InterruptedException();
        }
        backoff = Math.min(backoff * 2, MAX_BACKOFF_NANOS);
    }
}
/**
 * Release the write lock by delegating to the synchronizer's exclusive
 * release.
 */
@Override
public void unlock() {
    final int holds = 1;
    sync.release(holds);
}
}
}/*
* Copyright (c) 2013-2025 Cinchapi Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cinchapi.concourse.server.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
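/**
 * Base class for unit tests of {@link ReadWriteLock} implementations that
 * give preference to either readers or writers under contention.
 * Subclasses supply the lock instances under test through the abstract
 * factory methods.
 *
 * @author jeff
 */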
public abstract class AbstractPreferentialLockTest {
/**
* Factory method to create a lock that gives preference to readers.
*
* @return the read-priority lock to test
*/
protected abstract ReadWriteLock createReadPriorityLock();
/**
* Factory method to create a lock that gives preference to writers.
*
* @return the write-priority lock to test
*/
protected abstract ReadWriteLock createWritePriorityLock();
/**
* Executor for running concurrent tasks; replaced with a new cached thread
* pool before every test.
*/
private ExecutorService executor;
/**
 * Create a fresh cached thread pool for the test that is about to run.
 */
@Before
public void setup() {
    executor = Executors.newCachedThreadPool();
}

/**
 * Stop the executor created by {@link #setup()} so that worker threads
 * (including any still blocked on a lock after a failed test) do not
 * outlive the test that started them.
 */
@org.junit.After
public void shutdownExecutor() {
    if(executor != null) {
        executor.shutdownNow();
    }
}
/**
 * Verify that, on a read-priority lock, readers are served ahead of a
 * competing writer until the starvation threshold is reached, after which
 * the writer is served before the remaining reader.
 * <p>
 * NOTE(review): the expected order (including the exact index order of the
 * readers) depends on how the contending threads are scheduled once the
 * latch opens; confirm that the lock under test makes it deterministic.
 * </p>
 */
@Test
public void testReadersJumpAheadUntilThreshold() throws Exception {
    ReadWriteLock lock = createReadPriorityLock();
    int threshold = 5;
    // Hold the write lock while the contenders are being scheduled
    lock.writeLock().lock();
    CountDownLatch start = new CountDownLatch(1);
    ConcurrentLinkedQueue<String> order = new ConcurrentLinkedQueue<>();
    // threshold + 1 readers ...
    for (int i = 0; i < threshold + 1; i++) {
        final String label = "R" + i;
        executor.submit(() -> {
            try {
                start.await();
                lock.readLock().lock();
                order.add(label);
                lock.readLock().unlock();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    // ... and a single writer
    executor.submit(() -> {
        try {
            start.await();
            lock.writeLock().lock();
            order.add("W");
            lock.writeLock().unlock();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    // Release the initial write lock and let everyone compete
    lock.writeLock().unlock();
    start.countDown();
    executor.shutdown();
    // Fail clearly if a contender is stuck, instead of reporting a
    // confusing ordering mismatch on a partially filled queue
    Assert.assertTrue("Contending threads did not finish in time",
            executor.awaitTermination(5, TimeUnit.SECONDS));
    // The first 'threshold' entries must be readers
    for (int i = 0; i < threshold; i++) {
        Assert.assertEquals("R" + i, order.poll());
    }
    // Then the writer, then the last reader
    Assert.assertEquals("W", order.poll());
    Assert.assertEquals("R" + threshold, order.poll());
}
/**
 * Verify that, on a write-priority lock, a writer competing with a reader
 * for a just-released lock acquires it first.
 */
@Test
public void testDeterministicBias() throws Exception {
    ReadWriteLock lock = createWritePriorityLock();
    // 1) Hold the write lock so nobody can get in right away
    lock.writeLock().lock();
    CountDownLatch go = new CountDownLatch(1);
    ConcurrentLinkedQueue<String> winners = new ConcurrentLinkedQueue<>();
    // 2) Prepare one reader and one writer that both wait for the signal
    Thread readerThread = new Thread(() -> {
        try {
            go.await();
            lock.readLock().lock();
            winners.add("R");
            lock.readLock().unlock();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    Thread writerThread = new Thread(() -> {
        try {
            go.await();
            lock.writeLock().lock();
            winners.add("W");
            lock.writeLock().unlock();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    readerThread.start();
    writerThread.start();
    // Give both threads a moment to reach the latch
    Thread.sleep(10);
    // 3) Release the pre-acquired write lock and let them contend
    lock.writeLock().unlock();
    go.countDown();
    readerThread.join(1000);
    writerThread.join(1000);
    // With write bias the writer must win, followed by the reader
    Assert.assertEquals("W", winners.poll());
    Assert.assertEquals("R", winners.poll());
}
/**
 * Ensure that a non-preferred acquisition of an uncontended lock completes
 * in under one millisecond, for both bias directions.
 */
@Test
public void testNoDeferralWhenFree() {
    // Non-preferred writer on a read-priority lock
    ReadWriteLock readBiased = createReadPriorityLock();
    long begin = System.nanoTime();
    readBiased.writeLock().lock();
    long elapsed = System.nanoTime() - begin;
    readBiased.writeLock().unlock();
    Assert.assertTrue(elapsed < TimeUnit.MILLISECONDS.toNanos(1));
    // Non-preferred reader on a write-priority lock
    ReadWriteLock writeBiased = createWritePriorityLock();
    begin = System.nanoTime();
    writeBiased.readLock().lock();
    elapsed = System.nanoTime() - begin;
    writeBiased.readLock().unlock();
    Assert.assertTrue(elapsed < TimeUnit.MILLISECONDS.toNanos(1));
}
/**
 * Releasing a read lock that was never acquired must raise
 * {@link IllegalMonitorStateException}.
 */
@Test(expected = IllegalMonitorStateException.class)
public void testSharedUnlockWithoutLockThrows() {
    ReadWriteLock lock = createReadPriorityLock();
    Lock read = lock.readLock();
    read.unlock();
}
/**
 * Releasing a write lock that was never acquired must raise
 * {@link IllegalMonitorStateException}.
 */
@Test(expected = IllegalMonitorStateException.class)
public void testExclusiveUnlockWithoutLockThrows() {
    ReadWriteLock lock = createWritePriorityLock();
    Lock write = lock.writeLock();
    write.unlock();
}
/**
 * Several readers released at the same moment must be able to hold the
 * read lock together: the peak number of simultaneous holders has to exceed
 * one.
 */
@Test
public void testConcurrentReaders() throws Exception {
    ReadWriteLock lock = createReadPriorityLock();
    int readers = 10;
    CountDownLatch go = new CountDownLatch(1);
    AtomicInteger holders = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    Runnable task = () -> {
        try {
            go.await();
            lock.readLock().lock();
            int now = holders.incrementAndGet();
            peak.updateAndGet(seen -> Math.max(seen, now));
            Thread.sleep(50);
            holders.decrementAndGet();
            lock.readLock().unlock();
        }
        catch (Exception e) {
            Thread.currentThread().interrupt();
        }
    };
    List<Future<?>> pending = new ArrayList<>();
    for (int i = 0; i < readers; i++) {
        pending.add(executor.submit(task));
    }
    go.countDown();
    for (Future<?> future : pending) {
        future.get();
    }
    Assert.assertTrue(peak.get() > 1);
}
/**
 * Writers released at the same moment must never hold the write lock
 * together: the peak number of simultaneous holders has to be exactly one.
 */
@Test
public void testExclusiveWriters() throws Exception {
    ReadWriteLock lock = createWritePriorityLock();
    int writers = 5;
    CountDownLatch go = new CountDownLatch(1);
    AtomicInteger holders = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    Runnable task = () -> {
        try {
            go.await();
            lock.writeLock().lock();
            int now = holders.incrementAndGet();
            peak.updateAndGet(seen -> Math.max(seen, now));
            Thread.sleep(50);
            holders.decrementAndGet();
            lock.writeLock().unlock();
        }
        catch (Exception e) {
            Thread.currentThread().interrupt();
        }
    };
    List<Future<?>> pending = new ArrayList<>();
    for (int i = 0; i < writers; i++) {
        pending.add(executor.submit(task));
    }
    go.countDown();
    for (Future<?> future : pending) {
        future.get();
    }
    Assert.assertEquals(1, peak.get());
}
/**
 * Readers cannot be starved indefinitely in write-priority locks.
 * <p>
 * Three writers each take and release the write lock five times while a
 * single reader tries to get in. The wait for completion is bounded so that
 * a starved (or deadlocked) reader makes the test fail instead of hanging
 * the whole run.
 * </p>
 */
@Test
public void testNoStarvation() throws Exception {
    ReadWriteLock lock = createWritePriorityLock();
    int writers = 3;
    int readers = 1;
    CountDownLatch done = new CountDownLatch(writers + readers);
    AtomicReference<Boolean> readerAcquired = new AtomicReference<>(false);
    // flood writers
    for (int i = 0; i < writers; i++) {
        executor.submit(() -> {
            for (int j = 0; j < 5; j++) {
                lock.writeLock().lock();
                lock.writeLock().unlock();
            }
            done.countDown();
        });
    }
    // one reader
    executor.submit(() -> {
        lock.readLock().lock();
        readerAcquired.set(true);
        lock.readLock().unlock();
        done.countDown();
    });
    Assert.assertTrue("Readers and writers should all finish in time",
            done.await(10, TimeUnit.SECONDS));
    Assert.assertTrue(readerAcquired.get());
}
/**
* Subclasses must return a fresh instance of their ReadWriteLock
* implementation. The general read/write exclusion tests in this class
* obtain the lock under test from this method.
*
* @return a new lock instance
*/
protected abstract ReadWriteLock getLock();
/**
 * Multiple readers should be allowed concurrently: twenty threads each
 * take the read lock and keep it for 100 ms, and all of them must be
 * holding it at the same time.
 */
@Test
public void testMultipleReaders() throws InterruptedException {
    final ReadWriteLock lock = getLock();
    final int threadCount = 20;
    final CountDownLatch latch = new CountDownLatch(threadCount);
    final AtomicInteger concurrentReaders = new AtomicInteger(0);
    final AtomicInteger maxConcurrentReaders = new AtomicInteger(0);
    Runnable reader = () -> {
        lock.readLock().lock();
        try {
            // Track the number of threads holding the read lock
            // simultaneously
            int current = concurrentReaders.incrementAndGet();
            maxConcurrentReaders
                    .updateAndGet(max -> Math.max(max, current));
            latch.countDown();
            Thread.sleep(100);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            concurrentReaders.decrementAndGet();
            lock.readLock().unlock();
        }
    };
    // Create and start the reader threads
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) {
        Thread t = new Thread(reader);
        threads.add(t);
        t.start();
    }
    // Wait for all threads to acquire the read lock
    boolean allAcquired = latch.await(1000, TimeUnit.MILLISECONDS);
    Assert.assertTrue("All read locks should be acquired concurrently",
            allAcquired);
    Assert.assertEquals("All threads should hold read lock simultaneously",
            threadCount, maxConcurrentReaders.get());
    // Wait for all threads to complete
    for (Thread t : threads) {
        t.join(500);
    }
}
/**
 * Only one writer at a time: while a first writer holds the write lock a
 * second writer must wait, and it must get the lock once the first one
 * releases it.
 */
@Test
public void testWriteLockIsExclusive() throws InterruptedException {
    final ReadWriteLock lock = getLock();
    final CountDownLatch firstWriter = new CountDownLatch(1);
    final CountDownLatch releaseFirst = new CountDownLatch(1);
    // First writer: take the lock, announce it, and hold until told to go
    Thread holder = new Thread(() -> {
        lock.writeLock().lock();
        try {
            firstWriter.countDown();
            releaseFirst.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            lock.writeLock().unlock();
        }
    });
    holder.start();
    Assert.assertTrue("First writer should acquire the write lock",
            firstWriter.await(500, TimeUnit.MILLISECONDS));
    final boolean[] secondGotIt = { false };
    // Second writer: records that it got in
    Thread contender = new Thread(() -> {
        lock.writeLock().lock();
        try {
            secondGotIt[0] = true;
        }
        finally {
            lock.writeLock().unlock();
        }
    });
    contender.start();
    Thread.sleep(100);
    Assert.assertFalse("Second writer must wait", secondGotIt[0]);
    releaseFirst.countDown();
    holder.join();
    contender.join(500);
    Assert.assertTrue("Second writer should acquire after release",
            secondGotIt[0]);
}
/**
 * A held write lock must block new readers: a reader started while a
 * writer holds the lock must wait, and it must get the lock once the writer
 * releases it.
 */
@Test
public void testWriteLockBlocksReaders() throws InterruptedException {
    final ReadWriteLock lock = getLock();
    final CountDownLatch writerStarted = new CountDownLatch(1);
    final CountDownLatch releaseWriter = new CountDownLatch(1);
    final boolean[] readerGotIt = { false };
    // Writer: take the lock, announce it, and hold until told to go
    Thread holder = new Thread(() -> {
        lock.writeLock().lock();
        try {
            writerStarted.countDown();
            releaseWriter.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            lock.writeLock().unlock();
        }
    });
    holder.start();
    Assert.assertTrue("Writer must hold lock",
            writerStarted.await(500, TimeUnit.MILLISECONDS));
    // Reader: records that it got in
    Thread contender = new Thread(() -> {
        lock.readLock().lock();
        try {
            readerGotIt[0] = true;
        }
        finally {
            lock.readLock().unlock();
        }
    });
    contender.start();
    Thread.sleep(100);
    Assert.assertFalse("Reader must be blocked by writer", readerGotIt[0]);
    releaseWriter.countDown();
    holder.join();
    contender.join(500);
    Assert.assertTrue("Reader should proceed after writer releases",
            readerGotIt[0]);
}
/**
 * A held read lock must block new writers.
 * <p>
 * A reader thread takes the read lock and parks on a latch while a writer
 * thread tries to take the write lock. The writer must not get the lock
 * within 100ms while the reader still holds it, and must get it within
 * 500ms of the reader being released.
 * </p>
 *
 * @throws InterruptedException if the test thread is interrupted while
 *             waiting on a latch or joining a worker thread
 */
@Test
public void testWriteLockBlockedByReadLock() throws InterruptedException {
    final ReadWriteLock lock = getLock();
    final CountDownLatch readerStarted = new CountDownLatch(1);
    final CountDownLatch releaseReader = new CountDownLatch(1);
    // A latch instead of a plain boolean[] flag: the test thread gets a
    // properly synchronized view of whether the writer got in.
    final CountDownLatch writerAcquired = new CountDownLatch(1);
    Thread reader = new Thread(new Runnable() {
        @Override
        public void run() {
            lock.readLock().lock();
            try {
                readerStarted.countDown();
                releaseReader.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                lock.readLock().unlock();
            }
        }
    });
    Thread writer = new Thread(new Runnable() {
        @Override
        public void run() {
            lock.writeLock().lock();
            try {
                writerAcquired.countDown();
            }
            finally {
                lock.writeLock().unlock();
            }
        }
    });
    reader.start();
    try {
        Assert.assertTrue("Reader must hold lock",
                readerStarted.await(500, TimeUnit.MILLISECONDS));
        writer.start();
        // The timed await doubles as the grace period the writer is given
        // to (incorrectly) slip past the reader.
        Assert.assertFalse("Writer must be blocked by reader",
                writerAcquired.await(100, TimeUnit.MILLISECONDS));
        releaseReader.countDown();
        Assert.assertTrue("Writer should proceed after reader releases",
                writerAcquired.await(500, TimeUnit.MILLISECONDS));
    }
    finally {
        // Always unblock the reader, even when an assertion above failed,
        // so no thread stays parked while holding the lock. The joins are
        // bounded so a broken lock cannot hang the whole suite.
        releaseReader.countDown();
        reader.join(500);
        writer.join(500);
    }
}
/**
 * With nothing holding the lock, {@code tryLock()} on the write lock is
 * expected to return {@code true}.
 */
@Test
public void testTryWriteLockWithoutReadLock() {
    final Lock writeLock = getLock().writeLock();
    final boolean acquired = writeLock.tryLock();
    if(acquired) {
        // Nothing below needs the lock held, so hand it back right away.
        writeLock.unlock();
    }
    Assert.assertTrue("tryLock should succeed when free", acquired);
}
/**
 * While the calling thread holds the read lock, {@code tryLock()} on the
 * write lock is expected to return {@code false}.
 */
@Test
public void testTryWriteLockWithReadLock() {
    final ReadWriteLock lock = getLock();
    final Lock readLock = lock.readLock();
    readLock.lock();
    try {
        final Lock writeLock = lock.writeLock();
        final boolean acquired = writeLock.tryLock();
        if(acquired) {
            // Undo the unexpected acquisition before reporting failure.
            writeLock.unlock();
        }
        Assert.assertFalse("tryLock write should fail under held read lock",
                acquired);
    }
    finally {
        readLock.unlock();
    }
}
/**
 * With nothing holding the lock, {@code tryLock()} on the read lock is
 * expected to return {@code true}.
 */
@Test
public void testReadLockTryLock() {
    final Lock readLock = getLock().readLock();
    final boolean acquired = readLock.tryLock();
    if(acquired) {
        // Nothing below needs the lock held, so hand it back right away.
        readLock.unlock();
    }
    Assert.assertTrue("tryLock read should succeed when free", acquired);
}
/**
 * While the write lock is held, {@code tryLock()} on the read lock is
 * expected to return {@code false}.
 */
@Test
public void testTryReadLockWithWriteLock() {
    final ReadWriteLock lock = getLock();
    final Lock writeLock = lock.writeLock();
    writeLock.lock();
    try {
        final Lock readLock = lock.readLock();
        final boolean acquired = readLock.tryLock();
        if(acquired) {
            // Undo the unexpected acquisition before reporting failure.
            readLock.unlock();
        }
        Assert.assertFalse("tryLock read should fail while writer holds lock",
                acquired);
    }
    finally {
        writeLock.unlock();
    }
}
}/*
* Copyright (c) 2013-2025 Cinchapi Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cinchapi.concourse.server.concurrent;
import java.util.concurrent.locks.ReadWriteLock;
/**
*
*
* @author jeff
*/
public class BiasedLockTest extends AbstractPreferentialLockTest {
/*
* (non-Javadoc)
*
* @see
* com.cinchapi.concourse.server.concurrent.AbstractPreferentialLockTest#
* createReadPriorityLock()
*/
@Override
protected ReadWriteLock createReadPriorityLock() {
return BiasedLock.favoringReads();
}
/*
* (non-Javadoc)
*
* @see
* com.cinchapi.concourse.server.concurrent.AbstractPreferentialLockTest#
* createWritePriorityLock()
*/
@Override
protected ReadWriteLock createWritePriorityLock() {
return BiasedLock.favoringWrites();
}
@Override
protected ReadWriteLock getLock() {
return BiasedLock.create();
}
}Metadata
Metadata
Assignees
Labels
No labels