细说.NET中的多线程:使用Task

上一节我们介绍了线程池相关的概念以及用法。我们可以发现ThreadPool. QueueUserWorkItem是一种起了线程之后就不管了的做法。但是实际应用过程,我们往往会有更多的需求,比如如果更简单的知道线程池里面的某些线程什么时候结束,线程结束后如何执行别的任务。Task可以说是ThreadPool的升级版,在线程任务调度,并行编程中都有很大的作用。

创建并且初始化Task

使用lambda表达式创建Task

Task.Factory.StartNew(() => Console.WriteLine("Hello from a task!"));

var task = new Task(() => Console.Write("Hello"));

task.Start();

用默认参数的委托创建Task

using System;

using System.Threading.Tasks;

namespace MultiThread

{

class ThreadTest

{

static void Main()

{

var task = Task.Factory.StartNew(state => Greet("Hello"), "Greeting");

Console.WriteLine(task.AsyncState);  // Greeting

task.Wait();

}

static void Greet(string message) { Console.Write(message); }

}

}

这种方式的一个优点是,task.AsyncState作为一个内置的属性,可以在不同线程中获取参数的状态。

System.Threading.Tasks.TaskCreateOptions

创建Task的时候,我们可以指定创建Task的一些相关选项。在.Net 4.0中,有如下选项:

LongRunning

用来表示这个Task是长期运行的,这个参数更适合block线程。LongRunning线程一般回收的周期会比较长,因此CLR可能不会把它放到线程池中进行管理。

PreferFairness

表示让Task尽量以公平的方式运行,避免出现某些线程运行过快或者过慢的情况。

AttachedToParent

表示创建的Task是当前线程所在Task的子任务。这一个用途也很常见。

下面的代码是创建子任务的示例:

using System;

using System.Threading;

using System.Threading.Tasks;

namespace MultiThread

{

class ThreadTest

{

public static void Main(string[] args)

{

Task parent = Task.Factory.StartNew(() =>

{

Console.WriteLine("I am a parent");

Task.Factory.StartNew(() =>        // Detached task

{

Console.WriteLine("I am detached");

});

Task.Factory.StartNew(() =>        // Child task

{

Console.WriteLine("I am a child");

}, TaskCreationOptions.AttachedToParent);

});

parent.Wait();

Console.ReadLine();

}

}

}

如果你等待你一个任务结束,你必须同时等待任务里面的子任务结束。这一点很重要,尤其是你在使用Continue的时候。(后面会介绍)

等待Task

在ThreadPool内置的方法中无法实现的等待,在Task中可以很简单的实现了:

using System;

using System.Threading;

using System.Threading.Tasks;

namespace MultiThread

{

class ThreadTest

{

static void Main()

{

var t1 = Task.Run(() => Go(null));

var t2 = Task.Run(() => Go(123));

Task.WaitAll(t1, t2);//等待所有Task结束

//Task.WaitAny(t1, t2);//等待任意Task结束

}

static void Go(object data)  // data will be null with the first call.

{

Thread.Sleep(5000);

Console.WriteLine("Hello from the thread pool! " + data);

}

}

}

注意:

当你调用一个Wait方法时,当前的线程会被阻塞,直到Task返回。但是如果Task还没有被执行,这个时候系统可能会用当前的线程来执行调用Task,而不是新建一个,这样就不需要重新创建一个线程,并且阻塞当前线程。这种做法节省了创建新线程的开销,也避免了一些线程的切换。但是也有缺点,当前线程如果和被调用的Task同时想要获得一个lock,就会导致死锁。

Task异常处理

当等待一个Task完成的时候(调用Wait或者或者访问Result属性的时候),Task任务中没有处理的异常会被封装成AggregateException重新抛出,InnerExceptions属性封装了各个Task没有处理的异常。

using System;

using System.Threading.Tasks;

namespace MultiThreadTest

{

class Program

{

static void Main(string[] args)

{

int x = 0;

Task<int> calc = Task.Factory.StartNew(() => 7 / x);

try

{

Console.WriteLine(calc.Result);

}

catch (AggregateException aex)

{

Console.Write(aex.InnerException.Message);  // Attempted to divide by 0

}

}

}

}

对于有父子关系的Task,子任务未处理的异常会逐层传递到父Task,并且最后包装在AggregateException中。

using System;

using System.Threading.Tasks;

namespace MultiThreadTest

{

class Program

{

static void Main(string[] args)

{

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;

var parent = Task.Factory.StartNew(() =>

{

Task.Factory.StartNew(() =>  // Child

{

Task.Factory.StartNew(() => { throw null; }, atp);  // Grandchild

}, atp);

});

// The following call throws a NullReferenceException (wrapped

// in nested AggregateExceptions):

parent.Wait();

}

}

}

取消Task

如果想要支持取消任务,那么在创建Task的时候,需要传入一个CancellationTokenSouce

示例代码:

using System;

using System.Threading;

using System.Threading.Tasks;

namespace MultiThreadTest

{

class Program

{

static void Main(string[] args)

{

var cancelSource = new CancellationTokenSource();

CancellationToken token = cancelSource.Token;

Task task = Task.Factory.StartNew(() =>

{

// Do some stuff...

token.ThrowIfCancellationRequested();  // Check for cancellation request

// Do some stuff...

}, token);

cancelSource.Cancel();

try

{

task.Wait();

}

catch (AggregateException ex)

{

if (ex.InnerException is OperationCanceledException)

Console.Write("Task canceled!");

}

Console.ReadLine();

}

}

}

任务的连续执行

Continuations

任务调度也是常见的需求,Task支持一个任务结束之后执行另一个任务。

Task task1 = Task.Factory.StartNew(() => Console.Write("antecedant.."));

Task task2 = task1.ContinueWith(task =>Console.Write("..continuation"));

Continuations 和Task<TResult>

Task也有带返回值的重载,示例代码如下:

Task.Factory.StartNew<int>(() => 8)

.ContinueWith(ant => ant.Result * 2)

.ContinueWith(ant => Math.Sqrt(ant.Result))

.ContinueWith(ant => Console.WriteLine(ant.Result));  // output 4

子任务

前面提到了,当你等待一个任务的时候,同时需要等待它的子任务���成。

下面代码演示了带子任务的Task:

using System;

using System.Threading.Tasks;

using System.Threading;

namespace MultiThreadTest

{

class Program

{

public static void Main(string[] args)

{

Task<int[]> parentTask = Task.Factory.StartNew(() =>

{

int[] results = new int[3];

Task t1 = new Task(() => { Thread.Sleep(3000); results[0] = 0; }, TaskCreationOptions.AttachedToParent);

Task t2 = new Task(() => { Thread.Sleep(3000); results[1] = 1; }, TaskCreationOptions.AttachedToParent);

Task t3 = new Task(() => { Thread.Sleep(3000); results[2] = 2; }, TaskCreationOptions.AttachedToParent);

t1.Start();

t2.Start();

t3.Start();

return results;

});

Task finalTask = parentTask.ContinueWith(parent =>

{

foreach (int result in parent.Result)

{

Console.WriteLine(result);

}

});

finalTask.Wait();

Console.ReadLine();

}

}

}

这段代码的输出结果是: 1,2,3

FinalTask会等待所有子Task结束后再执行。

TaskFactory

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/3f3fa230a8819887f63f20088c2670ec.html