完整的 .NET 6 分布式定时任务实现(Hangfire + Redis 分布式锁)
以下是完整的解决方案,包含所有必要组件:
1. 基础设施层
1.1 分布式锁服务
// IDistributedLockService.cs
/// <summary>
/// Abstraction over a distributed lock identified by a resource key.
/// </summary>
public interface IDistributedLockService
{
    /// <summary>
    /// Tries to take the lock for <paramref name="resourceKey"/>.
    /// </summary>
    /// <param name="resourceKey">Name of the resource to lock.</param>
    /// <param name="expiryTime">Lifetime requested for the lock.</param>
    /// <returns>
    /// A handle to dispose when the protected work is finished, or <c>null</c>
    /// (the Redis-backed implementation in this file returns <c>null</c> when the
    /// lock could not be taken).
    /// </returns>
    ValueTask<IAsyncDisposable?> AcquireLockAsync(string resourceKey, TimeSpan expiryTime);
}
// RedisDistributedLockService.cs
/// <summary>
/// <see cref="IDistributedLockService"/> implementation built on the StackExchange.Redis
/// <c>LockTakeAsync</c> / <c>LockReleaseAsync</c> primitives.
/// </summary>
public class RedisDistributedLockService : IDistributedLockService
{
    // Prefix prepended to every resource key to form the Redis key.
    private const string LockKeyPrefix = "distributed-lock:";

    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisDistributedLockService> _logger;

    public RedisDistributedLockService(
        IConnectionMultiplexer redis,
        ILogger<RedisDistributedLockService> logger)
    {
        _redis = redis;
        _logger = logger;
    }

    /// <summary>
    /// Tries once (no waiting, no retry) to take the lock for <paramref name="resourceKey"/>.
    /// </summary>
    /// <param name="resourceKey">Resource name; must not be null, empty or whitespace.</param>
    /// <param name="expiryTime">Lifetime of the lock in Redis; must be positive.</param>
    /// <returns>
    /// A handle that releases the lock when disposed, or <c>null</c> when the lock
    /// is currently held by someone else.
    /// </returns>
    /// <exception cref="ArgumentException"><paramref name="resourceKey"/> is blank.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="expiryTime"/> is not positive.</exception>
    public async ValueTask<IAsyncDisposable?> AcquireLockAsync(string resourceKey, TimeSpan expiryTime)
    {
        // A blank key would make every caller contend for the same "distributed-lock:" entry.
        if (string.IsNullOrWhiteSpace(resourceKey))
        {
            throw new ArgumentException("Resource key must not be null or blank.", nameof(resourceKey));
        }

        // Reject a non-positive lifetime here instead of surfacing an opaque Redis error.
        if (expiryTime <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(expiryTime), expiryTime, "Lock expiry must be positive.");
        }

        var db = _redis.GetDatabase();
        // Unique token so that only this holder can release the lock.
        var lockToken = Guid.NewGuid().ToString();
        var lockKey = $"{LockKeyPrefix}{resourceKey}";

        try
        {
            var acquired = await db.LockTakeAsync(lockKey, lockToken, expiryTime);
            if (!acquired)
            {
                _logger.LogDebug("无法获取分布式锁 {LockKey}", lockKey);
                return null;
            }

            _logger.LogDebug("成功获取分布式锁 {LockKey}", lockKey);
            return new RedisLockHandle(db, lockKey, lockToken, _logger);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "获取分布式锁 {LockKey} 时发生错误", lockKey);
            throw;
        }
    }

    /// <summary>
    /// Handle returned to the lock holder; disposing it releases the Redis lock.
    /// Disposal never throws: failures are logged instead.
    /// </summary>
    private sealed class RedisLockHandle : IAsyncDisposable
    {
        private readonly IDatabase _db;
        private readonly string _lockKey;
        private readonly string _lockToken;
        private readonly ILogger _logger;
        private bool _isDisposed;

        public RedisLockHandle(IDatabase db, string lockKey, string lockToken, ILogger logger)
        {
            _db = db;
            _lockKey = lockKey;
            _lockToken = lockToken;
            _logger = logger;
        }

        public async ValueTask DisposeAsync()
        {
            if (_isDisposed)
            {
                return;
            }

            try
            {
                var released = await _db.LockReleaseAsync(_lockKey, _lockToken);
                if (released)
                {
                    _logger.LogDebug("成功释放分布式锁 {LockKey}", _lockKey);
                }
                else
                {
                    // Release is token-checked; false means the key no longer holds our token
                    // (for example the lock already expired).
                    _logger.LogWarning("释放分布式锁 {LockKey} 失败", _lockKey);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "释放分布式锁 {LockKey} 时发生错误", _lockKey);
            }
            finally
            {
                // Mark as disposed even when the release failed, so it is attempted only once.
                _isDisposed = true;
            }
        }
    }
}
2. 任务服务层
2.1 定时任务服务
// IPollingService.cs
/// <summary>
/// Entry points of the scheduled jobs.
/// </summary>
public interface IPollingService
{
    /// <summary>
    /// Runs one round of the periodic polling tasks.
    /// </summary>
    Task ExecutePollingTasksAsync();

    /// <summary>
    /// Runs the daily task associated with the given hour.
    /// </summary>
    /// <param name="hour">Hour of day that selects which daily task to run.</param>
    Task ExecuteDailyTaskAsync(int hour);
}
// PollingService.cs
/// <summary>
/// Scheduled job implementations. Each job takes a distributed lock first and
/// skips the run when the lock is already held.
/// </summary>
public class PollingService : IPollingService
{
    private readonly IDistributedLockService _lockService;
    private readonly ILogger<PollingService> _logger;

    public PollingService(
        IDistributedLockService lockService,
        ILogger<PollingService> logger)
    {
        _lockService = lockService;
        _logger = logger;
    }

    /// <summary>
    /// Runs the polling tasks followed by the follow-up maintenance tasks, all while
    /// holding the "polling-tasks-lock" distributed lock.
    /// </summary>
    // timeoutInSeconds is the maximum time Hangfire waits to acquire its own
    // per-job lock (30 minutes here); it is not a guaranteed exclusion window.
    [DisableConcurrentExecution(timeoutInSeconds: 60 * 30)]
    public async Task ExecutePollingTasksAsync()
    {
        await using var lockHandle = await _lockService.AcquireLockAsync(
            "polling-tasks-lock",
            TimeSpan.FromMinutes(25)); // lock lifetime: 25 minutes

        if (lockHandle is null)
        {
            _logger.LogInformation("其他节点正在执行轮询任务,跳过本次执行");
            return;
        }

        try
        {
            _logger.LogInformation("开始执行轮询任务 - 节点: {NodeId}", Environment.MachineName);

            // Run all polling tasks concurrently.
            await Task.WhenAll(
                PollingTaskAsync(),
                PollingExpireTaskAsync(),
                PollingExpireDelCharactTaskAsync());

            // Follow-up tasks. They are awaited (not fire-and-forget) so that they complete
            // before the distributed lock is released and before the job is reported as
            // finished. BackgroundTask.Run logs and swallows their exceptions, so one
            // failing follow-up task does not fail the job.
            await Task.WhenAll(
                BackgroundTask.Run(() => PollingDelCharactTaskAsync(), _logger),
                BackgroundTask.Run(() => AutoCheckApiAsync(), _logger),
                BackgroundTask.Run(() => DelLogsAsync(), _logger));
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "执行轮询任务时发生错误");
            throw;
        }
    }

    /// <summary>
    /// Runs the daily task for <paramref name="hour"/> (21 or 4) under a lock whose key
    /// contains the hour and the current UTC date.
    /// </summary>
    /// <param name="hour">Hour that selects the task; other values only log a warning.</param>
    // timeoutInSeconds: Hangfire waits up to 1 hour to acquire its own per-job lock.
    [DisableConcurrentExecution(timeoutInSeconds: 60 * 60)]
    public async Task ExecuteDailyTaskAsync(int hour)
    {
        // NOTE(review): the lock is released when this method returns, so the date in the
        // key does not by itself prevent a second run later the same day — confirm whether
        // strict once-per-day semantics are required.
        var lockKey = $"daily-task-{hour}:{DateTime.UtcNow:yyyyMMdd}";
        await using var lockHandle = await _lockService.AcquireLockAsync(
            lockKey,
            TimeSpan.FromMinutes(55)); // lock lifetime: 55 minutes

        if (lockHandle is null)
        {
            _logger.LogInformation("其他节点已执行今日 {Hour} 点任务", hour);
            return;
        }

        try
        {
            _logger.LogInformation("开始执行 {Hour} 点任务 - 节点: {NodeId}", hour, Environment.MachineName);

            switch (hour)
            {
                case 21:
                    await ExecuteNightlyMaintenanceAsync();
                    break;
                case 4:
                    await ExecuteEarlyMorningTasksAsync();
                    break;
                default:
                    // Make an unsupported hour visible instead of silently doing nothing.
                    _logger.LogWarning("未配置 {Hour} 点对应的任务,跳过执行", hour);
                    break;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "执行 {Hour} 点任务时发生错误", hour);
            throw;
        }
    }

    // Concrete task implementations.
    private async Task PollingTaskAsync()
    {
        // Implement the game-character start/stop logic here.
    }

    private async Task ExecuteNightlyMaintenanceAsync()
    {
        // Special logic for the 21:00 task goes here.
    }

    // Other methods... (PollingExpireTaskAsync, PollingExpireDelCharactTaskAsync,
    // PollingDelCharactTaskAsync, AutoCheckApiAsync, DelLogsAsync and
    // ExecuteEarlyMorningTasksAsync are referenced above but omitted from this listing.)
}
// BackgroundTask.cs (runs background tasks safely)
/// <summary>
/// Helper for running a task on the thread pool without letting its exceptions escape.
/// </summary>
public static class BackgroundTask
{
    /// <summary>
    /// Queues <paramref name="task"/> via <see cref="Task.Run(Func{Task})"/>; any exception
    /// it throws is logged through <paramref name="logger"/> and not rethrown.
    /// </summary>
    /// <param name="task">Asynchronous work to execute.</param>
    /// <param name="logger">Logger that receives the failure, if any.</param>
    /// <returns>A task that completes when the wrapped work has finished or failed.</returns>
    public static Task Run(Func<Task> task, ILogger logger)
    {
        return Task.Run(() => InvokeGuardedAsync());

        // Awaits the delegate and turns a failure into a log entry.
        async Task InvokeGuardedAsync()
        {
            try
            {
                await task();
            }
            catch (Exception failure)
            {
                logger.LogError(failure, "后台任务执行失败");
            }
        }
    }
}
3. 任务调度配置层
3.1 任务初始化器
// RecurringJobInitializer.cs
/// <summary>
/// Hosted service that registers (or updates) the Hangfire recurring jobs at startup.
/// </summary>
public class RecurringJobInitializer : IHostedService
{
    private readonly IRecurringJobManager _jobManager;
    private readonly IServiceProvider _services;
    private readonly ILogger<RecurringJobInitializer> _logger;

    public RecurringJobInitializer(
        IRecurringJobManager jobManager,
        IServiceProvider services,
        ILogger<RecurringJobInitializer> logger)
    {
        _jobManager = jobManager;
        _services = services;
        _logger = logger;
    }

    /// <summary>
    /// Registers the three recurring jobs; logs and rethrows on failure.
    /// </summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        try
        {
            using (var scope = _services.CreateScope())
            {
                // The resolved instance is not used; resolving it throws if
                // IPollingService is not registered.
                _ = scope.ServiceProvider.GetRequiredService<IPollingService>();

                // NOTE(review): no time zone is passed to AddOrUpdate, so the cron
                // expressions presumably run in Hangfire's default zone (UTC) — confirm
                // that 21:00 / 04:00 are meant as UTC.

                // Every 30 minutes.
                _jobManager.AddOrUpdate<IPollingService>(
                    "polling-tasks-30min",
                    s => s.ExecutePollingTasksAsync(),
                    "*/30 * * * *");

                // Daily at 21:00.
                _jobManager.AddOrUpdate<IPollingService>(
                    "daily-task-21:00",
                    s => s.ExecuteDailyTaskAsync(21),
                    "0 21 * * *");

                // Daily at 04:00.
                _jobManager.AddOrUpdate<IPollingService>(
                    "daily-task-04:00",
                    s => s.ExecuteDailyTaskAsync(4),
                    "0 4 * * *");

                _logger.LogInformation("周期性任务初始化完成");
            }
        }
        catch (Exception error)
        {
            _logger.LogError(error, "初始化周期性任务失败");
            throw;
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Nothing to clean up on shutdown.
    /// </summary>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
4. 应用启动配置
4.1 Program.cs
var builder = WebApplication.CreateBuilder(args);

// Read the Redis connection string once and fail fast with a clear message when it is
// missing, instead of passing null to both the multiplexer and the Hangfire storage.
string redisConnectionString = builder.Configuration.GetConnectionString("Redis")
    ?? throw new InvalidOperationException("Connection string 'Redis' is not configured.");

// Redis
builder.Services.AddSingleton<IConnectionMultiplexer>(
    _ => ConnectionMultiplexer.Connect(redisConnectionString));

// Hangfire storage and logging
builder.Services.AddHangfire(config =>
{
    config.UseRedisStorage(
        redisConnectionString,
        new RedisStorageOptions
        {
            Prefix = "hangfire:",
            Db = 1 // use a separate Redis database
        });
    config.UseColouredConsoleLogProvider();
});

builder.Services.AddHangfireServer(options =>
{
    // Unique server name per process: machine name plus a random suffix.
    options.ServerName = $"{Environment.MachineName}:{Guid.NewGuid():N}";
    options.WorkerCount = 1;
    // NOTE(review): queue priority may follow array order depending on the storage —
    // confirm that "default" is really meant to come before "critical".
    options.Queues = new[] { "default", "critical" };
});

// Application services
builder.Services.AddSingleton<IDistributedLockService, RedisDistributedLockService>();
builder.Services.AddScoped<IPollingService, PollingService>();
builder.Services.AddHostedService<RecurringJobInitializer>();

var app = builder.Build();

// Hangfire dashboard
// NOTE(review): the authorization filter requires an authenticated user, but no
// authentication middleware is registered in this listing — confirm it is configured
// elsewhere, otherwise the dashboard will reject every request.
app.UseHangfireDashboard("/jobs", new DashboardOptions
{
    DashboardTitle = "任务调度中心",
    Authorization = new[] { new HangfireDashboardAuthorizationFilter() },
    StatsPollingInterval = 60_000 // refresh every 60 seconds
});

app.Run();
// Hangfire dashboard authorization filter
/// <summary>
/// Dashboard authorization filter that admits only authenticated users.
/// </summary>
public class HangfireDashboardAuthorizationFilter : IDashboardAuthorizationFilter
{
    /// <summary>
    /// Returns <c>true</c> only when the current HTTP user has an identity that is authenticated.
    /// </summary>
    public bool Authorize(DashboardContext context)
    {
        var identity = context.GetHttpContext().User.Identity;
        return identity is not null && identity.IsAuthenticated;
    }
}
5. appsettings.json 配置
{"ConnectionStrings": {"Redis": "localhost:6379,allowAdmin=true","Hangfire": "Server=(localdb)\\mssqllocaldb;Database=Hangfire;Trusted_Connection=True;"},"Hangfire": {"WorkerCount": 1,"SchedulePollingInterval": 5000}
}
关键设计说明
- 分布式锁:
  - 基于 StackExchange.Redis 的 LockTakeAsync / LockReleaseAsync 实现(单个 Redis 实例上的加锁并设置过期时间,并非多节点的 RedLock 算法)
  - 通过 await using 自动处理锁的获取和释放
  - 包含完善的错误处理和日志记录
- 任务隔离:
  - 使用Hangfire的 [DisableConcurrentExecution] 防止同一任务重复执行
  - 分布式锁确保跨节点唯一执行
- 错误处理:
  - 所有关键操作都有try-catch和日志记录
  - 后台任务使用安全包装器执行
- 可观测性:
  - 详细的日志记录
  - Hangfire仪表盘监控
- 扩展性:
  - 可以轻松添加新任务
  - 支持动态调整调度策略
这个实现方案完全符合.NET 6的最佳实践,支持分布式部署,确保任务在集群环境中安全可靠地执行。