object data = limitedQueue.Dequeue();
if (data == null)
return false;
}
return true;
}
}
public interface ILimitingService:IDisposable
{
/// <summary>
/// 申请流量处理
/// </summary>
/// <returns>true:获取成功,false:获取失败</returns>
bool Request();
}
public class LimitingFactory
{
/// <summary>
/// 创建限流服务对象
/// </summary>
/// <param>限流模型</param>
/// <param>最大QPS</param>
/// <param>最大可用票据数</param>
public static ILimitingService Build(LimitingType limitingType = LimitingType.TokenBucket, int maxQPS = 100, int limitSize = 100)
{
switch (limitingType)
{
case LimitingType.TokenBucket:
default:
return new TokenBucketLimitingService(maxQPS, limitSize);
case LimitingType.LeakageBucket:
return new LeakageBucketLimitingService(maxQPS, limitSize);
}
}
}
/// <summary>
/// 限流模式
/// </summary>
public enum LimitingType
{
TokenBucket,//令牌桶模式
LeakageBucket//漏桶模式
}
public class LimitedQueue<T> : Queue<T>
{
private int limit = 0;
public const string QueueFulled = "TTP-StreamLimiting-1001";
public int Limit
{
get { return limit; }
set { limit = value; }
}
public LimitedQueue()
: this(0)
{ }
public LimitedQueue(int limit)
: base(limit)
{
this.Limit = limit;
}
public new bool Enqueue(T item)
{
if (limit > 0 && this.Count >= this.Limit)
{
return false;
}
base.Enqueue(item);
return true;
}
}
调用方法:
var service = LimitingFactory.Build(LimitingType.TokenBucket, 500, 200);
while (true)
{
var result = service.Request();
//如果返回true,说明可以进行业务处理,否则需要继续等待
if (result)
{
//业务处理......
}
else
Thread.Sleep(1);
}
二、漏桶算法
声明一个固定容量的桶,每接受到一个请求向桶中添加一个令牌,当令牌桶达到上线后请求丢弃或等待,具体算法如下:
创建一个固定容量的漏桶,请求到达时向漏桶添加一个令牌
如果请求添加令牌不成功,请求丢弃或等待
另一个线程以固定的速率消费桶里的令牌
工作过程也包括3个阶段:产生令牌、消耗令牌和判断数据包是否通过。其中涉及到2个参数:令牌自动消费的速率和令牌桶的大小,个过程的具体工作如下。
产生令牌:业务程序根据具体业务情况申请令牌。申请一次,令牌桶令牌加一。如果桶中令牌数已到达上限,则挂起业务后等待令牌。
消费令牌:周期性的以固定速率消费令牌桶中令牌,桶中的令牌不断较少。
判断是否通过:判断是否已有令牌桶是否存在有效令牌,当桶中的令牌数量可以满足需求时,则继续业务处理,否则将挂起业务,等待令牌。
C#的一个实现方式:
class LeakageBucketLimitingService: ILimitingService
{
private LimitedQueue<object> limitedQueue = null;
private CancellationTokenSource cancelToken;
private Task task = null;
private int maxTPS;
private int limitSize;
private object lckObj = new object();
public LeakageBucketLimitingService(int maxTPS, int limitSize)
{
this.limitSize = limitSize;
this.maxTPS = maxTPS;