ASP.NET Core 3.x 并发限制的实现代码

Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于传入的请求进行排队处理,避免线程池的不足.
我们日常开发中可能常做的给某web服务器配置连接数以及,请求队列大小,那么今天我们看看如何在通过中间件形式实现一个并发量以及队列长度限制.

Queue策略

添加Nuget

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

public void ConfigureServices(IServiceCollection services) { services.AddQueuePolicy(options => { //最大并发请求数 options.MaxConcurrentRequests = 2; //请求队列长度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { //添加并发限制中间件 app.UseConcurrencyLimiter(); app.Run(async context => { Task.Delay(100).Wait(); // 100ms sync-over-async await context.Response.WriteAsync("Hello World!"); }); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); }

通过上面简单的配置,我们就可以将他引入到我们的代码中,从而做并发量限制,以及队列的长度;那么问题来了,他是怎么实现的呢?

public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure) { services.Configure(configure); services.AddSingleton<IQueuePolicy, QueuePolicy>(); return services; }

QueuePolicy采用的是SemaphoreSlim信号量设计,SemaphoreSlim、Semaphore(信号量)支持并发多线程进入被保护代码,对象在初始化时会指定 最大任务数量,当线程请求访问资源,信号量递减,而当他们释放时,信号量计数又递增。

/// <summary> /// 构造方法(初始化Queue策略) /// </summary> /// <param></param> public QueuePolicy(IOptions<QueuePolicyOptions> options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number."); } //使用SemaphoreSlim来限制任务最大个数 _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests); }

ConcurrencyLimiterMiddleware中间件

/// <summary> /// Invokes the logic of the middleware. /// </summary> /// <param>The <see cref="HttpContext"/>.</param> /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns> public async Task Invoke(HttpContext context) { var waitInQueueTask = _queuePolicy.TryEnterAsync(); // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets. bool result; if (waitInQueueTask.IsCompleted) { ConcurrencyLimiterEventSource.Log.QueueSkipped(); result = waitInQueueTask.Result; } else { using (ConcurrencyLimiterEventSource.Log.QueueTimer()) { result = await waitInQueueTask; } } if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); } }

每次当我们请求的时候首先会调用_queuePolicy.TryEnterAsync(),进入该方法后先开启一个私有lock锁,再接着判断总请求量是否≥(请求队列限制的大小+最大并发请求数),如果当前数量超出了,那么我直接抛出,送你个503状态;

if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); }

问题来了,我这边如果说还没到你设置的大小呢,我这个请求没有给你服务器造不成压力,那么你给我处理一下吧.

await _serverSemaphore.WaitAsync();异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止

lock (_totalRequestsLock) { if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests) { return false; } TotalRequests++; } //异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止 await _serverSemaphore.WaitAsync(); return true; }

返回成功后那么中间件这边再进行处理,_queuePolicy.OnExit();通过该调用进行调用_serverSemaphore.Release();释放信号灯,再对总请求数递减

Stack策略

再来看看另一种方法,栈策略,他是怎么做的呢?一起来看看.再附加上如何使用的代码.

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wdswdx.html