LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

NETCore中实现一个轻量无负担的极简任务调度ScheduleTask

freeflydom
2024年5月25日 11:54 本文热度 1044

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用

技术栈用到了:BackgroundServiceNCrontab

第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

    public interface IScheduleTask

    {

        Task ExecuteAsync();

    }

    public abstract class ScheduleTask : IScheduleTask

    {

        public virtual Task ExecuteAsync()

        {

            return Task.CompletedTask;

        }

    }


第二步定义特性标注任务执行周期等信的metadata

    [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]

    public class ScheduleTaskAttribute(string cron) : Attribute

    {

        /// <summary>

        /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron

        /// 最小单位为分钟

        /// </summary>

        public string Cron { get; set; } = cron;

        public string? Description { get; set; }

        /// <summary>

        /// 是否异步执行.默认false会阻塞接下来的同类任务

        /// </summary>

        public bool IsAsync { get; set; } = false;

        /// <summary>

        /// 是否初始化即启动,默认false

        /// </summary>

        public bool IsStartOnInit { get; set; } = false;

    }

   

第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

    public interface IScheduler

    {

        /// <summary>

        /// 判断当前的任务是否可以执行

        /// </summary>

        bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);

    }

好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

    public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)

    {

        public Type ScheduleTaskType { get; set; } = scheduleTaskType;

        public string Cron { get; set; } = cron;

        public string? Description { get; set; }

        public bool IsAsync { get; set; } = false;

        public bool IsStartOnInit { get; set; } = false;

    }

    public interface IScheduleMetadataStore

    {

        /// <summary>

        /// 获取所有ScheduleTaskMetadata

        /// </summary>

        Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();

    }

实现一个Configuration级别的Store

    internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore

    {

        const string Key = "BiwenQuickApi:Schedules";


        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()

        {

            var options = configuration.GetSection(Key).GetChildren();


            if (options?.Any() is true)

            {

                var metadatas = options.Select(x =>

                {

                    var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);

                    if (type is null)

                        throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");


                    return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)

                    {

                        Description = x[nameof(ConfigurationScheduleOption.Description)],

                        IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),

                        IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),

                    };

                });

                return Task.FromResult(metadatas);

            }

            return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());

        }

    }


然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088

    public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent

    {

        /// <summary>

        /// 任务

        /// </summary>

        public IScheduleTask ScheduleTask { get; set; } = scheduleTask;

        /// <summary>

        /// 触发时间

        /// </summary>

        public DateTime EventTime { get; set; } = eventTime;

    }

    /// <summary>

    /// 执行完成

    /// </summary>

    public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)

    {

        /// <summary>

        /// 执行结束的时间

        /// </summary>

        public DateTime EndTime { get; set; } = endTime;

    }

    /// <summary>

    /// 执行开始

    /// </summary>

    public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);

    /// <summary>

    /// 执行失败

    /// </summary>

    public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)

    {

        /// <summary>

        /// 异常信息

        /// </summary>

        public Exception Exception { get; private set; } = exception;

    }

接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

    internal class SampleNCrontabScheduler : IScheduler

    {

        /// <summary>

        /// 暂存上次执行时间

        /// </summary>

        private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();


        public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)

        {

            var now = DateTime.Now;

            var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);

            if (!haveExcuteTime)

            {

                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);

                LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);


                //如果不是初始化启动,则不执行

                if (!scheduleMetadata.IsStartOnInit)

                    return false;

            }

            if (now >= time)

            {

                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);

                //更新下次执行时间

                LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);

                return true;

            }

            return false;

        }

    }

然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:


    internal class ScheduleBackgroundService : BackgroundService

    {

        private static readonly TimeSpan _pollingTime

#if DEBUG

          //轮询20s 测试环境下,方便测试。

          = TimeSpan.FromSeconds(20);

#endif

#if !DEBUG

         //轮询60s 正式环境下,考虑性能轮询时间延长到60s

         = TimeSpan.FromSeconds(60);

#endif

        //心跳10s.

        private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);

        private readonly ILogger<ScheduleBackgroundService> _logger;

        private readonly IServiceProvider _serviceProvider;

        public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)

        {

            _logger = logger;

            _serviceProvider = serviceProvider;

        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)

        {

            while (!stoppingToken.IsCancellationRequested)

            {

                var pollingDelay = Task.Delay(_pollingTime, stoppingToken);

                try

                {

                    await RunAsync(stoppingToken);

                }

                catch (Exception ex)

                {

                    //todo:

                    _logger.LogError(ex.Message);

                }

                await WaitAsync(pollingDelay, stoppingToken);

            }

        }

        private async Task RunAsync(CancellationToken stoppingToken)

        {

            using var scope = _serviceProvider.CreateScope();

            var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();

            if (tasks is null || !tasks.Any())

            {

                return;

            }

            //调度器

            var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();

            async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)

            {

                if (scheduler.CanRun(metadata, DateTime.Now))

                {

                    var eventTime = DateTime.Now;

                    //通知启动

                    _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);

                    try

                    {

                        if (metadata.IsAsync)

                        {

                            //异步执行

                            _ = task.ExecuteAsync();

                        }

                        else

                        {

                            //同步执行

                            await task.ExecuteAsync();

                        }

                        //执行完成

                        _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);

                    }

                    catch (Exception ex)

                    {

                        _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);

                    }

                }

            };

            //注解中的task

            foreach (var task in tasks)

            {

                if (stoppingToken.IsCancellationRequested)

                {

                    break;

                }

                //标注的metadatas

                var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();


                if (!metadatas.Any())

                {

                    continue;

                }

                foreach (var metadata in metadatas)

                {

                    await DoTaskAsync(task, metadata);

                }

            }

            //store中的scheduler

            var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();


            //并行执行,提高性能

            Parallel.ForEach(stores, async store =>

            {

                if (stoppingToken.IsCancellationRequested)

                {

                    return;

                }

                var metadatas = await store.GetAllAsync();

                if (metadatas is null || !metadatas.Any())

                {

                    return;

                }

                foreach (var metadata in metadatas)

                {

                    var attr = new ScheduleTaskAttribute(metadata.Cron)

                    {

                        Description = metadata.Description,

                        IsAsync = metadata.IsAsync,

                        IsStartOnInit = metadata.IsStartOnInit,

                    };


                    var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;

                    if (task is null)

                    {

                        return;

                    }

                    await DoTaskAsync(task, attr);

                }

            });

        }


        private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)

        {

            try

            {

                await Task.Delay(_minIdleTime, stoppingToken);

                await pollingDelay;

            }

            catch (OperationCanceledException)

            {

            }

        }

    }

最后收尾阶段我们老规矩扩展一下IServiceCollection:

        internal static IServiceCollection AddScheduleTask(this IServiceCollection services)

        {

            foreach (var task in ScheduleTasks)

            {

                services.AddTransient(task);

                services.AddTransient(typeof(IScheduleTask), task);

            }

            //调度器

            services.AddScheduler<SampleNCrontabScheduler>();

            //配置文件Store:

services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();

            //BackgroundService

           services.AddHostedService<ScheduleBackgroundService>();

            return services;

        }

        /// <summary>

        /// 注册调度器AddScheduler

        /// </summary>

        public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler

        {

            services.AddSingleton<IScheduler, T>();

            return services;

        }


        /// <summary>

        /// 注册ScheduleMetadataStore

        /// </summary>

        public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore

        {

            services.AddSingleton<IScheduleMetadataStore, T>();

            return services;

        }

老规矩我们来测试一下:

    //通过特性标注的方式执行:

    [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次

    [ScheduleTask("0/3 * * * *")]//每3分钟执行一次

    public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask

    {

        public async Task ExecuteAsync()

        {

            //执行5s

            await Task.Delay(TimeSpan.FromSeconds(5));

            logger.LogInformation("keep alive!");

        }

    }

public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask

    {

        public Task ExecuteAsync()

        {

            logger.LogInformation("Demo Config Schedule Done!");

            return Task.CompletedTask;

        }

    }

通过配置文件的方式配置Store:

{

  "BiwenQuickApi": {

    "Schedules": [

      {

        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",

        "Cron": "0/5 * * * *",

        "Description": "Every 5 mins",

        "IsAsync": true,

        "IsStartOnInit": false

      },

      {

        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",

        "Cron": "0/10 * * * *",

        "Description": "Every 10 mins",

        "IsAsync": false,

        "IsStartOnInit": true

      }

    ]

  }

}

我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

    public class DemoStore : IScheduleMetadataStore

    {

        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()

        {

            //模拟从数据库或配置文件中获取ScheduleTaskMetadata

            IEnumerable<ScheduleTaskMetadata> metadatas =

                [

                    new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))

                    {

                        Description="测试的Schedule"

                    },

                ];

            return Task.FromResult(metadatas);

        }

    }

//然后注册这个Store:

builder.Services.AddScheduleMetadataStore<DemoStore>();

所有的一切都大功告成,最后我们来跑一下Demo,成功了:

 

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

2024/05/16更新:
提供同一时间单一运行中的任务实现

/// <summary>

/// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟.

/// </summary>

/// <param name="logger"></param>

[ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]

    public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask

    {

        public override Task OnAbort()

        {

            logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");

            return Task.CompletedTask;

        }


        public override async Task ExecuteAsync()

        {

            var now = DateTime.Now;

            //模拟一个耗时2分钟的任务

            await Task.Delay(TimeSpan.FromMinutes(2));

            logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");

        }

    }


源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling


转自https://www.cnblogs.com/vipwan/p/18194062/biwen-quickapi-scheduletask 作者:万雅虎


该文章在 2024/5/25 11:54:07 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2024 ClickSun All Rights Reserved