java并发编程-高性能内存队列
高性能内存队列
- 缓存一致性
- 伪共享
- 高性能内存队列Disruptor
- 构造器参数
- 使用流程
缓存一致性
在CPU多核缓存架构中,每个处理器都有一个单独的缓存,共享数据可能有多个副本:一个副本在主内存中,一个副本在请求它的每个处理器的本地缓存中。当数据的一个副本发生更改时,其他副本必须反映该更改。也就是说,CPU多核缓存架构要保证缓存一致性。
两种方式:
- 总线锁定
- 缓存一致性协议:缓存一致性协议是一种用于确保处理器缓存中的数据和主存中的数据一致的机制。缓存一致性协议会通过处理器之间的通信,确保在一个处理器修改了某个数据后,其他处理器缓存中的该数据会被更新或者失效,从而保证在多个处理器同时对同一个数据进行操作时,它们所看到的数据始终是一致的。
- 缓存一致性协议有多种实现,大概分为两类:写失效(当处理器写入一个共享缓存块时,其他缓存中的所有共享副本都会通过总线窥探失效)和写更新(当处理器写入一个共享缓存块时,其他缓存的所有共享副本都会通过总线窥探更新)。
伪共享
如果多个核上的线程在操作同一个缓存行(linux下缓存行有64个字节)中的不同变量数据,那么就会出现频繁的缓存失效,即使在代码层面看这两个线程操作的数据之间完全没有关系。这种不合理的资源竞争情况就是伪共享(FalseSharing)。
linux下查看缓存行大小:cat /proc/cpuinfo
避免伪共享方案:
- 缓存行填充
public class FalseSharing {public static void main(String[] args) throws InterruptedException {Pointer pointer = new Pointer();long start = System.currentTimeMillis();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.x++;}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.y++;}}});t1.start();t2.start();t1.join();t2.join();System.out.println(System.currentTimeMillis()-start);}static class Pointer {volatile long x;//缓存填充long p1,p2,p3,p4,p5,p6,p7,p8;volatile long y;}
}
- 使用@sun.misc.Contended注解(java8)注意需要配置jvm参数:-XX:-RestrictContended
public class FalseSharing {public static void main(String[] args) throws InterruptedException {Pointer pointer = new Pointer();long start = System.currentTimeMillis();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.x++;}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.y++;}}});t1.start();t2.start();t1.join();t2.join();System.out.println(System.currentTimeMillis()-start);}static class Pointer {@Contendedvolatile long x;//缓存填充//long p1,p2,p3,p4,p5,p6,p7,p8;volatile long y;}
}
- 用线程的本地内存,比如ThreadLocal
高性能内存队列Disruptor
juc下的队列大部分采用有界队列,有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。利用缓存行填充解决了伪共享的问题。
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version></dependency>
构造器参数
EventFactory:创建事件(任务)的工厂类。ringBufferSize:容器的长度。
ThreadFactory:用于创建执行任务的线程。
ProductType:生产者类型:单生产者、多生产者。
WaitStrategy:等待策略。
使用流程
- 构建消息载体(事件)
- 构建生产者
- 构建消费者
- 生产消息,消费消息的测试
@Data
public class OrderEvent {private long value;private String name;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}
}
public class OrderEventProducer {//存储事件的环形队列private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void onData(long value, String name) {//获取队列的下一个槽long next = ringBuffer.next();//获取消息OrderEvent orderEvent = ringBuffer.get(next);//写入数据orderEvent.setValue(value);orderEvent.setName(name);System.out.println(String.format("生产着发送数据:value=%d name=%s", value, name));ringBuffer.publish(next);}
}
public class OrderEventHandler implements EventHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {System.out.println(String.format("消费者拿到数据:value=%d name=%s", orderEvent.getValue(), orderEvent.getName()));}
}
单生产者单消费者模式
public class DisruptorDemo {public static void main(String[] args) {//创建DisruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE,new YieldingWaitStrategy());//设置消费者用于处理的事件disruptor.handleEventsWith(new OrderEventHandler());disruptor.start();//创建队列容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "test"+i);}disruptor.shutdown();}
}
单生产者多消费者模式
public class DisruptorDemo {public static void main(String[] args) {//创建DisruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE,new YieldingWaitStrategy());//设置消费者用于处理的事件//disruptor.handleEventsWith(new OrderEventHandler());//多消费者,重复消费//disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());disruptor.start();//创建队列容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "test"+i);}disruptor.shutdown();}
}
多生产者多消费者模式
public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {//创建DisruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.MULTI,new YieldingWaitStrategy());//设置消费者用于处理的事件//disruptor.handleEventsWith(new OrderEventHandler());//多消费者,重复消费//disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());disruptor.start();//创建队列容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "test"+i);}}});//创建生产者Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "aaaa"+i);}}});t1.start();t2.start();t1.join();t2.join();disruptor.shutdown();}
}