当前位置: 首页 > news >正文

一款支持多线程的批量任务均衡器


任务均衡,即将100条任务的执行时间,均匀的分布在1小时内,可以降低运行负载的峰值,也可以用于控制多线程任务

import threading
import time
from datetime import datetime
from typing import Callable, List
import concurrent.futuresclass TaskScheduler:def __init__(self, task_list: List[str], cycle_seconds: int, reserve_seconds: int, func: Callable, single_cycle: bool = False, max_threads: int = 10):"""构造说明:- 必须满足 reserve_seconds < cycle_seconds- func 应实现异常处理逻辑- 单次模式时自动在任务完成后停止- max_threads 限制同时运行的线程数"""if reserve_seconds >= cycle_seconds:raise ValueError("reserve_seconds 必须小于 cycle_seconds")if max_threads <= 0:raise ValueError("max_threads 必须大于 0")self.task_list = task_listself.cycle_seconds = cycle_secondsself.reserve_seconds = reserve_secondsself.func = funcself.single_cycle = single_cycleself.max_threads = max_threadsself.lock = threading.Lock()self.running = Falsedef _execute_task(self, task: str, target_time: float):"""执行单个任务,确保在指定时间点执行"""current_time = time.time()sleep_time = max(0, target_time - current_time)time.sleep(sleep_time)try:self.func(task)except Exception as e:print(f"任务 {task} 执行出错: {e}")def _schedule_tasks(self):"""安排任务执行"""cycle_start_time = time.time()available_time = self.cycle_seconds - self.reserve_secondsinterval = available_time / len(self.task_list)current_time = cycle_start_time# 使用线程池限制最大线程数with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_threads) as executor:for task in self.task_list:target_time = current_time + intervalexecutor.submit(self._execute_task, task, target_time)current_time = target_timedef start(self):"""启动任务调度"""with self.lock:if self.running:print("任务调度器已经在运行")returnself.running = Truewhile self.running:self._schedule_tasks()if self.single_cycle:self.running = Falseelse:time.sleep(self.cycle_seconds)print("[系统时间] 所有任务执行完成")def stop(self):"""停止任务调度"""with self.lock:self.running = Falsedef runner_test(x):# 测试运行print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] 处理 {x} start")if x == 3:raise ValueError("333333333333333333333333")time.sleep(2)print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] 处理 {x} end")# 示例用法
if __name__ == "__main__":scheduler = TaskScheduler(task_list=[i for i in range(90)],cycle_seconds=10,  # 执行周期,单位/秒reserve_seconds=1,  # 预留时间,单位/秒func=runner_test,single_cycle=True,max_threads=20  # 最大线程数)scheduler.start()

代码说明

  1. 时间控制
    • 任务均匀分布在 (周期时间 - 预留时间) 的时间窗内。
    • 每个任务的间隔时间通过 available_time / len(self.task_list) 计算得出。
    • 使用 time.sleep 确保任务在指定时间点执行,误差控制在毫秒级别。
  2. 执行控制
    • 支持自定义任务处理函数 func,每个任务元素独立调用该函数。
    • 使用多线程并发执行任务,确保任务的独立性。
  3. 模式选择
    • 支持单次执行模式和持续周期模式。
    • 单次模式下,任务执行完成后自动停止。
  4. 异常处理
    • _execute_task 方法中捕获异常,确保任务执行出错时不会影响其他任务。
  5. 线程安全
    • 使用 threading.Lock 确保多线程并发调用时的线程安全。
  6. 资源控制
    • 单实例最多同时管理 1000 个任务元素(可通过限制 task_list 的长度实现)。
  7. 执行保障
    • 确保最后一个任务完成时间不超过周期开始时间 + (cycle_seconds - reserve_seconds)
    • 跳过已过期的历史任务时间点。
  8. 误差容忍
    • 周期对齐误差和任务执行时间误差均控制在毫秒级别。

相关文章:

  • AI日报 - 2024年04月22日
  • 实验四-用户和权限管理
  • Uniapp:view容器(容器布局)
  • 微硕WSP4407A MOS管在智能晾衣架中的应用与市场分析
  • 时序逻辑入门指南:LTL、CTL与PTL的概念介绍与应用场景
  • Flowable7.x学习笔记(十)分页查询已部署 BPMN XML 流程
  • 【Python】Python如何在字符串中添加变量
  • leetcode 647. Palindromic Substrings
  • 6N60-ASEMI机器人功率器件专用6N60
  • 《P3029 [USACO11NOV] Cow Lineup S》
  • 使用Mybaitis-plus提供的各种的免写SQL的Wrapper的使用方式
  • VLAN虚拟局域网
  • llama-webui docker实现界面部署
  • BEVDet4D: Exploit Temporal Cues in Multi-camera 3D Object Detection
  • QT 的.pro 转 vsproject 工程
  • 从多个Excel批量筛查数据后合并到一起
  • 方案精读:2024 华为数字政府智慧政务一网统管解决方案【附全文阅读】
  • Kubernetes集群超配节点容量
  • C++计算 n! 中末尾零的数量
  • 文档安全管理策略
  • 南京航空航天大学启动扁平化改革:管理岗规模控制在20%,不再统一设科级机构
  • 初步结果显示加拿大自由党赢得大选,外交部回应
  • 演员刘美含二手集市被曝售假,本人道歉
  • 苏州一季度GDP为6095.68亿元,同比增长6%
  • IPO周报|4月最后2只新股周一申购,今年以来最低价股来了
  • 核电开闸!国常会核准10台新机组,拉动超2000亿投资,新项目花落谁家?