当前位置: 首页 > news >正文

Python代码片段-断点任务

使用Python处理一堆长耗时任务的时候,为了防止异常退出程序或者手动退出程序后丢失任务进度,可用使用断点的方式记录任务进度,下次重载任务后,继续运行上次未完成的任务即可。

这里用json文件作为数据持久化的方式,免去了使用数据库来作为持久的依赖问题,为了一个小的任务来搭建一套数据库,耗时耗力,得不偿失,不如选择本地文件来做简单的持久化,就算丢失部分任务进度,少数任务重新执行所消耗的时间完全处于容忍范围内。另外使用json文件记录任务进度,可用修改json文件来人为干预任务的执行情况,也是不错的一个优点

import json
import time
import os


class Task:
    def __init__(self, task_file='task_progress.json'):
        self.task_file = task_file
        self.progress = []
        if os.path.exists(self.task_file):
            self.load_progress()

    def load_progress(self):
        """加载任务进度"""
        with open(self.task_file, 'r') as f:
            data = json.load(f)
            self.progress = data.get('progress', 0)

    def save_progress(self):
        """保存任务进度"""
        data = {'progress': self.progress}
        with open(self.task_file, 'w') as f:
            json.dump(data, f,indent=4, separators=(',', ': '))
            f.flush()
            os.fsync(f.fileno())  # 确保数据已写入磁盘

    def execute_task(self):

        legacy_tasks = [t  for t in self.progress if t['finish'] ==0 ]

        for i, v in enumerate(legacy_tasks):
            print(f"执行任务 {v['key']} ...")
            time.sleep(3)  # 模拟任务执行的时间
            self.mark_finished( v['key']) # 更行进度
            self.save_progress()  # 每次执行后保存进度
            print(f"任务 {v['key']} 完成。")

        print("任务完成!")

    def flush_progresses(self):
        tasks = [
            {"key":"xxx1"},
            {"key":"xxx2"},
            {"key":"xxx3"}
        ]

        self.progress = [ {"key" : t['key'], 'finish':0} for t in tasks]
        self.save_progress()

    def mark_finished(self, key):
        """标记完成"""
        print(key)
        for i, t in enumerate(self.progress) :
            if t ['key'] ==  key :
                self.progress[i]['finish'] = 1
                break
        self.save_progress()
    
    def count(self):
        """查询未完成"""
        return len( [t for t in self.progress if t['finish'] == 0])



if __name__ == "__main__":
    task = Task()
    # 第一次生成任务列表,下次任务时候注释掉此行
    task.flush_progresses()
    try:
        task.execute_task()
    except KeyboardInterrupt:
        print("\n任务被中断,进度已保存。")

相关文章:

  • Linux常见问题
  • 算法日记27:完全背包(DFS->记忆化搜索->倒叙DP->顺序DP->空间优化)
  • Unity Android SDK 升级、安装 build-tools、platform-tools
  • 【HeadFirst系列之HeadFirstJava】第5天之超强力方法 —— 从战舰游戏到循环控制
  • 【C语言】指针(6)
  • 通俗理解什么是云原生?
  • Spring Boot 3 集成 RabbitMQ 实践指南
  • 《操作系统 - 清华大学》 8 -6:进程管理:进程状态变化模型
  • 3、优先级翻转问题
  • Ubuntu中部署deepseek
  • 【漫话机器学习系列】101.特征选择法之Lasso(Lasso For Feature Selection)
  • 离子阱量子计算机的原理与应用:开辟量子计算的新天地
  • 代码随想录|62.不同路径,63.不同路径Ⅱ,343.整数拆分
  • 论文笔记(七十二)Reward Centering(四)
  • Linux系统移植之对NXP的Uboot修改后移植
  • 给SQL server数据库表字段添加注释SQL,附修改、删除注释SQL及演示
  • Comfyui Windows Desktop桌面版便携版安装教程
  • 深入了解 MySQL 中的 JSON_CONTAINS
  • com库原理使用
  • Python 环境管理介绍
  • 苏州一季度GDP为6095.68亿元,同比增长6%
  • 【社论】优化限购限行,激发汽车消费潜能
  • 日韩 “打头阵”与美国贸易谈判,汽车、半导体产业忧虑重重
  • 找化学的答案,解人类的命题:巴斯夫的“变革者”成长之道
  • 专业竞演、剧场LIVE直播,32位越剧新星逐梦上海
  • 王庆成:儒家、墨家和洪秀全的“上帝”