为什么编写TaskSchedulerEx类?
因为.NET默认线程池只有一个线程池,如果某个批量任务一直占着大量线程,甚至耗尽默认线程池,则会严重影响应用程序域中其它任务或批量任务的性能。
特点:
1、使用独立线程池,线程池中线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount
2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker
3、队列中尚未执行的任务可以取消
4、通过扩展类TaskHelper实现任务分组
5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程
6、代码量相当精简,TaskSchedulerEx类只有260多行代码
7、池中的线程数量会根据负载自动增减,支持,但没有SmartThreadPool智能,为了性能,使用了比较笨的方式实现,不知道大家有没有既智能,性能又高的方案,我有一个思路,在定时器中计算每个任务执行平均耗时,然后使用公式(线程数 = CPU核心数 * ( 本地计算时间 + 等待时间 ) / 本地计算时间)来计算最佳线程数,然后按最佳线程数来动态创建线程,但这个计算过程可能会牺牲性能
对比SmartThreadPool:
TaskSchedulerEx类代码(使用Semaphore实现):
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { /// <summary> /// TaskScheduler扩展 /// 每个实例都是独立线程池 /// </summary> public class TaskSchedulerEx : TaskScheduler, IDisposable { #region 外部方法 [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")] public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize); #endregion #region 变量属性事件 private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>(); private int _coreThreadCount = 0; private int _maxThreadCount = 0; private int _auxiliaryThreadTimeOut = 20000; //辅助线程释放时间 private int _activeThreadCount = 0; private System.Timers.Timer _timer; private object _lockCreateTimer = new object(); private bool _run = true; private Semaphore _sem = null; private int _semMaxCount = int.MaxValue; //可以同时授予的信号量的最大请求数 private int _semCount = 0; //可用信号量请求数 private int _runCount = 0; //正在执行的和等待执行的任务数量 /// <summary> /// 活跃线程数 /// </summary> public int ActiveThreadCount { get { return _activeThreadCount; } } /// <summary> /// 核心线程数 /// </summary> public int CoreThreadCount { get { return _coreThreadCount; } } /// <summary> /// 最大线程数 /// </summary> public int MaxThreadCount { get { return _maxThreadCount; } } #endregion #region 构造函数 /// <summary> /// TaskScheduler扩展 /// 每个实例都是独立线程池 /// </summary> /// <param>核心线程数(大于或等于0,不宜过大)(如果是一次性使用,则设置为0比较合适)</param> /// <param>最大线程数</param> public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20) { _sem = new Semaphore(0, _semMaxCount); _maxThreadCount = maxThreadCount; CreateCoreThreads(coreThreadCount); } #endregion #region override GetScheduledTasks protected override IEnumerable<Task> GetScheduledTasks() { return _tasks; } #endregion #region override TryExecuteTaskInline protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } #endregion #region override QueueTask protected override void QueueTask(Task task) { _tasks.Enqueue(task); while (_semCount >= _semMaxCount) //信号量已满,等待 { Thread.Sleep(1); } _sem.Release(); Interlocked.Increment(ref _semCount); Interlocked.Increment(ref _runCount); if (_activeThreadCount < _maxThreadCount && _activeThreadCount < _runCount) { CreateThread(); } } #endregion #region 资源释放 /// <summary> /// 资源释放 /// 队列中尚未执行的任务不再执行 /// </summary> public void Dispose() { _run = false; if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } while (_activeThreadCount > 0) { _sem.Release(); Interlocked.Increment(ref _semCount); } } #endregion #region 创建核心线程池 /// <summary> /// 创建核心线程池 /// </summary> private void CreateCoreThreads(int? coreThreadCount = null) { if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value; for (int i = 0; i < _coreThreadCount; i++) { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; while (_run) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); Interlocked.Decrement(ref _runCount); } else { _sem.WaitOne(); Interlocked.Decrement(ref _semCount); } } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == 0) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } })); thread.IsBackground = true; thread.Start(); } } #endregion #region 创建辅助线程 /// <summary> /// 创建辅助线程 /// </summary> private void CreateThread() { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; DateTime dt = DateTime.Now; while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _auxiliaryThreadTimeOut) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); Interlocked.Decrement(ref _runCount); dt = DateTime.Now; } else { _sem.WaitOne(_auxiliaryThreadTimeOut); Interlocked.Decrement(ref _semCount); } } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == _coreThreadCount) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } })); thread.IsBackground = true; thread.Start(); } #endregion #region 全部取消 /// <summary> /// 全部取消 /// 取消队列中尚未执行的任务 /// </summary> public void CancelAll() { Task tempTask; while (_tasks.TryDequeue(out tempTask)) { Interlocked.Decrement(ref _runCount); } } #endregion } }