HashedWheelTimer源码分析
前言
Java Timer 是个单线程的基于小根堆结构来维护延时任务的调度器,任务的调度和执行在同一个线程上,任务之间会相互影响,同时基于数组实现的小根堆也不适合维护大量任务。
之后,Java 又推出了多线程版本的 ScheduledThreadPoolExecutor,性能更好,资源利用率更高,因为是多线程,任务之间的影响也相对较小,同时对异常进行了捕获,不会因为单个任务执行异常而退出线程。但是,基于数组实现的小根堆同样不适合维护大量任务,任务数一旦多起来,对内存的压力以及堆的维护开销都不容小觑。其次,延时任务的调度和执行还是在同一线程,并没有隔离开来。
延时任务的实现,还有一种时间轮算法,用来解决传统定时器在大量任务时的性能问题,它非常高效,尤其在处理大量短时定时任务时,时间复杂度接近O(1),代价是要牺牲一定的时间精度。
Netty 的 HashedWheelTimer 就是时间轮算法的一种实现,Netty 内部在它来处理 连接超时、空闲检测等任务。
时间轮算法
时间轮算法的思想:如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。
时间轮算法的核心是,仅靠单个线程,就可以完成大量任务的调度。因为任务被分配到不同的Bucket上,线程调度时只关心当前Bucket上有哪些任务到期,性能也会更好。
HashedWheelTimer
HashedWheelTimer的构造函数以及各参数含义
public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts, Executor taskExecutor) {checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");// Normalize ticksPerWheel to power of two and initialize the wheel.wheel = createWheel(ticksPerWheel);mask = wheel.length - 1;// Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);// Prevent overflow.if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;this.maxPendingTimeouts = maxPendingTimeouts;if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}
}
- threadFactory 创建Worker线程的工厂
- tickDuration 时间轮的间隔时间,最小精确到1毫秒
- unit 时间轮的间隔时间单位
- ticksPerWheel 时间轮的大小,默认512
- leakDetection 是否开启内存泄漏检测
- maxPendingTimeouts 最大等待任务数,默认不限制
- taskExecutor 任务执行器,如果异步执行任务,那么调度线程和执行线程就隔离开了,更加利于任务的调度
构造函数流程:
- createWheel 创建时间轮,也就是 HashedWheelBucket 数组,数组的大小必须是2的幂次方大小
- 计算 mask 掩码,便于计算当前要处理的时间轮下标
- 时间轮的调度间隔 tickDuration 转换为纳秒单位,间隔不可小于1毫秒
- 通过线程工厂创建工作线程 workerThread
- 判断是否要开启内存泄漏检测,开启则创建一个 ResourceLeakTracker
- 判断时间轮的实例数是否超过64个,超过则日志告警
createWheel 用来创建时间轮,底层就是个环形数组,数组的长度必须是2的幂次方数。
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();
}
return wheel;
}
数组中的元素是 HashedWheelBucket,用来存储落在同一刻度上的任务,底层是双向链表实现。
private static final class HashedWheelBucket {private HashedWheelTimeout head; // 头节点private HashedWheelTimeout tail; // 尾节点
}
延时任务提交 newTimeout,任务被封装成 TimerTask,同时提供任务延迟执行的时间
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {checkNotNull(task, "task");checkNotNull(unit, "unit");long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}start();long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;
}
- pendingTimeouts 代表等待执行的任务数量,首先是检查任务数是否超过阈值,超过则直接抛出拒绝异常。默认是不限制的,需要注意是否有足够的内存来容纳这些任务。
- 接着 start 开启线程,Java Timer 实例化时就启动线程,HashedWheelTimer 则是懒启动,更利于资源。
- Worker线程启动后,会把启动时间赋值给 startTime,延时任务的 deadline 就是当前时间+延迟时间-startTime
- 最后把任务封装成 HashedWheelTimeout,加入到 timeouts 队列。
注意,此时任务只是加入到 timeouts 队列,还没有加入到时间轮的Bucket中。
Worker线程启动后,执行的 Runnable 是 内部类 Worker,变量 tick 代表时间轮运行的指针,也就是根据 tick 计算当前要处理的 Bucket。
private final class Worker implements Runnable {private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();private long tick;@Overridepublic void run() {// Initialize the startTime.startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().startTimeInitialized.countDown();do {final long deadline = waitForNextTick();if (deadline > 0) {int idx = (int) (tick & mask);processCancelledTasks();HashedWheelBucket bucket =wheel[idx];transferTimeoutsToBuckets();bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}processCancelledTasks();}
}
Worker#run 流程:
- 先根据 tickDuration 和 tick 计算要睡眠的时间,线程sleep
- 时间一到,根据 (tick & mask) 计算要处理的 Bucket
- processCancelledTasks 先移除掉期间已经被取消的任务
- transferTimeoutsToBuckets 把先前入到 timeouts 队列里的任务,迁移到 Bucket 里
- bucket.expireTimeouts 把 Bucket 里已经到期的任务执行并移除
- 时间轮关闭后,把没执行的任务添加到 unprocessedTimeouts,以便在 stop 中返回
transferTimeoutsToBuckets 会把 timeouts 中的任务迁移到对应 Bucket
private void transferTimeoutsToBuckets() {for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}long calculated = timeout.deadline / tickDuration;timeout.remainingRounds = (calculated - tick) / wheel.length;final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}
}
- 为了防止任务迁移耗时太久,导致Worker线程时间精度失效,限制最多迁移100000个任务
- 时间轮一圈的时间范围由 tickDuration 和 Bucket 数量决定,如果任务的到期时间超过一轮,属性 remainingRounds 就会记录剩余的轮数。时间轮调度时,对于 remainingRounds>0 的任务,仅会递减剩余的轮数,等待下一轮再调度执行
任务迁移到对应 Bucket 后,bucket#expireTimeouts 会处理当前 Bucket 里的任务
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {next = remove(timeout);if (timeout.deadline <= deadline) {timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {timeout.remainingRounds --;}timeout = next;
}
}
- HashedWheelBucket 是双向链表,先根据 head 找到头节点,再向后扫描
- 如果任务的 remainingRounds>0 ,代表任务的延迟时间超过一轮,仅递减轮数,等待后续调度
- 如果任务已经被取消,则直接移除掉
- 否则代表任务到期,将其从Bucket里移除,再执行任务
尾巴
HashedWheelTimer 是 Netty 基于 时间轮算法 实现的高效定时任务调度器,适用于处理大规模延迟任务或周期性任务(如超时检测、心跳等)。它将任务散列到时间轮的Bucket中,通过固定时间间隔推进轮子,实现任务的调度和执行。
它的特点是 适合处理大量的短时任务,效率非常高,代价是牺牲了一定的时间精度,任务的调度时间受到 tickDuration 的影响,不适用于高精度定时场景。