更新时间: 2020-03-31 14:08:03       分类: 学习笔记


BlockingQueue介绍

BlockingQueue是一个支持两个附加操作的队列,他拥有和CSP并发模型中channel类似的特性:当队列满的时候,写入元素的线程会被阻塞;当队列为空的时候,读取元素的线程会被阻塞。

队列的大小可以由开发者自己决定,不同的队列大小,以及不同的读写者数量都会最终影响最终并发运行的效果。

  1. 主要API:

    一般来说使用BlockingQueue时,关注点在put(e)和take()这两个方法上,因为他们提供了“阻塞”的特性。

  2. 使用时需要注意的特性

    - 首先,BlockingQueue拒绝接受null值,所以放入null值时会立刻跑出NullPointer的异常 - BlockingQueue的实现是线程安全的,但是批量的操作诸如addAll()等不一定是原子操作,有可能在放入一部分元素后出现异常,这时候已经放入的元素不会被回滚出来。 - BlockingQueue的支持多线程的消费者和生产者场景,可以安心使用。

BlockingQueue实现原理

现在来探究一下BlockingQueue是如何实现。

按照最直观的想法,其内部一定是一个加了锁的队列数据结构。但关键是,这个锁要怎么加,如何实现多个读写者时的线程安全?

阅读源码,发现BlockingQueue使用了Condition来实现这一点。

Condition第一眼看过去,和Object的wait()notify()操作很像,同时又非常接近信号量的概念。但Condition最大的亮点就是,它由可重入锁来产生,而一个锁可以产生多个不同的Condition,这就使得在需要处理多种线程等待条件的时候,不必建立过多的共享变量和锁,从而使代码更简洁,性能更好。

接下来看官方所提供的例子,顺便一提ArrayListBlockingQueue实现思路和这个例子几乎一模一样:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // condition 依赖于 lock 来产生
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生产
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 队列已满,等待,直到 not full 才能继续生产
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消费
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

先明确继承关系和调用链:Condition是由ReentrantLock创建的,而ReentrantLock是依赖AbstractQueuedSynchronizer(下文简称AQS)实现的。只有拿到了ReentrantLock的对象,才能去使用Condition。

所以我们得先明确ReentrantLock是如何处理多个线程的竞争的,它也是通过一个队列来实现的,看下图,不过分讨论细节:

其中head和node都是对Thread的一个封装,head就是当前占有锁的线程,而之后再有线程尝试去占有锁的时候,就会在后面的阻塞队列里排队了。

调度则有两种方式,公平和不公平,不公平的锁吞吐量会高一些,但是可能会存在某个线程长期处于饥饿状态的情况,处于性能考虑,默认还是会使用不公平的锁。

了解了AQS里阻塞队列的概念之后,再来看Condition的实现,就会好理解的多了。简单的来说,就是在线程排队进入阻塞队列之前,又添加了一个队列,姑且把这个队列称为条件队列,那么上面示例代码的实际排队结构就是下面这个样子的:

首先,每个Condition(我觉得可以在这里可以理解成信号量)都拥有一个自己的单向条件队列。调用await()时,就是把当前线程包装成Node后加入到该Condition的条件队列中去,然后阻塞住,不继续执行。调用signal()方法时,则是将对应Condition条件队列中的firstWaiter扔进Lock的阻塞队列队尾去等待锁资源。

了解了AQS和Condition之后,BlockingQueue的实现就不难理解了,只要利用两个Condition控制队列的Full和Empty状态,就可以很方便的实现一个多读写者的队列结构。

和Golang的Channel做个对比

之前我一直认为Golang的Channel和BlockingQueue是一样的结构,但在实际使用后发现两者还是存在着一些不同。

最直观的来看,Golang的Channel在读写方拥有更多的api,诸如close, range, select,而不仅仅是单一的put和take。

就实现而言,golang整体上要更简洁一些,下图是channel的内存布局模型

Hchan结构体后面的连续内存区域是以环形队列方式实现的一个队列,用于储存channel中缓存的数据。 而结构体内部维护了两个队列,recvq和sendq,用于存放生产者和消费者的goroutine。

再来看一同步读写的场景(对应BlockingQueue大小为1的情景),在写入channel前先检查recvq中有没有等待读channel的routine,如果有则从recvq中将其出队。这时候调度器会直接把要写入的数据ep复制给对应的routine。

最后唤醒接受的routine,然后将其放入本地等待运行队列中等待调度器的调度。

源码如下:

func chansend(t *chantype, c *hchan, ep unsafe.Pointer, 
block bool, callerpc uintptr) bool {
...
    lock(&c.lock)
    if c.dataqsiz == 0 { // synchronous channel
        sg := c.recvq.dequeue()
        if sg != nil { // found a waiting receiver
            unlock(&c.lock)
            recvg := sg.g
            syncsend(c, sg, ep)
            goready(recvg, 3)
            return true
        }
        // no receiver available: block on this channel.
        mysg := acquireSudog()
        mysg.elem = ep  
        c.sendq.enqueue(mysg)
        goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
        // someone woke us up.
        releaseSudog(mysg)
        return true
    }
}

评论

还没有评论