当前位置: 首页 > news >正文

java并发编程-高性能内存队列

高性能内存队列

  • 缓存一致性
  • 伪共享
  • 高性能内存队列Disruptor
    • 构造器参数
    • 使用流程

缓存一致性

在CPU多核缓存架构中,每个处理器都有一个单独的缓存,共享数据可能有多个副本:一个副本在主内存中,一个副本在请求它的每个处理器的本地缓存中。当数据的一个副本发生更改时,其他副本必须反映该更改。也就是说,CPU多核缓存架构要保证缓存一致性。

两种方式:

  1. 总线锁定
  2. 缓存一致性协议:缓存一致性协议是一种用于确保处理器缓存中的数据和主存中的数据一致的机制。缓存一致性协议会通过处理器之间的通信,确保在一个处理器修改了某个数据后,其他处理器缓存中的该数据会被更新或者失效,从而保证在多个处理器同时对同一个数据进行操作时,它们所看到的数据始终是一致的。
  3. 缓存一致性协议有多种实现,大概分为两类:写失效(当处理器写入一个共享缓存块时,其他缓存中的所有共享副本都会通过总线窥探失效)和写更新(当处理器写入一个共享缓存块时,其他缓存的所有共享副本都会通过总线窥探更新)。

伪共享

如果多个核上的线程在操作同一个缓存行(linux下缓存行有64个字节)中的不同变量数据,那么就会出现频繁的缓存失效,即使在代码层面看这两个线程操作的数据之间完全没有关系。这种不合理的资源竞争情况就是伪共享(FalseSharing)。

linux下查看缓存行大小:cat /proc/cpuinfo
避免伪共享方案:

  1. 缓存行填充
public class FalseSharing {public static void main(String[] args) throws InterruptedException {Pointer pointer = new Pointer();long start = System.currentTimeMillis();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.x++;}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.y++;}}});t1.start();t2.start();t1.join();t2.join();System.out.println(System.currentTimeMillis()-start);}static class Pointer {volatile long x;//缓存填充long p1,p2,p3,p4,p5,p6,p7,p8;volatile long y;}
}
  1. 使用@sun.misc.Contended注解(java8)注意需要配置jvm参数:-XX:-RestrictContended
public class FalseSharing {public static void main(String[] args) throws InterruptedException {Pointer pointer = new Pointer();long start = System.currentTimeMillis();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.x++;}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {for(int i = 0; i < 100000000; i++) {pointer.y++;}}});t1.start();t2.start();t1.join();t2.join();System.out.println(System.currentTimeMillis()-start);}static class Pointer {@Contendedvolatile long x;//缓存填充//long p1,p2,p3,p4,p5,p6,p7,p8;volatile long y;}
}
  1. 用线程的本地内存,比如ThreadLocal

高性能内存队列Disruptor

juc下的队列大部分采用有界队列,有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。利用缓存行填充解决了伪共享的问题。

    <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version></dependency>

构造器参数

EventFactory:创建事件(任务)的工厂类。ringBufferSize:容器的长度。
ThreadFactory:用于创建执行任务的线程。
ProductType:生产者类型:单生产者、多生产者。
WaitStrategy:等待策略。

使用流程

  1. 构建消息载体(事件)
  2. 构建生产者
  3. 构建消费者
  4. 生产消息,消费消息的测试
@Data
public class OrderEvent {private long value;private String name;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}
}
public class OrderEventProducer {//存储事件的环形队列private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void onData(long value, String name) {//获取队列的下一个槽long next = ringBuffer.next();//获取消息OrderEvent orderEvent = ringBuffer.get(next);//写入数据orderEvent.setValue(value);orderEvent.setName(name);System.out.println(String.format("生产着发送数据:value=%d name=%s", value, name));ringBuffer.publish(next);}
}
public class OrderEventHandler implements EventHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {System.out.println(String.format("消费者拿到数据:value=%d name=%s", orderEvent.getValue(), orderEvent.getName()));}
}

单生产者单消费者模式

public class DisruptorDemo {public static void main(String[] args) {//创建DisruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE,new YieldingWaitStrategy());//设置消费者用于处理的事件disruptor.handleEventsWith(new OrderEventHandler());disruptor.start();//创建队列容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "test"+i);}disruptor.shutdown();}
}

单生产者多消费者模式

public class DisruptorDemo {public static void main(String[] args) {//创建DisruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE,new YieldingWaitStrategy());//设置消费者用于处理的事件//disruptor.handleEventsWith(new OrderEventHandler());//多消费者,重复消费//disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());disruptor.start();//创建队列容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "test"+i);}disruptor.shutdown();}
}

多生产者多消费者模式

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {//创建DisruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.MULTI,new YieldingWaitStrategy());//设置消费者用于处理的事件//disruptor.handleEventsWith(new OrderEventHandler());//多消费者,重复消费//disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());disruptor.start();//创建队列容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "test"+i);}}});//创建生产者Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {OrderEventProducer producer = new OrderEventProducer(ringBuffer);for(int i = 0; i < 100; i++) {producer.onData(i, "aaaa"+i);}}});t1.start();t2.start();t1.join();t2.join();disruptor.shutdown();}
}

相关文章:

  • OpenVINO教程(二):图片目标检测推理应用
  • MySQL VS SQL Server:优缺点全解析
  • Unity3D 编辑器扩展开发指南
  • 基于大模型的贲门失弛缓症手术全流程风险预测与治疗方案研究
  • docker 国内源和常用命令
  • 【Ultralytics 使用yolo12 读取tiff 数据异常解决】
  • 实践项目开发-hbmV4V20250407-跨平台开发框架深度解析与VSCode一站式开发实践
  • 双向流-热-固耦合分析
  • 数据结构:链表
  • Vue中如何优雅地处理 `<el-dialog>` 的关闭事件
  • 【MQ篇】RabbitMQ之简单模式!
  • 第T9周:猫狗识别2
  • 机器学习基础 - 分类模型之逻辑回归
  • Linux kernel signal原理(下)- aarch64架构sigreturn流程
  • XHTMLConverter把docx转换html报java.lang.NullPointerException异常
  • 基于SpringBoot的校园赛事直播管理系统-项目分享
  • 前端如何优雅地对接后端
  • 使用Selenium进行元素定位的全面指南
  • docker容器中uv的使用
  • 前端性能优化全攻略:JavaScript 优化、DOM 操作、内存管理、资源压缩与合并、构建工具及性能监控
  • 欧盟就中欧有关世贸争端案件提起上诉仲裁,商务部回应
  • 中印尼举行外长防长“2+2”对话机制首次部长级会议
  • 成都市政府秘书长王忠诚调任遂宁市委副书记
  • 徐之凯评《突如其来的勇气》|早熟的抵抗
  • 林诗栋4比1战胜梁靖崑,晋级世界杯男单决赛将和雨果争冠
  • 男子拍摄女性视频后在网上配发诱导他人违法犯罪文字,已被警方行拘