当前位置: 首页 > news >正文

tigase源码学习杂记-IO处理的线程模型

前言

tigase是一个高性能的服务器,其实个人认为作为即时通讯的服务器,高性能主要体现在他对IO复用,和多线程的使用上,今天来学习一下他的IO的线程处理模型的源码,并记录一下他优秀的设计。

概述

tigase是使用的NIO作为自己的IO模型。IOService是实现了Callable接口的并持有SocketChannel对象的一个抽象的IO封装对象。

tigase的IO处理线程模型的核心类是 SocketThread,此类提供了两个核心的静态方法:addSocketService(IOService<?> s) 和 removeSocketService(IOService<Object> s) 分别是表示把IOService对象添加进线程模型中和把IOService对象移除线程模型中。其他IO监听器类监听到IO的xmppStreamOpened()发生之后,调用addSocketService(IOService<?> s)方法将数据添加到SocketThread之后,就开始了整个多线程处理整个IO复用的流程。

IO处理流程设计

tiagase IO处理流程图

ConnectionOpenThread作为整个tigase的IO监听连接的线程,tigase服务启动之后就会启动ConnectionOpenThread。启动完ConnectionOpenThread流程如下:

  1. ConnectionOpenThread首先会初始化内部的Selector用于对客户端的连接的监听。(当然ConnectionOpenThread也会做一些限流的控制,超过连接数就拒绝新连接等)对应图中1步
  2. 接着ConnectionOpenThread就会循环监听就绪的SelectKey,拿到SocketChannel传递给ConnectionOpenListener。对应图中2、3、4步
  3. ConnectionOpenListener监听器就会创建IOService对象,并设置一些参数,包括SSL容器,和其他的一些监听器。对应图中5、6步
  4. ConnectionManager(连接管理器)启动IOService对象,添加连接超时需要执行的Task,并将IOService对象通过SocketThread.addSocketService(IOService<?> s)添加到IO复用的线程模型中,自此进入了IO处理的线程的逻辑。对应图中7、8、9步
  5. SocketThread.addSocketService(IOService<?> s)通过哈希和取模的算法,将数据负载均衡到不同的SocketThread的waitting的跳表集合中进行数据的缓冲。SocketThread循环从waitting跳表集合中拿到IOService并根据读写事件类型,将IOService注册到SocketThread自身的Selector对象上。对应图中10步
  6. SocketThread循序监听Selector就绪的SelectKey,并拿到IOService对象,并将IOService添加到forCompletion跳表集合中进行缓冲。对应图中11步
  7. SocketThread循环从forCompletion跳表集合中拿到IOService对象,丢到completionService线程池进行执行IOService的call()方法。对应图中12步
  8. ResultsListener 结果监听器线程循环监听completionService执行结束的IOService对象的状态,如果IOService状态是isConnected就继续调用SocketThread.addSocketService(IOService<?> s)添加到SocketThread进行重复调用每个IOService自身的call()方法。自此整个IO的循环处理流程基本结束。对应图中13步

SocketThread的线程模型

tigase的线程模型和Netty的主从React的模式有点像,ConnectionOpenThread主要用来处理客户端的连接建立,建立之后创建将SocketChannel封装到IOService抽象对象中,通过IOService.hashCode() % SocketThread.length 算法负载均衡到不同的ScoketThread中(因为是跟IOService计算的hash,保证相同的IO对象会添加到相同的线程中去),通过SocketChannel的一列缓冲操作,将就绪的IOServices丢到CompletionService线程池中去,线程池来执行IOService.call()方法对数据进行读写的各种操作。call()中加了读写锁,这里效率略低于Netty的设计,Netty是一个线程基于责任链的形式一个线程执行到底的无锁设计,这里ResultsListener可以将IOService再次丢入线程池中供不同的线程继续调用。

SocketThread类数据结构设计源码分析

数据结构
class SocketThread implements Runnable{//线程池private static CompletionService<IOService<?>> completionService = null;//读线程数组private static SocketThread[] socketReadThread = null;//写线程数组private static SocketThread[] socketWriteThread = null;
}

SocketThread线程通过持有两个static类型的读写线程组和CompletionService的线程池组成他的独特的线程模型,通过读写线程分组提高代码的效率,并更好地体现了设计的单一职责。CompletionService的线程池主要的作用就是解耦执行和结果的获取。采用static的类变量,就只有初始化一次。在类加载完成之后,线程处理模型就初始化好了。

初始化相关代码
class SocketThread implements Runnable{ //默认每个核心的线程数public static final int DEF_MAX_THREADS_PER_CPU = 8;//获取系统的核心数private static int cpus = Runtime.getRuntime().availableProcessors();static {if (socketReadThread == null) {//根据CPU核数计算默认线程数int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1;executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());completionService = new ExecutorCompletionService<IOService<?>>(executor);socketReadThread = new SocketThread[nThreads];socketWriteThread = new SocketThread[nThreads];//读线程初始化for (int i = 0; i < socketReadThread.length; i++) {socketReadThread[i] = new SocketThread("socketReadThread-" + i);socketReadThread[i].reading = true;Thread thrd = new Thread(socketReadThread[i]);thrd.setName("socketReadThread-" + i);thrd.start();}...//写线程初始化for (int i = 0; i < socketWriteThread.length; i++) {socketWriteThread[i] = new SocketThread("socketWriteThread-" + i);socketWriteThread[i].writing = true;Thread thrd = new Thread(socketWriteThread[i]);thrd.setName("socketWriteThread-" + i);thrd.start();}...}   }
}

线程模型初始化很简单,就是通过一个static代码块,初始化相关的读写线程组和处理IOService的call()方法调用的线程池。这里稍微核心一点的我想说可能就是计算线程池的公式,这里对tiagse的计算公式进行化简可得 nThread = CPU 核数 * 4 + 1,而对比《java并发编程实战》一书的第八章的线程池的最优计算公式:线程数 = CPU 核数 * CPU利用率 * (1 + 等待时间/计算时间), 朝着tigase公式的形式化简一下可得:线程数 = CPU 核数 * CPU利用率 + CPU 核数 *(CPU利用率*等待时间/计算时间)),由于tigase是IO密集型系统,所以我大胆假设 CPU 核数 * CPU利用率 ≈1,所以以上两个公式可得出:CPU利用率*等待时间/计算时间 = 4.当然这这只是个人的一些假设,用于复习一下线程池计算公式。实际数据肯定是tigase经过大量测试优化之后的最优数据。

SocketThread成员变量数据结构设计源码

class SocketThread implements Runnable{//selector对象private Selector clientsSel = null;// 控制selector为空的一个计数器private int empty_selections = 0;//“待完成”的IOService的缓冲跳表集合private ConcurrentSkipListSet<IOService<?>> forCompletion = new ConcurrentSkipListSet<IOService<?>>(new IOServiceComparator());//读状态private boolean reading = false;//写状态private boolean writing = false;//停止状态private boolean stopping = false;//等待的IOService的缓冲跳表集合private ConcurrentSkipListSet<IOService<?>> waiting = new ConcurrentSkipListSet<IOService<?>>(new IOServiceComparator());
}
  • clientsSel对象: 用于监听对象的就绪状态。
  • empty_selections:作为一个计数器,作用是统计 “Selector中就绪的Key为0 waiting跳表中的IOService为0” 的个数,当个数大于默认的10次,就重新创建Selector,代码的注释写的是解决两个java的BUG,这里就不做展开。
  • forCompletion:待完成的IOService的跳表缓冲集合
  • reading: 标记SocketThread是否是处理IO读的线程
  • writing::标记SocketThread是否是处理IO写的线程
  • stopping: 标记SocketThread线程是否是已经停止
  • waiting:等待的IOService的跳表缓冲集合

核心的addSocketService、 removeSocketService方法源码

   /*** 核心方法添加IOService* @param s*/public static void addSocketService(IOService<?> s) {//设置IOService已就绪s.setSocketServiceReady(true);// Due to a delayed SelectionKey cancelling deregistering// nature this distribution doesn't work well, it leads to// dead-lock. Let's make sure the service is always processed// by the same thread thus the same Selector.// socketReadThread[incrementAndGet()].addSocketServicePriv(s);//如果IOService是等待读的状态,那个根据 % 算法均衡到不同的线程中去if (s.waitingToRead()) {socketReadThread[s.hashCode() % socketReadThread.length].addSocketServicePriv(s);}//如果是待发送的IOService则根据 % 算法负载到写线程中去if (s.waitingToSend()) {socketWriteThread[s.hashCode() % socketWriteThread.length].addSocketServicePriv(s);}}/*** 核心移除IOService方法* @param s*/public static void removeSocketService(IOService<Object> s) {//设置就绪状态falses.setSocketServiceReady(false);//读写线程中都移除这个IOService对象socketReadThread[s.hashCode() % socketReadThread.length].removeSocketServicePriv(s);socketWriteThread[s.hashCode() % socketWriteThread.length].removeSocketServicePriv(s);}

两个都是静态方法,其核心原理就是用过hash 和取模算法分配IOService到SocketThread数组中的不同SocketThread线程中去。

SocketThread的run方法分析

public void run() {while (!stopping) {try {clientsSel.select();Set<SelectionKey> selected = clientsSel.selectedKeys();int selectedKeys = selected.size();//就绪的Key为0 且 等待的IOService为0if ((selectedKeys == 0) && (waiting.size() == 0)) {//重新创建Selector条件if ((++empty_selections) > MAX_EMPTY_SELECTIONS) {recreateSelector();}} else {empty_selections = 0;if (selectedKeys > 0) {//遍历就绪的Keyfor (SelectionKey sk : selected) {//从就绪的Key中拿到IOService对象IOService s = (IOService) sk.attachment();try {...sk.cancel();//添加到“待完成” 跳表中forCompletion.add(s);} catch (CancelledKeyException e) {//异常就强制停止IOServices.forceStop();}}}clientsSel.selectNow();}//将waiting跳表中的IOService分类注册到当前SocketThread的Selector上addAllWaiting();IOService serv = null;//先是从waiting 跳表注册到到当前forCompletion,然后再从//forCompletion跳表拿到最小的一个不为空,//就丢到completionService中执行,执行完只需要take()就能拿到执行完的对象while ((serv = forCompletion.pollFirst()) != null) {completionService.submit(serv);}} catch (Exception e) {recreateSelector();}}}

SocketThread本身是一个线程,其核心的方法和流程就在run()方法里面,以上代码是我精简之后留下的核心代码,具体的逻辑就是循环将waiting跳表集合中的IOServie注册到Selector上,并监听就绪的IOService,将其添加到forCompletion跳表集合中,然后从forCompletion中挨个取出,提交到completionService线程池中。供线程池调用IOService核心的call()方法进行数据的读写。

小结

tiagse的IO线程处理模型充分利用多线程和单一职责的设计,ConnectionOpenThread负载客户端连接的建立、SocketThread线程组负责监听就绪的IO对象,以及读写IO的分类、缓冲等,CompletionService线程池负责执行IOService的call方法(主要是处理IO的读写操作),以及解耦执行和执行结果的获取操作。ResultsListener线程负责循环利用没有关闭的IOService对象。合理的线程模型设计,更有利于提高系统的效率。

相关文章:

  • 如何导出1寸分辨率为300及以上的照片?
  • TC3xx学习笔记-UCB BMHD使用详解(一)
  • 如何给GitHub项目提PR(踩坑记录
  • 【Linux网络】构建HTTP响应与请求处理系统 - HttpResponse从理解到实现
  • 目标检测原理简介
  • Linux系统编程之内存映射
  • AI编程方法第六弹:高效编码离不开编程者经验引导
  • 设计看似完美却测不过? Intra-Pair Skew 是「讯号完整性(Signal Integrity)」里最隐形的杀手
  • venv环境基础指令以及常见问题汇总(持续更新)
  • 《AI大模型趣味实战》智能Agent和MCP协议的应用实例:搭建一个能阅读DOC文件并实时显示润色改写过程的Python Flask应用
  • WPF之项目创建
  • Rule.resource作用说明
  • 安装docker,在docker上安装mysql,docker上安装nginx
  • 微信小程序,基于uni-app的轮播图制作,调用文件中图片
  • 加里·基尔代尔:CP/M之父与个人计算时代的先驱
  • 线程池(六):ThreadLocal相关知识详解
  • 移除元素(简单)
  • 游戏引擎学习第246天:将 Worker 上下文移到主线程创建
  • C语言中结构体的字节对齐的应用
  • WPF与C++ 动态库交互
  • 伊朗最大港口爆炸:26公里外都能听到,超七百人受伤,原因指向化学品储存
  • 孟泽:我们简化了历史,因此也简化了人性
  • 云南鲁甸县一河滩突然涨水致4死,有人在救人过程中遇难
  • 现场|贝聿铭上海大展:回到他建筑梦的初始之地
  • 政治局会议:持续稳定和活跃资本市场
  • 全国首个古文学习AI大模型在沪发布,可批阅古文翻译