当前位置: 首页 > news >正文

HashedWheelTimer源码分析

前言

Java Timer 是个单线程的基于小根堆结构来维护延时任务的调度器,任务的调度和执行在同一个线程上,任务之间会相互影响,同时基于数组实现的小根堆也不适合维护大量任务。

之后,Java 又推出了多线程版本的 ScheduledThreadPoolExecutor,性能更好,资源利用率更高,因为是多线程,任务之间的影响也相对较小,同时对异常进行了捕获,不会因为单个任务执行异常而退出线程。但是,基于数组实现的小根堆同样不适合维护大量任务,任务数一旦多起来,对内存的压力以及堆的维护开销都不容小觑。其次,延时任务的调度和执行还是在同一线程,并没有隔离开来。

延时任务的实现,还有一种时间轮算法,用来解决传统定时器在大量任务时的性能问题,它非常高效,尤其在处理大量短时定时任务时,时间复杂度接近O(1),代价是要牺牲一定的时间精度。

Netty 的 HashedWheelTimer 就是时间轮算法的一种实现,Netty 内部在它来处理 连接超时、空闲检测等任务。

时间轮算法

时间轮算法的思想:如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。

时间轮算法的核心是,仅靠单个线程,就可以完成大量任务的调度。因为任务被分配到不同的Bucket上,线程调度时只关心当前Bucket上有哪些任务到期,性能也会更好。

HashedWheelTimer

HashedWheelTimer的构造函数以及各参数含义

public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts, Executor taskExecutor) {checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");// Normalize ticksPerWheel to power of two and initialize the wheel.wheel = createWheel(ticksPerWheel);mask = wheel.length - 1;// Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);// Prevent overflow.if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;this.maxPendingTimeouts = maxPendingTimeouts;if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}
}
  • threadFactory 创建Worker线程的工厂
  • tickDuration 时间轮的间隔时间,最小精确到1毫秒
  • unit 时间轮的间隔时间单位
  • ticksPerWheel 时间轮的大小,默认512
  • leakDetection 是否开启内存泄漏检测
  • maxPendingTimeouts 最大等待任务数,默认不限制
  • taskExecutor 任务执行器,如果异步执行任务,那么调度线程和执行线程就隔离开了,更加利于任务的调度

构造函数流程:

  1. createWheel 创建时间轮,也就是 HashedWheelBucket 数组,数组的大小必须是2的幂次方大小
  2. 计算 mask 掩码,便于计算当前要处理的时间轮下标
  3. 时间轮的调度间隔 tickDuration 转换为纳秒单位,间隔不可小于1毫秒
  4. 通过线程工厂创建工作线程 workerThread
  5. 判断是否要开启内存泄漏检测,开启则创建一个 ResourceLeakTracker
  6. 判断时间轮的实例数是否超过64个,超过则日志告警

createWheel 用来创建时间轮,底层就是个环形数组,数组的长度必须是2的幂次方数。

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();
}
return wheel;
}

数组中的元素是 HashedWheelBucket,用来存储落在同一刻度上的任务,底层是双向链表实现。

private static final class HashedWheelBucket {private HashedWheelTimeout head; // 头节点private HashedWheelTimeout tail; // 尾节点
}

延时任务提交 newTimeout,任务被封装成 TimerTask,同时提供任务延迟执行的时间

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {checkNotNull(task, "task");checkNotNull(unit, "unit");long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}start();long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;
}
  1. pendingTimeouts 代表等待执行的任务数量,首先是检查任务数是否超过阈值,超过则直接抛出拒绝异常。默认是不限制的,需要注意是否有足够的内存来容纳这些任务。
  2. 接着 start 开启线程,Java Timer 实例化时就启动线程,HashedWheelTimer 则是懒启动,更利于资源。
  3. Worker线程启动后,会把启动时间赋值给 startTime,延时任务的 deadline 就是当前时间+延迟时间-startTime
  4. 最后把任务封装成 HashedWheelTimeout,加入到 timeouts 队列。

注意,此时任务只是加入到 timeouts 队列,还没有加入到时间轮的Bucket中。

Worker线程启动后,执行的 Runnable 是 内部类 Worker,变量 tick 代表时间轮运行的指针,也就是根据 tick 计算当前要处理的 Bucket。

private final class Worker implements Runnable {private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();private long tick;@Overridepublic void run() {// Initialize the startTime.startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().startTimeInitialized.countDown();do {final long deadline = waitForNextTick();if (deadline > 0) {int idx = (int) (tick & mask);processCancelledTasks();HashedWheelBucket bucket =wheel[idx];transferTimeoutsToBuckets();bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}processCancelledTasks();}
}

Worker#run 流程:

  1. 先根据 tickDuration 和 tick 计算要睡眠的时间,线程sleep
  2. 时间一到,根据 (tick & mask) 计算要处理的 Bucket
  3. processCancelledTasks 先移除掉期间已经被取消的任务
  4. transferTimeoutsToBuckets 把先前入到 timeouts 队列里的任务,迁移到 Bucket 里
  5. bucket.expireTimeouts 把 Bucket 里已经到期的任务执行并移除
  6. 时间轮关闭后,把没执行的任务添加到 unprocessedTimeouts,以便在 stop 中返回

transferTimeoutsToBuckets 会把 timeouts 中的任务迁移到对应 Bucket

private void transferTimeoutsToBuckets() {for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}long calculated = timeout.deadline / tickDuration;timeout.remainingRounds = (calculated - tick) / wheel.length;final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}
}
  1. 为了防止任务迁移耗时太久,导致Worker线程时间精度失效,限制最多迁移100000个任务
  2. 时间轮一圈的时间范围由 tickDuration 和 Bucket 数量决定,如果任务的到期时间超过一轮,属性 remainingRounds 就会记录剩余的轮数。时间轮调度时,对于 remainingRounds>0 的任务,仅会递减剩余的轮数,等待下一轮再调度执行

任务迁移到对应 Bucket 后,bucket#expireTimeouts 会处理当前 Bucket 里的任务

public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {next = remove(timeout);if (timeout.deadline <= deadline) {timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {timeout.remainingRounds --;}timeout = next;
}
}
  1. HashedWheelBucket 是双向链表,先根据 head 找到头节点,再向后扫描
  2. 如果任务的 remainingRounds>0 ,代表任务的延迟时间超过一轮,仅递减轮数,等待后续调度
  3. 如果任务已经被取消,则直接移除掉
  4. 否则代表任务到期,将其从Bucket里移除,再执行任务

尾巴

HashedWheelTimer 是 Netty 基于 时间轮算法‌ 实现的高效定时任务调度器,适用于处理大规模延迟任务或周期性任务(如超时检测、心跳等)。它将任务散列到时间轮的Bucket中,通过固定时间间隔推进轮子,实现任务的调度和执行。

它的特点是 适合处理大量的短时任务,效率非常高,代价是牺牲了一定的时间精度,任务的调度时间受到 tickDuration 的影响,不适用于高精度定时场景。

相关文章:

  • Scrapy框架爬虫官网的学习
  • OpenCV 图形API(55)颜色空间转换-----将图像从 RGB 色彩空间转换为 I420 格式函数RGB2I420()
  • 第九章:Agent Protocol Implementation
  • 香港云服务器内存使用率过高如何解决此问题
  • PH热榜 | 2025-04-23
  • 【金仓数据库征文】从 HTAP 到 AI 加速,KingbaseES 的未来之路
  • 《AI大模型应知应会100篇》第35篇:Prompt链式调用:解决复杂问题的策略
  • day4 pandas学习
  • godot源码编译
  • Oracle EBS R12.2 汉化
  • Java从入门到“放弃”(精通)之旅——String类⑩
  • C#学习1_认识项目/程序结构
  • SAP-SD创建SO时报错‘送达方***未对销售范围 **** ** **定义’
  • App自动化测试多设备并行执行方案
  • jumpserver应用
  • MinIO 教程:从入门到Spring Boot集成
  • 【maven-7.1】POM文件中的属性管理:提升构建灵活性与可维护性
  • VS 解决QT项目中文显示乱码问题
  • 一个报错说函数为私有函数
  • 使用 Oracle 数据库进行基于 JSON 的应用程序开发
  • GDP十强省份“一季报”出炉,湖北领跑
  • 税率飙至3500%!美国双反大棒重击东南亚光伏,中企如何应对
  • 外交部:中企在中韩暂定水域建立渔业养殖设施不违反中韩有关协定
  • 全国登记在册民营企业超过5700万户
  • 人民日报读者点题·共同关注:花粉过敏增多,如何看待城市绿化“成长的烦恼”
  • 北京理工大学解除宫某聘用关系,该教师被指涉嫌骚扰猥亵学生