在并行编程中,经常会遇到多线程间操作共享集合的问题,很多时候大家都很难逃避这个问题做到一种无锁编程状态,你也知道一旦给共享集合套上lock之后,并发和伸缩能力往往会造成很大影响,这篇就来谈谈如何尽可能的减少lock锁次数甚至没有。
一:缘由 1. 业务背景昨天在review代码的时候,看到以前自己写的这么一段代码,精简后如下:
private static List<long> ExecuteFilterList(int shopID, List<MemoryCacheTrade> trades, List<FilterConditon> filterItemList, MatrixSearchContext searchContext) { var customerIDList = new List<long>(); var index = 0; Parallel.ForEach(filterItemList, new ParallelOptions() { MaxDegreeOfParallelism = 4 }, (filterItem) => { var context = new FilterItemContext() { StartTime = searchContext.StartTime, EndTime = searchContext.EndTime, ShopID = shopID, Field = filterItem.Field, FilterType = filterItem.FilterType, ItemList = filterItem.FilterValue, SearchList = trades.ToList() }; var smallCustomerIDList = context.Execute(); lock (filterItemList) { if (index == 0) { customerIDList.AddRange(smallCustomerIDList); index++; } else { customerIDList = customerIDList.Intersect(smallCustomerIDList).ToList(); } } }); return customerIDList; }这段代码实现的功能是这样的,filterItemList承载着所有原子化的筛选条件,然后用多线程的形式并发执行里面的item,最后将每个item获取的客户人数集合在高层进行整体求交,画个简图就是下面这样。
2. 问题分析其实这代码存在着一个很大的问题,在Parallel中直接使用lock锁的话,filterItemList有多少个,我的lock就会锁多少次,这对并发和伸缩性是有一定影响的,现在就来想想怎么优化吧!
3. 测试案例为了方便演示,我模拟了一个小案例,方便大家看到实时结果,修改后的代码如下:
public static void Main(string[] args) { var filterItemList = new List<string>() { "conditon1", "conditon2", "conditon3", "conditon4", "conditon5", "conditon6" }; ParallelTest1(filterItemList); } public static void ParallelTest1(List<string> filterItemList) { var totalCustomerIDList = new List<int>(); bool isfirst = true; Parallel.ForEach(filterItemList, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (query) => { var smallCustomerIDList = GetCustomerIDList(query); lock (filterItemList) { if (isfirst) { totalCustomerIDList.AddRange(smallCustomerIDList); isfirst = false; } else { totalCustomerIDList = totalCustomerIDList.Intersect(smallCustomerIDList).ToList(); } Console.WriteLine($"{DateTime.Now} 被锁了"); } }); Console.WriteLine($"最后交集客户ID:{string.Join(",", totalCustomerIDList)}"); } public static List<int> GetCustomerIDList(string query) { var dict = new Dictionary<string, List<int>>() { ["conditon1"] = new List<int>() { 1, 2, 4, 7 }, ["conditon2"] = new List<int>() { 1, 4, 6, 7 }, ["conditon3"] = new List<int>() { 1, 4, 5, 7 }, ["conditon4"] = new List<int>() { 1, 2, 3, 7 }, ["conditon5"] = new List<int>() { 1, 2, 4, 5, 7 }, ["conditon6"] = new List<int>() { 1, 3, 4, 7, 9 }, }; return dict[query]; } ------ output ------ 2020/04/21 15:53:34 被锁了 2020/04/21 15:53:34 被锁了 2020/04/21 15:53:34 被锁了 2020/04/21 15:53:34 被锁了 2020/04/21 15:53:34 被锁了 2020/04/21 15:53:34 被锁了 最后交集客户ID:1,7 二:第一次优化从结果中可以看到,filterItemList有6个,锁次数也是6次,那如何降低呢? 其实实现Parallel代码的FCL大神也考虑到了这个问题,从底层给了一个很好的重载,如下所示:
public static ParallelLoopResult ForEach<TSource, TLocal>(OrderablePartitioner<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally);这个重载很特别,多了两个参数localInit和localFinally,过会说一下什么意思,先看修改后的代码体会一下
public static void ParallelTest2(List<string> filterItemList) { var totalCustomerIDList = new List<int>(); var isfirst = true; Parallel.ForEach<string, List<int>>(filterItemList, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, () => { return null; }, (query, loop, index, smalllist) => { var smallCustomerIDList = GetCustomerIDList(query); if (smalllist == null) return smallCustomerIDList; return smalllist.Intersect(smallCustomerIDList).ToList(); }, (finalllist) => { lock (filterItemList) { if (isfirst) { totalCustomerIDList.AddRange(finalllist); isfirst = false; } else { totalCustomerIDList = totalCustomerIDList.Intersect(finalllist).ToList(); } Console.WriteLine($"{DateTime.Now} 被锁了"); } }); Console.WriteLine($"最后交集客户ID:{string.Join(",", totalCustomerIDList)}"); } ------- output ------ 2020/04/21 16:11:46 被锁了 2020/04/21 16:11:46 被锁了 最后交集客户ID:1,7 Press any key to continue . . .