当前位置: 首页 > news >正文

完整的 .NET 6 分布式定时任务实现(Hangfire + Redis 分布式锁)

完整的 .NET 6 分布式定时任务实现(Hangfire + Redis 分布式锁)

以下是完整的解决方案,包含所有必要组件:

1. 基础设施层

1.1 分布式锁服务

// IDistributedLockService.cs
public interface IDistributedLockService
{ValueTask<IAsyncDisposable?> AcquireLockAsync(string resourceKey, TimeSpan expiryTime);
}// RedisDistributedLockService.cs
public class RedisDistributedLockService : IDistributedLockService
{private readonly IConnectionMultiplexer _redis;private readonly ILogger<RedisDistributedLockService> _logger;public RedisDistributedLockService(IConnectionMultiplexer redis,ILogger<RedisDistributedLockService> logger){_redis = redis;_logger = logger;}public async ValueTask<IAsyncDisposable?> AcquireLockAsync(string resourceKey, TimeSpan expiryTime){var db = _redis.GetDatabase();var lockToken = Guid.NewGuid().ToString();var lockKey = $"distributed-lock:{resourceKey}";try{var acquired = await db.LockTakeAsync(lockKey, lockToken, expiryTime);if (acquired){_logger.LogDebug("成功获取分布式锁 {LockKey}", lockKey);return new RedisLockHandle(db, lockKey, lockToken, _logger);}_logger.LogDebug("无法获取分布式锁 {LockKey}", lockKey);return null;}catch (Exception ex){_logger.LogError(ex, "获取分布式锁 {LockKey} 时发生错误", lockKey);throw;}}private sealed class RedisLockHandle : IAsyncDisposable{private readonly IDatabase _db;private readonly string _lockKey;private readonly string _lockToken;private readonly ILogger _logger;private bool _isDisposed;public RedisLockHandle(IDatabase db,string lockKey,string lockToken,ILogger logger){_db = db;_lockKey = lockKey;_lockToken = lockToken;_logger = logger;}public async ValueTask DisposeAsync(){if (_isDisposed) return;try{var released = await _db.LockReleaseAsync(_lockKey, _lockToken);if (!released){_logger.LogWarning("释放分布式锁 {LockKey} 失败", _lockKey);}else{_logger.LogDebug("成功释放分布式锁 {LockKey}", _lockKey);}}catch (Exception ex){_logger.LogError(ex, "释放分布式锁 {LockKey} 时发生错误", _lockKey);}finally{_isDisposed = true;}}}
}

2. 任务服务层

2.1 定时任务服务

// IPollingService.cs
public interface IPollingService
{Task ExecutePollingTasksAsync();Task ExecuteDailyTaskAsync(int hour);
}// PollingService.cs
public class PollingService : IPollingService
{private readonly IDistributedLockService _lockService;private readonly ILogger<PollingService> _logger;public PollingService(IDistributedLockService lockService,ILogger<PollingService> logger){_lockService = lockService;_logger = logger;}[DisableConcurrentExecution(timeoutInSeconds: 60 * 30)] // 30分钟防并发public async Task ExecutePollingTasksAsync(){await using var lockHandle = await _lockService.AcquireLockAsync("polling-tasks-lock",TimeSpan.FromMinutes(25)); // 锁有效期25分钟if (lockHandle is null){_logger.LogInformation("其他节点正在执行轮询任务,跳过本次执行");return;}try{_logger.LogInformation("开始执行轮询任务 - 节点: {NodeId}", Environment.MachineName);// 执行所有轮询任务await Task.WhenAll(PollingTaskAsync(),PollingExpireTaskAsync(),PollingExpireDelCharactTaskAsync());// 触发后台任务_ = BackgroundTask.Run(() => PollingDelCharactTaskAsync(), _logger);_ = BackgroundTask.Run(() => AutoCheckApiAsync(), _logger);_ = BackgroundTask.Run(() => DelLogsAsync(), _logger);}catch (Exception ex){_logger.LogError(ex, "执行轮询任务时发生错误");throw;}}[DisableConcurrentExecution(timeoutInSeconds: 60 * 60)] // 1小时防并发public async Task ExecuteDailyTaskAsync(int hour){var lockKey = $"daily-task-{hour}:{DateTime.UtcNow:yyyyMMdd}";await using var lockHandle = await _lockService.AcquireLockAsync(lockKey,TimeSpan.FromMinutes(55)); // 锁有效期55分钟if (lockHandle is null){_logger.LogInformation("其他节点已执行今日 {Hour} 点任务", hour);return;}try{_logger.LogInformation("开始执行 {Hour} 点任务 - 节点: {NodeId}", hour, Environment.MachineName);if (hour == 21){await ExecuteNightlyMaintenanceAsync();}else if (hour == 4){await ExecuteEarlyMorningTasksAsync();}}catch (Exception ex){_logger.LogError(ex, "执行 {Hour} 点任务时发生错误", hour);throw;}}// 具体任务实现方法private async Task PollingTaskAsync(){// 实现游戏角色启动/关闭逻辑}private async Task ExecuteNightlyMaintenanceAsync(){// 21点特殊任务逻辑}// 其他方法...
}// BackgroundTask.cs (安全运行后台任务)
public static class BackgroundTask
{public static Task Run(Func<Task> task, ILogger logger){return Task.Run(async () =>{try{await task();}catch (Exception ex){logger.LogError(ex, "后台任务执行失败");}});}
}

3. 任务调度配置层

3.1 任务初始化器

// RecurringJobInitializer.cs
public class RecurringJobInitializer : IHostedService
{private readonly IRecurringJobManager _jobManager;private readonly IServiceProvider _services;private readonly ILogger<RecurringJobInitializer> _logger;public RecurringJobInitializer(IRecurringJobManager jobManager,IServiceProvider services,ILogger<RecurringJobInitializer> logger){_jobManager = jobManager;_services = services;_logger = logger;}public Task StartAsync(CancellationToken cancellationToken){try{using var scope = _services.CreateScope();var pollingService = scope.ServiceProvider.GetRequiredService<IPollingService>();// 每30分钟执行的任务_jobManager.AddOrUpdate<IPollingService>("polling-tasks-30min",s => s.ExecutePollingTasksAsync(),"*/30 * * * *");// 每天21:00执行的任务_jobManager.AddOrUpdate<IPollingService>("daily-task-21:00",s => s.ExecuteDailyTaskAsync(21),"0 21 * * *");// 每天04:00执行的任务_jobManager.AddOrUpdate<IPollingService>("daily-task-04:00",s => s.ExecuteDailyTaskAsync(4),"0 4 * * *");_logger.LogInformation("周期性任务初始化完成");}catch (Exception ex){_logger.LogError(ex, "初始化周期性任务失败");throw;}return Task.CompletedTask;}public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

4. 应用启动配置

4.1 Program.cs

var builder = WebApplication.CreateBuilder(args);// 添加Redis
builder.Services.AddSingleton<IConnectionMultiplexer>(sp => ConnectionMultiplexer.Connect(builder.Configuration.GetConnectionString("Redis")));// 配置Hangfire
builder.Services.AddHangfire(config =>
{config.UseRedisStorage(builder.Configuration.GetConnectionString("Redis"),new RedisStorageOptions{Prefix = "hangfire:",Db = 1 // 使用单独的Redis数据库});config.UseColouredConsoleLogProvider();
});builder.Services.AddHangfireServer(options =>
{options.ServerName = $"{Environment.MachineName}:{Guid.NewGuid():N}";options.WorkerCount = 1;options.Queues = new[] { "default", "critical" };
});// 注册服务
builder.Services.AddSingleton<IDistributedLockService, RedisDistributedLockService>();
builder.Services.AddScoped<IPollingService, PollingService>();
builder.Services.AddHostedService<RecurringJobInitializer>();var app = builder.Build();// 配置Hangfire仪表盘
app.UseHangfireDashboard("/jobs", new DashboardOptions
{DashboardTitle = "任务调度中心",Authorization = new[] { new HangfireDashboardAuthorizationFilter() },StatsPollingInterval = 60_000 // 60秒刷新一次
});app.Run();// Hangfire仪表盘授权过滤器
public class HangfireDashboardAuthorizationFilter : IDashboardAuthorizationFilter
{public bool Authorize(DashboardContext context){var httpContext = context.GetHttpContext();return httpContext.User.Identity?.IsAuthenticated == true;}
}

5. appsettings.json 配置

{"ConnectionStrings": {"Redis": "localhost:6379,allowAdmin=true","Hangfire": "Server=(localdb)\\mssqllocaldb;Database=Hangfire;Trusted_Connection=True;"},"Hangfire": {"WorkerCount": 1,"SchedulePollingInterval": 5000}
}

关键设计说明

  1. 分布式锁

    • 使用Redis RedLock算法实现
    • 自动处理锁的获取和释放
    • 包含完善的错误处理和日志记录
  2. 任务隔离

    • 使用Hangfire的[DisableConcurrentExecution]防止同一任务重复执行
    • 分布式锁确保跨节点唯一执行
  3. 错误处理

    • 所有关键操作都有try-catch和日志记录
    • 后台任务使用安全包装器执行
  4. 可观测性

    • 详细的日志记录
    • Hangfire仪表盘监控
  5. 扩展性

    • 可以轻松添加新任务
    • 支持动态调整调度策略

这个实现方案完全符合.NET 6的最佳实践,支持分布式部署,确保任务在集群环境中安全可靠地执行。

相关文章:

  • 计算机视觉中,我们经常提到到训练pipeline是什么意思
  • leetcode 2364. 统计坏数对的数目 中等
  • RT-Thread开发文档合集
  • Python大小整数池及intern机制详解
  • 模块内聚:理解和优化模块设计的关键
  • Web3架构下的数据隐私与保护
  • 【unity实战】Animator启用root motion根运动动画,实现完美的动画动作匹配
  • 基于大模型的直肠息肉诊疗全流程风险预测与方案优化研究报告
  • Ethernet/IP转ProfiNet边缘计算网关在能源管理中的应用:跨系统数据聚合与智能分析
  • SQL SERVER里面也可以插入存储过程,操作TCP,WEBSOCKET吗?数据发生改变时用于通知客户端
  • 量子神经网络编译器开发指南:从理论突破到产业落地全景解析
  • AbMole推荐——肿瘤类器官加速癌症研究成果产出
  • 安全光幕的CE认证
  • 2025年人工智能指数报告(斯坦福)重点整理
  • 用ffmpeg 实现拉取h265的flv视频转存成264的mp4 实现方案
  • 智慧校园整体解决方案
  • 番外篇 | SEAM-YOLO:引入SEAM系列注意力机制,提升遮挡小目标的检测性能
  • 小说阅读器 ebook-reader
  • Silverlight发展历程(微软2021年已经停止支持Silverlight 5)
  • ESP32之本地HTTP服务器OTA固件升级流程,基于VSCode环境下的ESP-IDF开发(附源码)
  • 成都市政府秘书长王忠诚调任遂宁市委副书记
  • 著名作家、中国艺术研究院原常务副院长曲润海逝世
  • 智慧菜场团标试验:标准化的同时还能保留个性化吗?
  • 特朗普就防卫负担施压日本,石破茂:防卫费应由我们自主决定
  • 老年人越“懒”越健康,特别是这5种“懒”
  • 左眼失明左耳失聪,办理残疾人证被拒?县残联:双眼残疾才能办