Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析
Queue是什么
公司主营业务:网站建设、成都做网站、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联推出文圣免费做网站回馈大家。
队列,是一种数据结构。除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的。无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的。在FIFO队列中,所有新元素都插入队列的末尾。
Queue中的方法
Queue中的方法不难理解,6个,每2对是一个也就是总共3对。看一下JDKAPI就知道了:
注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议。
BlockingQueue
1、BlockingQueue概述
BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。
BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类
1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。
2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。
3、PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。
4、SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。
LinkedBlockingQueue可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
讲BlockingQueue,因为BlockingQueue是Queue中的一个重点,并且通过BlockingQueue我们再次加深对于生产者/消费者模型的理解。其他的Queue都不难,通过查看JDKAPI和简单阅读源码完全可以理解他们的作用。
BlockingQueue,顾名思义,阻塞队列。BlockingQueue是在java.util.concurrent下的,因此不难理解,BlockingQueue是为了解决多线程中数据高效安全传输而提出的。
多线程中,很多场景都可以使用队列实现,比如经典的生产者/消费者模型,通过队列可以便利地实现两者之间数据的共享,定义一个生产者线程,定义一个消费者线程,通过队列共享数据就可以了。
当然现实不可能都是理想的,比如消费者消费速度比生产者生产的速度要快,那么消费者消费到一定程度上的时候,必须要暂停等待一下了(使消费者线程处于WAITING状态)。BlockingQueue的提出,就是为了解决这个问题的,他不用程序员去控制这些细节,同时还要兼顾效率和线程安全。
阻塞队列所谓的"阻塞",指的是某些情况下线程会挂起(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒。使用BlockingQueue,不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,这些内容BlockingQueue都已经做好了
2、BlockingQueue中的方法
BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已经列了。看一下BlockingQueue中特有的方法:
(1)voidput(Ee)throwsInterruptedException
把e添加进BlockingQueue中,如果BlockingQueue中没有空间,则调用线程被阻塞,进入等待状态,直到BlockingQueue中有空间再继续
(2)voidtake()throwsInterruptedException
取走BlockingQueue里面排在首位的对象,如果BlockingQueue为空,则调用线程被阻塞,进入等待状态,直到BlockingQueue有新的数据被加入
(3)intdrainTo(Collection<?superE>c,intmaxElements)
一次性取走BlockingQueue中的数据到c中,可以指定取的个数。通过该方法可以提升获取数据效率,不需要多次分批加锁或释放锁
3、ArrayBlockingQueue
基于数组的阻塞队列,必须指定队列大小。比较简单。ArrayBlockingQueue中只有一个ReentrantLock对象,这意味着生产者和消费者无法并行运行(见下面的代码)。另外,创建ArrayBlockingQueue时,可以指定ReentrantLock是否为公平锁,默认采用非公平锁。
/** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
4、LinkedBlockingQueue
基于链表的阻塞队列,和ArrayBlockingQueue差不多。不过LinkedBlockingQueue如果不指定队列容量大小,会默认一个类似无限大小的容量,之所以说是类似是因为这个无限大小是Integer.MAX_VALUE,这么说就好理解ArrayBlockingQueue为什么必须要制定大小了,如果ArrayBlockingQueue不指定大小的话就用Integer.MAX_VALUE,那将造成大量的空间浪费,但是基于链表实现就不一样的,一个一个节点连起来而已。另外,LinkedBlockingQueue生产者和消费者都有自己的锁(见下面的代码),这意味着生产者和消费者可以"同时"运行。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
5、SynchronousQueue
比较特殊,一种没有缓冲的等待队列。什么叫做没有缓冲区,ArrayBlocking中有:
/** The queued items */ private final E[] items;
数组用以存储队列。LinkedBlockingQueue中有:
/** * Linked list node class */ static class Node{ /** The item, volatile to ensure barrier separating write and read */ volatile E item; Node next; Node(E x) { item = x; } }
将队列以链表形式连接。
生产者/消费者操作数据实际上都是通过这两个"中介"来操作数据的,但是SynchronousQueue则是生产者直接把数据给消费者(消费者直接从生产者这里拿数据),好像又回到了没有生产者/消费者模型的老办法了。换句话说,每一个插入操作必须等待一个线程对应的移除操作。SynchronousQueue又有两种模式:
1、公平模式
采用公平锁,并配合一个FIFO队列(Queue)来管理多余的生产者和消费者
2、非公平模式
采用非公平锁,并配合一个LIFO栈(Stack)来管理多余的生产者和消费者,这也是SynchronousQueue默认的模式
利用BlockingQueue实现生产者消费者模型
上一篇我们写的生产者消费者模型有局限,局限体现在:
缓冲区内只能存放一个数据,实际生产者/消费者模型中的缓冲区内可以存放大量生产者生产出来的数据
生产者和消费者处理数据的速度几乎一样
OK,我们就用BlockingQueue来简单写一个例子,并且让生产者、消费者处理数据速度不同。子类选择的是ArrayBlockingQueue,大小定为10:
public static void main(String[] args) { final BlockingQueuebq = new ArrayBlockingQueue (10); Runnable producerRunnable = new Runnable() { int i = 0; public void run() { while (true) { try { System.out.println("我生产了一个" + i++); bq.put(i + ""); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable customerRunnable = new Runnable() { public void run() { while (true) { try { System.out.println("我消费了一个" + bq.take()); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread producerThread = new Thread(producerRunnable); Thread customerThread = new Thread(customerRunnable); producerThread.start(); customerThread.start(); }
代码的做法是让生产者生产速度快于消费者消费速度的,看一下运行结果:
我生产了一个0 我消费了一个1 我生产了一个1 我生产了一个2 我消费了一个2 我生产了一个3 我生产了一个4 我生产了一个5 我消费了一个3 我生产了一个6 我生产了一个7 我生产了一个8 我消费了一个4 我生产了一个9 我生产了一个10 我生产了一个11 我消费了一个5 我生产了一个12 我生产了一个13 我生产了一个14 我消费了一个6 我生产了一个15 我生产了一个16 我消费了一个7 我生产了一个17 我消费了一个8 我生产了一个18
分两部分来看输出结果:
1、第1行~第23行。这块BlockingQueue未满,所以生产者随便生产,消费者随便消费,基本上都是生产3个消费1个,消费者消费速度慢
2、第24行~第27行,从前面我们可以看出,生产到16,消费到6,说明到了ArrayBlockingQueue的极限10了,这时候没办法,生产者生产一个ArrayBlockingQueue就满了,所以不能继续生产了,只有等到消费者消费完才可以继续生产。所以之后的打印内容一定是一个生产者、一个消费者
这就是前面一章开头说的"通过平衡生产者和消费者的处理能力来提高整体处理数据的速度",这给例子应该体现得很明显。另外,也不要担心非单一生产者/消费者场景下的系统假死问题,缓冲区空、缓冲区满的场景BlockingQueue都是定义了不同的Condition,所以不会唤醒自己的同类。
总结
以上就是本文关于Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅本站:
浅谈Java多线程的优点及代码示例
浅谈Java多线程处理中Future的妙用(附源码)
Java利用future及时获取多线程运行结果
如有不足之处,欢迎留言指出。
分享题目:Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析
文章链接:http://cdiso.cn/article/ghpcjo.html