生产者消费者 1 2 1. Java生产者消费者的三种实现 https:
1 2 3 4 5 6 7 Java生产者消费者是最基础的线程同步问题,java岗面试中还是很容易遇到的,之前没写过多线程的代码,面试中被问到很尬啊,面完回来恶补下。在网上查到大概有5 种生产者消费者的写法,分别如下。 用synchronized 对存储加锁,然后用object原生的wait() 和 notify()做同步。 用concurrent.locks.Lock,然后用condition的await() 和signal()做同步。 直接使用concurrent.BlockingQueue。 使用PipedInputStream/PipedOutputStream。 使用信号量semaphore。
synchronized版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import java.util.LinkedList;import java.util.Queue;public class ProducerAndConsumer { private final int MAX_LEN = 10 ; private Queue<Integer> queue = new LinkedList <Integer>(); class Producer extends Thread { @Override public void run () { producer(); } private void producer () { while (true ) { synchronized (queue) { while (queue.size() == MAX_LEN) { queue.notify(); System.out.println("当前队列满" ); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.add(1 ); queue.notify(); System.out.println("生产者生产一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } } class Consumer extends Thread { @Override public void run () { consumer(); } private void consumer () { while (true ) { synchronized (queue) { while (queue.size() == 0 ) { queue.notify(); System.out.println("当前队列为空" ); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); queue.notify(); System.out.println("消费者消费一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main (String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer (); Producer producer = pc.new Producer (); Consumer consumer = pc.new Consumer (); producer.start(); consumer.start(); } }
lock+condition的同步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ProducerAndConsumer1 { private final int MAX_LEN = 10 ; private Queue<Integer> queue = new LinkedList <Integer>(); private final Lock lock = new ReentrantLock (); private final Condition condition = lock.newCondition(); class Producer extends Thread { @Override public void run () { producer(); } private void producer () { while (true ) { lock.lock(); try { while (queue.size() == MAX_LEN) { System.out.println("当前队列满" ); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.add(1 ); condition.signal(); System.out.println("生产者生产一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } } } } class Consumer extends Thread { @Override public void run () { consumer(); } private void consumer () { while (true ) { lock.lock(); try { while (queue.size() == 0 ) { System.out.println("当前队列为空" ); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); condition.signal(); System.out.println("消费者消费一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } } } } public static void main (String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer (); Producer producer = pc.new Producer (); Consumer consumer = pc.new Consumer (); producer.start(); consumer.start(); } }
BlockingQueue版实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class ProducerAndConsumer { private BlockingQueue<Integer> queue = new LinkedBlockingQueue <Integer>(10 ); class Producer extends Thread { @Override public void run () { producer(); } private void producer () { while (true ) { try { queue.put(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产者生产一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(new Random ().nextInt(1000 )+500 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer extends Thread { @Override public void run () { consumer(); } private void consumer () { while (true ) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者消费一条任务,当前队列长度为" + queue.size()); try { Thread.sleep(new Random ().nextInt(1000 )+500 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main (String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer (); Producer producer = pc.new Producer (); Consumer consumer = pc.new Consumer (); producer.start(); consumer.start(); } }
Disruptor方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 public class PCData { private long value; public void set (long value) { this .value = value; } public long get () { return value; } } public class Consumer implements WorkHandler <PCData> { @Override public void onEvent (PCData event) throws Exception { System.out.println(Thread.currentThread().getId() + ":Event: --" + event.get() * event.get() + "--" ); } } public class PCDataFactory implements EventFactory <PCData>{ public PCData newInstance () { return new PCData (); } } public class Producer { private final RingBuffer<PCData> ringBuffer; public Producer (RingBuffer<PCData> ringBuffer) { this .ringBuffer = ringBuffer; } public void pushData (ByteBuffer bb) { long sequence = ringBuffer.next(); try { PCData event = ringBuffer.get(sequence); event.set(bb.getLong(0 )); } finally { ringBuffer.publish(sequence); } } } public static void main (String[] args) throws Exception { Executor executor = Executors.newCachedThreadPool(); PCDataFactory factory = new PCDataFactory (); int bufferSize = ; Disruptor<PCData> disruptor = new Disruptor <PCData>(factory, bufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy () ); disruptor.handleEventsWithWorkerPool( new Consumer (), new Consumer (), new Consumer (), new Consumer ()); disruptor.start(); RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer (ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8 ); for (long l = 0 ; true ; l++) { bb.putLong(0 , l); producer.pushData(bb); Thread.sleep(0 ); System.out.println("add data " +l); } } 8 :Event: --0 --add data 0 11 :Event: --1 --add data 1 10 :Event: --4 --add data 2 9 :Event: --9 --add data 3 根据Disruptor的官方报告,Disruptor的性能要比BlockingQueue至少高一个数量级以上。
提高消费者的响应时间:选择合适的策略 当有新数据在Disruptor的环形缓冲区中产生时,消费者如何知道这些新产生的数据呢?或者说,消费者如何监控缓冲区中的信息呢?为此,Disruptor提供了几种策略,这些策略由WaitStrategy接口进行封装,主要有以下几种实现。
BlockingWaitStrategy:这是默认的策略。使用BlockingWaitStrategy和使用BlockingQueue是非常类似的,它们都使用锁和条件(Condition)进行数据的监控和线程的唤醒。因为涉及到线程的切换,BlockingWaitStrategy策略是最节省CPU,但是在高并发下性能表现最糟糕的一种等待策略。
SleepingWaitStrategy:这个策略也是对CPU使用率非常保守的。它会在循环中不断等待数据。它会先进行自旋等待,如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1)进行线程休眠,以确保不占用太多的CPU数据。因此,这个策略对于数据处理可能产生比较高的平均延时。它比较适合于对延时要求不是特别高的场合,好处是它对生产者线程的影响最小。典型的应用场景是异步日志。
YieldingWaitStrategy:这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部,它会使用Thread.yield()让出CPU给别的线程执行时间。如果你需要一个高性能的系统,并且对延时有较为严格的要求,则可以考虑这种策略。使用这种策略时,相当于你的消费者线程变身成为了一个内部执行了Thread.yield()的死循环。因此,你最好有多于消费者线程数量的逻辑CPU数量(这里的逻辑CPU,我指的是“双核四线程”中的那个四线程,否则,整个应用程序恐怕都会受到影响。
BusySpinWaitStrategy:这个是最疯狂的等待策略了。它就是一个死循环!消费者线程会尽最大努力疯狂监控缓冲区的变化。因此,它会吃掉所有的CPU资源。你只有在对延迟非常苛刻的场合可以考虑使用它(或者说,你的系统真的非常繁忙)。因为在这里你等同开启了一个死循环监控,所以,你的物理CPU数量必须要大于消费者线程数。注意,我这里说的是物理CPU,如果你在一个物理核上使用超线程技术模拟两个逻辑核,另外一个逻辑核显然会受到这种超密集计算的影响而不能正常工作。
CPU Cache的优化:解决伪共享问题 什么是伪共享问题 呢?我们知道,为了提高CPU的速度,CPU有一个高速缓存Cache。在高速缓存中,读写数据的最小单位为缓存行(Cache Line),它是从主存(memory)复制到缓存(Cache)的最小单位,一般为32字节到128字节。
如果两个变量存放在一个缓存行中时,在多线程访问中,可能会相互影响彼此的性能。如图5.4所示,假设X和Y在同一个缓存行。运行在CPU1上的线程更新了X,那么CPU2上的缓存行就会失效,同一行的Y即使没有修改也会变成无效,导致Cache无法命中。接着,如果在CPU2上的线程更新了Y,则导致CPU1上的缓存行又失效(此时,同一行的X又变得无法访问)。这种情况反反复复发生,无疑是一个潜在的性能杀手。如果CPU经常不能命中缓存,那么系统的吞吐量就会急剧下降。
并行流水线 1 2 1. 借鉴《Java高并发程序设计》第5.6 节2.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 计算(B+C)*B/2 ,那么这个运行过程就是无法并行的。原因是,如果B+C没有执行完成,则永远算不出(B+C)*B,这就是数据相关性。如果线程执行时,所需的数据存在这种依赖关系,那么,就没有办法将它们完美的并行化。 那遇到这种情况时,有没有什么补救措施呢?答案是肯定的,那就是借鉴日常生产中的流水线思想。 比如,现在要生产一批小玩偶。小玩偶的制作分为四个步骤,第一要组装身体,第二要在身体上安装四肢和头部,第三,给组装完成的玩偶穿上一件漂亮的衣服,第四,就可以包装出货了。为了加快制作玩具的进度,我们不可能叫四个人同时加工一个玩具,因为这四个步骤有着严重的依赖关系。如果没有身体,就没有地方安装四肢,如果没有组装完成,就不能穿衣服,如果没有穿上衣服,就不能包装发货。因此,找四个人来做一个玩偶是毫无意义的。 但是,如果你现在要制作的不是1 只玩偶,而是1 万只玩偶,那情况就不同了。你可以找四个人,第一个人只负责组装身体,完成后交给第二个人;第二个人只负责安装头部和四肢,交付第三人;第三人只负责穿衣服,并交付第四人;第四人只负责包装发货。这样所有人都可以一起工作,共同完成任务,而整个时间周期也能缩短到原来的1 /4 左右,这就是流水线的思想。一旦流水线满载,每次只需要一步(假设一个玩偶需要四步)就可以产生一个玩偶 类似的思想可以借鉴到程序开发中。即使(B+C)*B/2 无法并行,但是如果你需要计算一大堆B和C的值,你依然可以将它流水化。首先将计算过程拆分为三个步骤: P1:A=B+C P2:D=A×B P3:D=D/2 上述步骤中P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。此时,P3的计算结果就是最终需要的答案。 P1接收B和C的值,并求和,将结果输入给P2。P2求乘积后输入给P3。P3将D除以2 得到最终值。一旦这条流水线建立,只需要一个计算步骤就可以得到(B+C)*B/2 的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public class Msg { public double i; public double j; public String orgStr=null ; } public class Plus implements Runnable { public static BlockingQueue<Msg> bq=new LinkedBlockingQueue <Msg>(); @Override public void run () { while (true ){ try { Msg msg=bq.take(); msg.j=msg.i+msg.j; Multiply.bq.add(msg); } catch (InterruptedException e) { } } } } public class Multiply implements Runnable { public static BlockingQueue<Msg> bq = new LinkedBlockingQueue <Msg>(); @Override public void run () { while (true ) { try { Msg msg = bq.take(); msg.i = msg.i * msg.j; Div.bq.add(msg); } catch (InterruptedException e) { } } } } public class Div implements Runnable { public static BlockingQueue<Msg> bq = new LinkedBlockingQueue <Msg>(); @Override public void run () { while (true ) { try { Msg msg = bq.take(); msg.i = msg.i / 2 ; System.out.println(msg.orgStr + "=" + msg.i); } catch (InterruptedException e) { } } } } public class PStreamMain { public static void main (String[] args) { new Thread (new Plus ()).start(); new Thread (new Multiply ()).start(); new Thread (new Div ()).start(); for (int i = 1 ; i <= ; i++) { for (int j = 1 ; j <= ; j++) { Msg msg = new Msg (); msg.i = i; msg.j = j; msg.orgStr = "((" + i + "+" + j + ")*" + i + ")/2" ; Plus.bq.add(msg); } } } } 上述代码第行,将数据提交给P1加法线程,开启流水线的计算。在多核或者分布式场景中,这种设计思路可以有效地将有依赖关系的操作分配在不同的线程中进行计算,尽可能利用多核优势。