- Disruptor作为一个高性能的生产者-消费者队列系统,一个核心的设计:通过RingBuffer实现一个无锁队列
- Java里面的LinkedBlockingQueue,比起Disruptor的RingBuffer要慢很多,主要原因链表的数据在内存里面的布局对于高速缓存并不友好LinkedBlockingQueue对于锁的依赖一般来说消费者比生产者快(不然队列会堆积),因为大部分时候,队列是空的,生产者和消费者一样会产生竞争
- LinkedBlockingQueue的锁机制是通过ReentrantLock,需要JVM进行裁决锁的争夺,会把没有拿到锁的线程挂起等待,也需要进行一次上下文切换上下文切换的过程,需要把当前执行线程的寄存器等信息,保存到内存中的线程栈里面意味:已经加载到高速缓存里面的指令或者数据,又回到主内存里面,进一步拖慢性能
- 加锁很慢,所以Disruptor的解决方案是无锁(没有操作系统层面的锁)
- Disruptor利用了一个CPU硬件支持的指令,称之为CAS(Compare And Swap)
- Disruptor的RingBuffer创建一个Sequence对象,用来指向当前的RingBuffer的头和尾头和尾的标识,不是通过一个指针来实现的,而是通过一个序号
- RingBuffer在进行生产者和消费者之间的资源协调,采用的是对比序号的方式当生产者想要往队列里面加入新数据的时候,会把当前生产者的Sequence的序号,加上需要加入的新数据的数量然后和实际的消费者所在的位置进行对比,看下队列里是不是有足够的空间加入这些数据而不是直接覆盖掉消费者还没处理完的数据
- CAS指令,既不是基础库里的一个函数,也不是操作系统里面实现的一个系统调用,而是一个CPU硬件支持的机器指令在Intel CPU上,为cmpxchg指令:compxchg [ax] (隐式参数,EAX累加器), [bx] (源操作数地址), [cx] (目标操作数地址)第一个操作数不在指令里面出现,是一个隐式的操作数,即EAX累加寄存器里面的值第二个操作数就是源操作数,指令会对比这个操作数和上面EAX累加寄存器里面的值伪代码:IF [ax]== [bx] THEN [ZF] = 1, [bx] = [cx] ELSE [ZF] = 0, [ax] = [bx]单个指令是原子的,意味着使用CAS操作的时候,不需要单独进行加锁,直接调用即可
Sequence关键代码如下:
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
// 如果CAS操作没有成功,会不断等待重试
do
{
currentValue = get();
newValue = currentValue increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}
public boolean compareAndSet(final long expectedValue, final long newValue)
{
// 调用CAS指令
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
Benchmark
互斥锁竞争、CAS乐观锁与无锁测试:
public class LockBenchmark {
private static final long MAX = 500_000_000L;
private static void runIncrement() {
long counter = 0;
long start = System.currentTimeMillis();
while (counter < MAX) {
counter ;
}
long end = System.currentTimeMillis();
System.out.println("Time spent is " (end - start) "ms without lock");
}
private static void runIncrementWithLock() {
Lock lock = new ReentrantLock();
long counter = 0;
long start = System.currentTimeMillis();
while (counter < MAX) {
if (lock.tryLock()) {
counter ;
lock.unlock();
}
}
long end = System.currentTimeMillis();
System.out.println("Time spent is " (end - start) "ms with lock");
}
private static void runIncrementAtomic() {
AtomicLong counter = new AtomicLong(0);
long start = System.currentTimeMillis();
while (counter.incrementAndGet() < MAX) {
}
long end = System.currentTimeMillis();
System.out.println("Time spent is " (end - start) "ms with cas");
}
public static void main(String[] args) {
runIncrement();
runIncrementWithLock();
runIncrementAtomic();
// Time spent is 153ms without lock
// Time spent is 7801ms with lock
// Time spent is 3164ms with cas
// 7801 / 153 ≈ 51
// 3164 / 153 ≈ 21
}
}得出