生产者消费者

1
2
1. Java生产者消费者的三种实现
https://blog.csdn.net/xindoo/article/details/80004003
1
2
3
4
5
6
7
Java生产者消费者是最基础的线程同步问题,java岗面试中还是很容易遇到的,之前没写过多线程的代码,面试中被问到很尬啊,面完回来恶补下。在网上查到大概有5种生产者消费者的写法,分别如下。

synchronized对存储加锁,然后用object原生的wait() 和 notify()做同步。
用concurrent.locks.Lock,然后用condition的await() 和signal()做同步。
直接使用concurrent.BlockingQueue。
使用PipedInputStream/PipedOutputStream。
使用信号量semaphore。

synchronized版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.LinkedList;
import java.util.Queue;

public class ProducerAndConsumer {
private final int MAX_LEN = 10;
private Queue<Integer> queue = new LinkedList<Integer>();
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
synchronized (queue) {
while (queue.size() == MAX_LEN) {
queue.notify();
System.out.println("当前队列满");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(1);
queue.notify();
System.out.println("生产者生产一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
queue.notify();
System.out.println("当前队列为空");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
queue.notify();
System.out.println("消费者消费一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}

lock+condition的同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* version 1 doesn't use synchronized to improve performance
*/
public class ProducerAndConsumer1 {
private final int MAX_LEN = 10;
private Queue<Integer> queue = new LinkedList<Integer>();
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
lock.lock();
try {
while (queue.size() == MAX_LEN) {
System.out.println("当前队列满");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(1);
condition.signal();
System.out.println("生产者生产一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("当前队列为空");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
condition.signal();
System.out.println("消费者消费一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}

BlockingQueue版实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class ProducerAndConsumer {
private BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(10);
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者生产一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(new Random().nextInt(1000)+500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费一条任务,当前队列长度为" + queue.size());
try {
Thread.sleep(new Random().nextInt(1000)+500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}

Disruptor方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// 摘自《Java高并发程序设计》
// https://blog.csdn.net/sunhaoning/article/details/64131362

// BlockigQueue用于实现生产者和消费者一个不错的选择。它可以很自然地实现作为生产者和消费者的内存缓冲区。但是BlockigQueue并不是一个高性能的实现,它完全使用锁和阻塞等待来实现线程间的同步。在高并发场合,它的性能并不是特别的优越。就像之前我已经提过的:ConcurrentLinkedQueue是一个高性能的队列,但是BlockingQueue只是为了方便数据共享。

// 而ConcurrentLinkedQueue的秘诀就在于大量使用了无锁的CAS操作。同理,如果我们使用CAS来实现生产者-消费者模式,也同样可以获得可观的性能提升。不过正如大家所见,使用CAS进行编程是非常困难的,但有一个好消息是,目前有一个现成的Disruptor框架,它已经帮助我们实现了这一个功能。

// Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。它使用无锁的方式实现了一个环形队列,非常适合于实现生产者和消费者模式,比如事件和消息的发布。在Disruptor中,别出心裁地使用了环形队列(RingBuffer)来代替普通线性队列,这个环形队列内部实现为一个普通的数组。对于一般的队列,势必要提供队列同步head和尾部tail两个指针,用于出队和入队,这样无疑就增加了线程协作的复杂度。但如果队列是环形的,则只需要对外提供一个当前位置cursor,利用这个指针既可以进入入队也可以进行出队操作。由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。为了能够快速从一个序列(sequence)对应到数组的实际位置(每次有元素入队,序列就加1),Disruptor要求我们必须将数组的大小设置为2的整数次方。这样通过sequence &(queueSize-1)就能立即定位到实际的元素位置index。这个要比取余(%)操作快得多。


// 首先,我们还是需要一个代表数据的PCData:
public class PCData
{
private long value;
public void set(long value)
{
this.value = value;
}
public long get(){
return value;
}
}

// 消费者实现为WorkHandler接口,它来自Disruptor框架
public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData event) throws Exception {
System.out.println(Thread.currentThread().getId() + ":Event: --"
+ event.get() * event.get() + "--");
}
}

// 还需要一个产生PCData的工厂类。它会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例
public class PCDataFactory implements EventFactory<PCData>
{
public PCData newInstance()
{
return new PCData();
}
}

// 生产者
public class Producer
{
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
PCData event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}

// 主函数
public static void main(String[] args) throws Exception
{
Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = ;
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,
bufferSize,
executor,
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWithWorkerPool(
new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());
disruptor.start();

RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.pushData(bb);
Thread.sleep(0);
System.out.println("add data "+l);
}
}

8:Event: --0--
add data 0
11:Event: --1--
add data 1
10:Event: --4--
add data 2
9:Event: --9--
add data 3

根据Disruptor的官方报告,Disruptor的性能要比BlockingQueue至少高一个数量级以上。

提高消费者的响应时间:选择合适的策略

当有新数据在Disruptor的环形缓冲区中产生时,消费者如何知道这些新产生的数据呢?或者说,消费者如何监控缓冲区中的信息呢?为此,Disruptor提供了几种策略,这些策略由WaitStrategy接口进行封装,主要有以下几种实现。

  • BlockingWaitStrategy:这是默认的策略。使用BlockingWaitStrategy和使用BlockingQueue是非常类似的,它们都使用锁和条件(Condition)进行数据的监控和线程的唤醒。因为涉及到线程的切换,BlockingWaitStrategy策略是最节省CPU,但是在高并发下性能表现最糟糕的一种等待策略。
  • SleepingWaitStrategy:这个策略也是对CPU使用率非常保守的。它会在循环中不断等待数据。它会先进行自旋等待,如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1)进行线程休眠,以确保不占用太多的CPU数据。因此,这个策略对于数据处理可能产生比较高的平均延时。它比较适合于对延时要求不是特别高的场合,好处是它对生产者线程的影响最小。典型的应用场景是异步日志。
  • YieldingWaitStrategy:这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部,它会使用Thread.yield()让出CPU给别的线程执行时间。如果你需要一个高性能的系统,并且对延时有较为严格的要求,则可以考虑这种策略。使用这种策略时,相当于你的消费者线程变身成为了一个内部执行了Thread.yield()的死循环。因此,你最好有多于消费者线程数量的逻辑CPU数量(这里的逻辑CPU,我指的是“双核四线程”中的那个四线程,否则,整个应用程序恐怕都会受到影响。
  • BusySpinWaitStrategy:这个是最疯狂的等待策略了。它就是一个死循环!消费者线程会尽最大努力疯狂监控缓冲区的变化。因此,它会吃掉所有的CPU资源。你只有在对延迟非常苛刻的场合可以考虑使用它(或者说,你的系统真的非常繁忙)。因为在这里你等同开启了一个死循环监控,所以,你的物理CPU数量必须要大于消费者线程数。注意,我这里说的是物理CPU,如果你在一个物理核上使用超线程技术模拟两个逻辑核,另外一个逻辑核显然会受到这种超密集计算的影响而不能正常工作。

CPU Cache的优化:解决伪共享问题

​ 什么是伪共享问题呢?我们知道,为了提高CPU的速度,CPU有一个高速缓存Cache。在高速缓存中,读写数据的最小单位为缓存行(Cache Line),它是从主存(memory)复制到缓存(Cache)的最小单位,一般为32字节到128字节。

​ 如果两个变量存放在一个缓存行中时,在多线程访问中,可能会相互影响彼此的性能。如图5.4所示,假设X和Y在同一个缓存行。运行在CPU1上的线程更新了X,那么CPU2上的缓存行就会失效,同一行的Y即使没有修改也会变成无效,导致Cache无法命中。接着,如果在CPU2上的线程更新了Y,则导致CPU1上的缓存行又失效(此时,同一行的X又变得无法访问)。这种情况反反复复发生,无疑是一个潜在的性能杀手。如果CPU经常不能命中缓存,那么系统的吞吐量就会急剧下降。

并行流水线

1
2
1. 借鉴《Java高并发程序设计》第5.6
2.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
计算(B+C)*B/2,那么这个运行过程就是无法并行的。原因是,如果B+C没有执行完成,则永远算不出(B+C)*B,这就是数据相关性。如果线程执行时,所需的数据存在这种依赖关系,那么,就没有办法将它们完美的并行化。
那遇到这种情况时,有没有什么补救措施呢?答案是肯定的,那就是借鉴日常生产中的流水线思想。

比如,现在要生产一批小玩偶。小玩偶的制作分为四个步骤,第一要组装身体,第二要在身体上安装四肢和头部,第三,给组装完成的玩偶穿上一件漂亮的衣服,第四,就可以包装出货了。为了加快制作玩具的进度,我们不可能叫四个人同时加工一个玩具,因为这四个步骤有着严重的依赖关系。如果没有身体,就没有地方安装四肢,如果没有组装完成,就不能穿衣服,如果没有穿上衣服,就不能包装发货。因此,找四个人来做一个玩偶是毫无意义的。

但是,如果你现在要制作的不是1只玩偶,而是1万只玩偶,那情况就不同了。你可以找四个人,第一个人只负责组装身体,完成后交给第二个人;第二个人只负责安装头部和四肢,交付第三人;第三人只负责穿衣服,并交付第四人;第四人只负责包装发货。这样所有人都可以一起工作,共同完成任务,而整个时间周期也能缩短到原来的1/4左右,这就是流水线的思想。一旦流水线满载,每次只需要一步(假设一个玩偶需要四步)就可以产生一个玩偶
类似的思想可以借鉴到程序开发中。即使(B+C)*B/2无法并行,但是如果你需要计算一大堆B和C的值,你依然可以将它流水化。首先将计算过程拆分为三个步骤:

P1:A=B+C
P2:D=A×B
P3:D=D/2

上述步骤中P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。此时,P3的计算结果就是最终需要的答案。

P1接收B和C的值,并求和,将结果输入给P2。P2求乘积后输入给P3。P3将D除以2得到最终值。一旦这条流水线建立,只需要一个计算步骤就可以得到(B+C)*B/2的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// 为了实现这个功能,我们需要定义一个在线程间携带结果进行信息交换的载体:
public class Msg {
public double i;
public double j;
public String orgStr=null;
}

// P1计算的是加法:
public class Plus implements Runnable {
public static BlockingQueue<Msg> bq=new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while(true){
try {
Msg msg=bq.take();
msg.j=msg.i+msg.j;
Multiply.bq.add(msg);
} catch (InterruptedException e) {
}
}
}
}
// 上述代码中,P1取得封装了两个操作数的Msg,并进行求和,将结果传递给乘法线程P2(第9行)。当没有数据需要处理时,P1进行等待。

// P2计算乘法:
public class Multiply implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();

@Override
public void run() {
while (true) {
try {
Msg msg = bq.take();
msg.i = msg.i * msg.j;
Div.bq.add(msg);
} catch (InterruptedException e) {
}
}
}
}
//和P1非常类似,P2计算相乘结果后,将中间结果传递给除法线程P3。

//P3计算除法:
public class Div implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while (true) {
try {
Msg msg = bq.take();
msg.i = msg.i / 2;
System.out.println(msg.orgStr + "=" + msg.i);
} catch (InterruptedException e) {
}
}
}
}
// P3将结果除以2后输出最终的结果。

// 最后是提交任务的主线程,这里,我们提交0万个请求,让线程组进行计算:
public class PStreamMain {
public static void main(String[] args) {
new Thread(new Plus()).start();
new Thread(new Multiply()).start();
new Thread(new Div()).start();

for (int i = 1; i <= ; i++) {
for (int j = 1; j <= ; j++) {
Msg msg = new Msg();
msg.i = i;
msg.j = j;
msg.orgStr = "((" + i + "+" + j + ")*" + i + ")/2";
Plus.bq.add(msg);
}
}
}
}
上述代码第行,将数据提交给P1加法线程,开启流水线的计算。在多核或者分布式场景中,这种设计思路可以有效地将有依赖关系的操作分配在不同的线程中进行计算,尽可能利用多核优势。