当前位置: 首页 > news >正文

用 Go 实现一个轻量级并发任务调度器(支持限速)

前言

在日常开发中,我们经常会遇到这样的场景:

  • • 有一堆任务要跑(比如:发请求、处理数据、爬虫等)
  • • 不希望一次性全部跑完,担心打爆服务端或者被封
  • • 想要设置并发数、限速,还能控制任务重试、失败记录

那么,能不能用 Go 实现一个“轻量级的并发任务调度器”?——答案是:当然可以!

今天我们就来用 Go 从零实现一个可配置的任务调度器,支持:

  • • 最大并发数控制(worker pool)
  • • 每秒请求速率限制(rate limit)
  • • 简单的失败重试机制
  • • 支持结果收集与错误输出

效果展示

你可以像这样调用我们的调度器:

scheduler := NewScheduler(5, 10) // 并发 5,速率限制每秒 10 次for i := 0; i < 100; i++ {task := NewTask(func() error {// 模拟网络请求或业务逻辑fmt.Println("正在处理任务:", i)time.Sleep(300 * time.Millisecond)return nil})scheduler.Submit(task)
}scheduler.Wait()
fmt.Println("全部任务完成")

核心组件设计

1. 任务(Task)

我们将每个任务抽象为一个结构体:

type Task struct {fn   func() errorretry int
}

2. 调度器(Scheduler)

负责维护任务队列、worker、速率限制器:

type Scheduler struct {tasks       chan *Taskwg          sync.WaitGrouprateLimiter <-chan time.Time
}

实现代码

下面是完整实现(可以直接复制使用):

type Task struct {fn    func() errorretry int
}func NewTask(fn func() error) *Task {return &Task{fn: fn, retry: 3}
}type Scheduler struct {tasks       chan *Taskwg          sync.WaitGrouprateLimiter <-chan time.Time
}func NewScheduler(concurrency int, ratePerSecond int) *Scheduler {s := &Scheduler{tasks:       make(chan *Task, 100),rateLimiter: time.Tick(time.Second / time.Duration(ratePerSecond)),}for i := 0; i < concurrency; i++ {go s.worker()}return s
}func (s *Scheduler) Submit(task *Task) {s.wg.Add(1)s.tasks <- task
}func (s *Scheduler) worker() {for task := range s.tasks {<-s.rateLimiter // 限速err := task.fn()if err != nil && task.retry > 0 {fmt.Println("任务失败,重试中...")task.retry--s.Submit(task)} else if err != nil {fmt.Println("任务最终失败:", err)}s.wg.Done()}
}func (s *Scheduler) Wait() {s.wg.Wait()close(s.tasks)
}

实战应用场景

  • • 网络爬虫限速抓取
  • • 批量发送邮件/SMS/请求,防止接口限流
  • • 云服务任务调度、批量自动化操作
  • • 异步数据采集和聚合

总结

Go 的并发模型非常适合处理“海量任务 + 控制速率 + 错误重试”的需求。本篇实现的调度器非常轻量,适合作为基础组件集成到你自己的系统中。

如果你有更多需求,比如:

  • • 增加失败回调
  • • 支持超时控制
  • • 任务优先级
  • • 后台监控 dashboard

欢迎留言交流,我们可以继续升级这个任务调度器!


关注我,带你用 Go 写出更有趣的小工具!
如果你觉得这篇文章对你有帮助,别忘了点赞、收藏、转发哈~

相关文章:

  • 多线程编程的简单案例——单例模式[多线程编程篇(3)]
  • NFC 碰一碰发视频源码搭建,碰一碰发视频定制化开发技术
  • Redis 的指令执行方式:Pipeline、事务与 Lua 脚本的对比
  • ROS机器人一般用哪些传感器?
  • 初识Redis · 客户端“Hello world“
  • R 语言科研绘图 --- 饼状图-汇总
  • Yum镜像源
  • 中间件--ClickHouse-10--海量数据存储如何抉择ClickHouse和ES?
  • 【系统分析师】-软件工程
  • 【文件操作与IO】详细解析文件操作与IO (一)
  • 探索 Higress:下一代云原生 API 网关
  • 前端融合图片mask
  • 高级java每日一道面试题-2025年4月13日-微服务篇[Nacos篇]-Nacos如何处理网络分区情况下的服务可用性问题?
  • ubantu18.04(Hadoop3.1.3)之MapReduce编程
  • pnpm解决幽灵依赖问题
  • Model Context Protocol (MCP) 开放协议对医疗多模态数据整合的分析路径【附代码】
  • Kaamel隐私与安全分析报告:Microsoft Recall功能评估与风险控制
  • hadoop和Yarn的基本介绍
  • 使用Java动态数据生成PDF报告:简化您的报告导出流程
  • AI语音助手 React 组件使用js-audio-recorder实现,将获取到的语音转成base64发送给后端,后端接口返回文本内容
  • 新华视点丨广西抗旱一线调查
  • 宁德时代校友红利!副董事长给母校复旦豪捐10亿,曾毓群给交大捐近14亿
  • 巴勒斯坦民族权力机构主席:哈马斯必须移交武器
  • 金地集团:保交楼为经营的首要任务,将根据融资性现金流恢复程度等进行投资决策
  • 裁员15%、撤销132个机构,美国务院将全面重组
  • 金发科技去年净利增160%,机器人等新领域催生材料新需求