大章26:线程基础

Widnows的线程概念

没有线程概念的时候,机器都是“单线程”运行的,长时间运行的任务会阻止其他任务执行(16位windows下打印文档很容易“冻结”整个机器导致各种程序出错)。

所以微软设计新的OS内核来改进这些问题,该内核决定在进程中运行应用程序的每个实例。进程实际是应用程序的实例要使用的资源的集合,每个进程都被赋予了一个虚拟地址空间来避免被其它进程访问。

但是光这样还不够,如果机器只有一个CPU,应用程序死循环仍然会导致其他程序无法运行。线程就是微软交出的解决方案,它是一个Windows概念,它的职责是对CPU进行虚拟化,为每个进程都提供该进程专用的线程(功能相当于原来的一个物理CPU)。所以单物理CPU机器,一个进程死循环,不影响其他进程。

线程开销⭐

和一切虚拟化机制一样,线程有空间(内存耗用)和时间(运行时的执行性能)上的开销。

下面对每个线程都有的开销一一介绍。

①线程内核对象 (thread kernel object)

这是OS为系统中创建的每个线程都分配并初始化的数据结构之一。

对象中包括对线程进行描述的属性、**线程上下文(thread context)**:线程上下文是包含CPU寄存器集合的内存块,x64使用约1240字节的内存。上下文是线程上一次执行完毕后,CPU寄存器的状态。

②线程环境块 (thread environment block,TEB)

TEB耗用一个内存页(x64中4KB),

里面有**异常处理链首(head)**:线程每进入一个try块,都会在链首(head)中插入一个节点(node),退出try块时删除该节点。

此外,TEB中还有GDI(图形设备接口)和OpenGL用的一些数据。

③用户模式栈 (user- mode stack)

堆栈概念中的栈说的就是这个了,默认分配1MB内存(其实windows是保留1MB容量,等用了才调拨给你)。

用户模式栈存储传给方法的局部变量和实参。还包含一个返回地址:指出当前方法返回时线程该从什么地方执行。

④内核模式栈 (kernel- mode stack)

分配x86是12KB,x64是24KB。

应用程序代码向OS中的内核模式函数传递实参时,会复制 用户模式栈 传去的实参并加以验证并不允许修改。最后OS内核代码开始处理复制的值。

应用程序代码发起内核模式函数调用 -> 用户模式栈 -> 内核模式栈 -> 内核模式函数

⑤DLL线程连接 (attach)和线程分离 (detach)通知

一个Windows机制:在进程中创建线程时,都会调用进程中加载的所有非托管DLL的 DllMain方法并向该方法传递 DLL_THREAD_ATTACH标志;终止线程时,同样调用 DllMain方法并向该方法传递 DLL_THREAD_DETACH标志。

有的DLL需要这些通知才能为进程中 创建/销毁 的每个线程执行特殊的 初始化/资源清理 ,比如C-Runtime库DLL。

上下文调度

首先搞清楚一点,下面讨论的都是单物理CPU(或者单核CPU)且线程数>CPU数的情况!!!想看Windows的调度,去看“线程调度和优先级”节。

Windows任何时刻都只将一个线程分配给一个CPU(或CPU核,下面称CPU)。

CPU会为线程执行一个时间片 (quantum)的时长,大概30ms吧,等时间片到期了,就会进行上下文调度切换执行另一个线程。上下文是线程上一次执行完毕后,CPU寄存器的状态。

上下文调度具体流程:

  1. 将CPU寄存器的值保存到当前正在运行的线程的内核对象内部的一个上下文结构中。
  2. 从现有线程集合中选出一个线程供调度。如果该线程由另一个进程拥有, Windows在
    开始执行任何代码或者接触任何数据之前,还必须切换CPU“看见”的虚拟地址空间。
  3. 将所选上下文结构中的值加载到CPU的寄存器中。

上下文调度之后线程切换完成,CPU就会执行所选的线程,直到下一个时间片过了又要切换。

毫无疑问,这个机制是一个非常大的开销,特别是数据不在cache中的时候访问会很慢导致还没做点啥就切换别的线程了。

上下文切换是净开销,也就是说它所产生的开销不会换来任何内存或性能上的收益,只是为了能够提供一个健壮的、响应灵敏的操作系统。比如,一个应用程序的线程进入死循环, Windows会定期抢占( preempt)它,将新线程分配给CPU从而使新线程有机会运行。假如新线程是任务管理器线程,就能终止包含了死循环线程的进程。

多核情况

理解了上面说的机制,再来理解多核心CPU的情况:

安装了多个CPU(或者一个多核CPU)的计算机可以真正同时运行几个线程,Windows为每个CPU内核都分配一个线程,每个内核都自己执行到其他线程的上下文切换,且Windows确保单个线程不会同时在多个内核上调度

停止疯狂

知道了上下文调度机制,不难得出:如果只关心性能,线程数和CPU核心数一致才是最好的。如果线程数超过了CPU的数目,就会产生上下文切换和性能损失。如果每个CPU只有一个线程,就不会有上下文切换,线程将全速运行。

但是仍然选择这么做,是因为能让Windows整体更不容易崩溃、提高响应能力。

那么标题为什么要叫停止疯狂呢?

因为打开我的任务管理器一看,发现一台4核的cpu跑了300多个进程4000个线程,光给线程就分配掉了4GB的内存,离大谱!而大部分线程压根没被调用就是挂着,是一种极大的浪费。

CPU发展趋势

了解即可,讲讲硬件有个概念。

CPU厂商过去只知道一味地提高CPU速度。但CPU厂商没有延续这个趋势,因为高速运行的CPU会产生大量热量。

所以发展方向变成了实现多线程,物理上有这么些手段:

  • 多个CPU,不适合普通用户。
  • 超线程芯片,硬件上实现多线程。通过分支预测错误和缓存未命中和等待数据的优化,来不停切换线程,实现1个线程能当2个用。WindowsOS层面并不知道硬件CPU其实是超线程的,他只知道有2个线程在并发执行。
  • 多核芯片,适合普通用户。

CLR线程和 Windows线程

CLR使用 Windows的线程处理功能。

使用专用线程

什么时候用

创建线程来执行异步的计算限制(compute- bound)操作,不过这门技术并不推荐,推荐使用线程池来执行异步

但还是说回来,不用线程池而是显式创建线程来专门执行一个计算限制的操作的情况,一般是线程池方式做不到的事,比如以下几种:

  • 线程需要以非普通线程优先级运行。(所有线程池线程都以普通优先级运行)
  • 需要线程表现为一个前台线程,防止应用程序在线程结束任务前终止。
  • 计算限制的任务需要长时间运行。(线程池为了判断是否需要创建一个额外的线程)
  • 要启动线程,并可能调用 Thread的 Abort方法来提前终止它。

创建专用线程,并执行异步的计算限制操作

为了创建专用线程,要构造 System.Threading.Thread类的实例,向构造器传递一个方法
名。以下是 Thread的构造器的原型:

1
2
3
4
public sealed class Thread : CriticalFinalizerObject, ... {
public Thread(ParameterizedThreadStart start);
// 未列出不常用的构造器
}

start参数表示专用线程要执行的方法,这个方法必须和 ParameterizedThreadStart委托的签名匹配:

1
delegate void ParameterizedThreadStart(object obj);

光创建线程还不算完,并不会真的创建一个操作系统线程。要实际创建一个操作系统线程,并让它开始执行回调方法,必须调用 Thread的Start方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
using System.Threading;

public static class Program {
// Main thread: starting a dedicated thread (专用线程)
Thread dedicatedThread = new Thread(ComputeBoundOp);
dedicatedThread.Start(5);

Thread.Sleep(10000); // 模拟做其他工作(10秒)
dedicatedThread.Join();// 等待线程终止。Join:在此实例表示的线程终止前,阻止调用线程
}

// 这个方法的签名必须和 ParameterizedThreadStart委托匹配
private static void ComputeBoundOp (object state) {
WriteLine(state);
Thread. Sleep(1000); // 模拟做其他任务(1秒)
// 这个方法返回后,专用线程将终止
}

// output: 5

使用线程的理由

画面响应(通常是对于客户端GUI应用程序)

Windows为每个进程提供它自己的线程,确保发生死循环的应用程序不会妨碍其他应用程序。类似的,在客户端GUI应用程序中,可以将一些工作交给一个线程进行,使GUI线程能灵敏地响应用户输入。

性能(对于客户端和服务器应用程序)

由于Windows每个CPU调度一个线程,而且多个(核)CPU能并发执行这些线程,所以能提升性能。

线程调度和优先级⭐

Windows的线程调度

前面讲过单核CPU的上下文切换,属于是线程调度的最初级理论,下面由Windows的机制深入。

重温一下上下文切换流程

**上下文(context)**结构反映了线程上一次执行完毕后CPU寄存器的状态。

在一个**时间片(time-slice)**之后,Windows检查现存的所有线程内核对象,在这些对象中只有那些没有正在等待什么的线程才合适调度。

饥饿(starvation)

而在这些线程中,系统调度CPU执行哪一个线程,是由线程自己的优先级决定的,有031(31最高)的优先级。只要存在可调度的优先级31的线程,系统就永远不会调度CPU给030的任何线程。高优先级的线程占用CPU太久,使低优先级线程无法运行,这种情况称为**饥饿(starvation)**。

多处理器的机器很少出现饥饿情况,因为能同时并行执行很多线程。

抢占式操作系统

Windows是抢占式多线程(preemptive multithreaded)操作系统,线程可在任何时间停止(被抢占)并调度另一个线程。

Windows是抢占式,低优先级的线程哪怕时间片没用完,也会被立刻挂起执行优先级更高的线程。

零页线程(zero page thread)

系统启动时会创建一个特殊的零页线程,优先级为0,在没有其他线程需要“干活儿”的时候,零页线程将系统RAM的所有空闲页清零。Windows不允许其他线程的优先级为0。

线程优先级and进程优先级

前面讲过了优先级是0~31,现在看一下自己写的程序可控制的优先级映射。

进程可通过**优先级类(priority class)**来控制优先级,一共6种;线程7种。

注意,表中没有值为0的线程优先级。这是因为0优先级保留给零页线程了,系统不允许其他线程的优先级为0。而且,以下优先级也不可获得:17,18,19,20,21,27,28,29或者30。以内核模式运行的设备驱动程序才能获得这些优先级。

我们可以通过System.Diagnostics的Process类和ProcessThread类获取进程线程的windows视图。也可以通过AppDomain和Thread类获取线程的CLR视图。

前台线程和后台线程

CLR将每个线程要么视为前台线程,要么视为后台线程。一个进程的所有前台线程停止运行时,CLR强制终止仍在运行的任何后台线程。这些后台线程被直接终止;不抛出异常。

每个AppDomain都可运行一个单独的应用程序,而每个应用程序都有自己的前台线程。如果应用程序退出,造成它的前台线程终止,则CLR仍需保持活动并运行,使其他应用程序能继续运行。所有应用程序都退出,它们的所有前台线程都终止后,整个进程就可以被销毁了。

Thread类新建线程默认是前台线程,线程池线程默认是后台线程。当然,可以通过Thread.Isbackground属性随时修改线程是前台还是后台。

案例

一般我们拿前台线程做画面,后台线程IO,这样用户关闭了程序界面 => 前台线程关闭 => 后台IO线程自动被关闭 => IO不执行下去(比如写入数据到Excel),是合理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Main() {
// 创建新线程(默认为前台线程)
Thread t = new Thread(Worker);
// 使线程成为后台线程
t.IsBackground = true;
t.Start();// 启动线程
// ⭐如果t是前台线程,则应用程序大约10秒后才终止
// ⭐如果t是后台线程,则应用程序立即终止
}

private static void Worker() {
Thread. sleep(10000);// 模拟做10秒钟的工作
// 下面这行代码只有在由一个前台线程执行时才会显示
Console.WriteLine("Returning from Worker");
}

大章27:计算限制的异步操作

CLR线程池基础

一个CLR拥有一个**线程池(thread pool)**。如果一个CLR下有多个AppDomain,那么它们共享一个线程池;如果一个进程中加载了多个CLR,那么每个CLR都有自己的线程池。

向线程池请求流程⭐

CLR初始化时,线程池中是没有线程的。在内部,线程池维护了一个操作请求队列。应用程序执行一个异步操作时,就会对线程池进行请求,具体是将一个记录项(entry)追加到队列中。线程池会从这个队列中提取记录项,将这个记录项派发(dispatch)给一个线程池的线程;如果线程池没有线程,就创建一个新线程。当这个线程完成任务后并不销毁,而是返回线程池、进入空闲状态。但是如果一个线程在线程池里闲太久了(应用程序很久不向线程池发出请求),为了避免资源浪费,CLR才会终止它。

使用ThreadPool

使用ThreadPool执行简单的计算限制操作。

调用ThreadPool.QueueUserWorkItem(WaitCallback cb);ThreadPool.QueueUserWorkItem(WaitCallback cb, object state);。这两个方法向线程池的队列添加一个**工作项(work item)**以及可选的状态数据,然后方法立刻返回。

传递的回调方法必须满足delegate void WaitCallback(Object state);

例子

下面演示一下如何让一个线程池线程以异步方式调用一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void Main(string[] args)
{
ThreadPool.QueueUserWorkItem(DoSth, 5);
Console.WriteLine("main thread start ,id:" + + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10000);
Console.WriteLine("main thread end ,id:" + +Thread.CurrentThread.ManagedThreadId);

Console.Read();
}

public static void DoSth(object state)
{
Console.WriteLine("thread from ThreadPool is doing sth ,state:" + state + " thread id: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
}

//由于是异步进行的,所以有时会输出
//main thread start ,id:1
//thread from ThreadPool is doing sth ,state:5 thread id: 3
//main thread end ,id:1

//有时会输出
//thread from ThreadPool is doing sth ,state:5 thread id: 3
//main thread start ,id:1
//main thread end ,id:1

//但是不难看出,执行任务的线程是从线程池拿的,不再是主线程

执行上下文

每个线程都关联了一个**执行上下文(execution context)**数据结构。它包含了安全设置(压缩栈、 Thread的 Principal属性和 Windows身份)、宿主设置、逻辑调用上下文数据。

每当一个**线程(初始线程)使用另一个线程(辅助线程,也就是线程使用的线程)执行任务时,为了确保两者的操作使用相同的安全设置和宿主设置,前者的上下文应流向 flow(复制到)**辅助线程。复制,这要耗费不少时间。

阻断上下文的流向(flow)

可以使用System.Threading.ExecutionContext类来控制线程的执行上下文流到另一个线程:

1
2
3
4
5
6
public sealed class ExecutionContext : IDisposable, ISerializable {
public static AsyncFlowControl SuppressFlow(); // 取消执行上下文在异步线程之间的流动
public static void RestoreFlow(); // 恢复执行上下文在异步线程之间的流动
public static bool IsFlowSuppressed(); // 指示当前是否取消了执行上下文的流动
...
}

通过这个类,你可以阻止执行上下文的流动。当初始线程的上下文不流向辅助线程,辅助线程会使用上一次和它关联的任意执行上下文。所以你想要阻止流动,就需要确保辅助线程不应执行任何要依赖于执行上下文状态,比如用用户的Windows身份。

一般只拿来优化服务端应用程序,这样做对客户端程序优化甚微。

阻断例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void Main() {
// 将一些数据放到Main线程的逻辑调用上下文中
CallContext.LogicalSetData("Name","Jeffrey");

// 初始化要由一个线程池线程做的一些工作,
// 线程池线程能访问逻辑调用上下文数据
ThreadPool.QueueUserWorkItem (state => Console.WriteLine("Before:" + CallContext.LogicalGetData("Name")));

// 现在,阻止Main线程的执行上下文的流动
ExecutionContext.SuppressFlow();
// 然后,再做同样的事
ThreadPool.QueueUserWorkItem (state => Console.WriteLine("After:" + CallContext.LogicalGetData("Name")));
}

// output:
// Before:Jeffrey
// After:

阻断后的输出After,为空。注意,无论是对于ThreadPool还是Task对象,都有效。

线程池如何管理线程

设置线程池限制

CLR允许开发人员设置线程池要创建的最大线程数。书中说默认大概1000。不推荐自己设置,但是可以使用ThreadPool类提供的几个静态方法:GetMaxThreads、SetMaxThreads、GetMinThreads、SetMinThreads、GetAvailableThreads 来获取或更改线程数信息。

线程池调度机制

全局队列

当调用ThreadPool.QueueUserWorkItem、Timer类、Task时,任务会被添加进全局队列中,全局队列采用先进先出FIFO的方式,让工作者线程们自己去取任务。

全局队列使用线程同步锁,这是为了避免多个线程同时取到一个任务。

本地队列

在全局队列中领完的任务会放入工作者线程各自的本地队列,本地队列采用后入先出LIFO的方式来执行任务。

本地队列一般不锁,因为只有对应的工作者线程访问它。但是也有例外,看下面。

调度机制

下面从工作者线程的角度去理解整体调度流程。

如果工作者线程发现自己本地队列空了,就会尝试从另一个工作者线程的本地队列“偷”一个Task。这个Task在本地队列的队尾,并会要求获取一个线程同步锁。

如果所有本地队列都空了,工作者线程会使用FIFO算法从全局队列取出一个工作项并获得它的锁。

如果全局队列也为空,工作者线程会进入睡眠状态。

如果工作者线程睡眠时间很长,它会自己醒来并销毁自身,释放线程使用的资源(内核、栈等)。

协作式取消和超时

讲的是可以用一个辅助类,可以传递自己的一个bool字段作为参数给线程,也可以注册一些回调事件。当执行这个辅助类的Cancel方法时,会将bool字段设置成false并执行那些回调。从而实现操控线程停止事务。

无论是ThreadPool方法还是Task,都能使用这种方法。

CancellationTokenSource

System.Threading.CancellationTokenSource就是上面说的辅助类。

1
2
3
4
5
6
7
8
9
10
11
public sealed class CancellationTokenSource: IDisposable// 一个引用类型
{
public CancellationTokenSource();
public void Dispose();// 释放资源,比如WaitHandle

public Boolean IsCancellationRequested { get; }// 执行Cancel后就会变成false
public CancellationToken Token { get; }

public void Cancel();// 内部调用 Cancel并传递 false
public void Cancel(Boolean throwOnFirstException);// 如果true,回调执行一遇到抛错就立刻返回;如果false,直到全部回调执行结束才会抛出一个报错list
}

这个类通过Token方法可以获得一个或多个CancellationToken(一个值类型),可以对它注册回调,当Source.Cancel的时候会触发Token们的回调

1
2
3
4
5
6
7
8
9
10
11
12
13
public struct CancellationToken { // 一个值类型
public static CancellationToken None { get; }

public Boolean IsCancellationRequested { get; } // 由非通过Task调用的操作调用(ThreadPool)
public void ThrowIfCancellationRequested(); // 由通过Task调用的操作调用(Task)

// CancellationTokenSource, WaitHandle会收到信号
public Waithandle Waithandle { get; }
// Gethashcode, Equals, operator==和 operator!=成员未列出

public Boolean Canbecanceled { get; }// 很少使用
public CancellationTokenRegistration Register(Action<object> callback, Object state, Boolean useSynchronizationContext);// 未列出更简单的重载版本
}

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void Main()
{
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
// 注册回调
token.Register(() => Console.WriteLine("Count is cancelled, thread " + Thread.CurrentThread.ManagedThreadId));
ThreadPool.QueueUserWorkItem(_ => Count(token, 1000));

Console.WriteLine("Press <Enter> to cancel Thread Works");
Console.ReadLine();
cts.Cancel();

Console.Read();
}


private static void Count(CancellationToken token, int countTo)
{
for (int count = 0; count < countTo; count++)
{
// 当Source执行Cancel时,会变成false
if (token.IsCancellationRequested)
{
Console.WriteLine("break, thread " + Thread.CurrentThread.ManagedThreadId);
break;
}

Console.WriteLine(count);
Thread.Sleep(200);
}
}

// output:
// Press <Enter> to cancel Thread Works
// 0
// 1
// 2
// Count is cancelled, thread 1
// break, thread 3

可以考虑这么用IsCancellationRequested字段以及Token回调事件

注意了,输出的线程号不一样:**回调事件是调用线程做的(就是主线程thread 1)**。

使用任务(Task)

使用ThreadPool的方法,很容易就能发起一次异步的计算限制操作。但是它不透明,你无法知道什么时候完成,也无法对其添加回调函数。所以我们通过System.Threading.Tasks中的类型来使用任务。

Task与ThreadPool等价写法

1
2
3
4
ThreadPool.QueueUserWorkItem(DoSth, 5);// 用线程池方法

new Task(DoSth, 5).Start();// 用Task做相等的事
Task.Run(() => Dosth(5));// 与上面等价

注意!无论是ThreadPool还是Task.Run,他们都是从线程池取线程

等待任务

使用Task.Wait();可以让线程等待返回结果,返回类型在创建线程Task<TResult>的时候指定,返回值通过Task.Result获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void Main() {
Task<int> task = new Task<int>(() =>
{
Thread.Sleep(3000); // 模拟做其他工作(3秒)

return 100;
});

task.Start(); // 此时才开始执行任务

task.Wait(); // 进行3秒的等待

Console.WriteLine(task.Result); // Result属性内部会调用Wait !!!
}

// 3秒后output: 100

如果在Task中抛错,异常会被吞噬并存储到一个集合中,线程会回到线程池。等到再调用Wait或Result时,会抛出System.AggregateException对象,它就是那个存储异常的集合。

但是如果一直不调用Wait或Result,就抛不出错。你可以通过向TaskScheduler.UnobservedTaskException事件登记回调函数来使Task被GC时抛出其异常。

Task.WaitAny

WaitAny静态方法 会阻塞调用线程,直到数组中的任何Task对象完成。方法返回一个int索引值,指明完成的是数组中哪个Task对象。方法返回后,线程被唤醒并继续运行,如果发生超时返回-1。

Task.WaitAll

WaitAll静态方法 也会阻塞调用线程,直到数组中的所有Task对象完成。方法返回一个bool值,true代表完成,false代表超时。

取消任务

前面使用CancellationTokenSource的IsCancellationRequested属性来取消ThreadPool事务,这边Task类似,不过用的是Source的ThrowIfCancellationRequested方法

1
2
3
4
5
6
7
8
9
10
private static int Dosth(CancellationToken ct, int n) {
int sum;
for(; n > 0; n--){
// 调用该Source的Cancel方法后,会抛出OperationCanceledException,否则不会抛出。
ct.ThrowIfCancellationRequested();

sum += n;
}
return sum;
}

和正常Task抛错一样会被吞,直到你调用Wait或Result才会抛出来。

延续任务

任务完成时自动启动新任务,用ContinueWith方法。在任务结束后,会调用线程池的其他线程继续帮你完成任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Main() {
Task<int> t = Task.Run(() =>
{
Console.WriteLine("thread id " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(3000); // 模拟做其他工作(3秒)
return 100;
});

Task cwt = t.ContinueWith(task => Console.WriteLine(task.Result + ",thread id " + Thread.CurrentThread.ManagedThreadId)); // 结束后调用线程池的其他线程
}

// output:
// thread id 3
// 100,thread id 4

具体看thread id,执行任务 和 执行延续任务 的线程不是同一个。

可以传递一个TaskContinuationOptions位标志枚举类给延续任务,默认是None,有一个可能用到的TaskContinuationOptions.OnlyOnRanToCompletion枚举值,传这个代表只有在第一个任务成功执行(无抛错无取消)的情况才会执行延续任务。

任务启动子任务

套娃。直到所有子任务运行结束,父任务才认为自己结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void Main(){
Console.WriteLine("main,thread id:" + Thread.CurrentThread.ManagedThreadId);

Task t = Task.Run(() =>
{
Console.WriteLine("main task,thread id:" + Thread.CurrentThread.ManagedThreadId);
new Task(() => Console.WriteLine("sub1,thread id:" + Thread.CurrentThread.ManagedThreadId)).Start();
new Task(() => Console.WriteLine("sub2,thread id:" + Thread.CurrentThread.ManagedThreadId)).Start();
new Task(() => Console.WriteLine("sub3,thread id:" + Thread.CurrentThread.ManagedThreadId)).Start();
// 这回咱们不sleep了,就看看是不是用了回收的线程,结果确实用了!
});

}

// output:
// main,thread id:1
// main task,thread id:3
// sub1,thread id:4
// sub3,thread id:3
// sub2,thread id:4

看输出,顺便测试了一下子线程会不会借助已经回收到线程池的父线程来完成任务,结果是肯定的!

任务内部揭秘

Task比起ThreadPool多了很多可控性,但是这不是无偿的,Task方式起线程会比ThreadPool方法至少多了:

  • Task唯一标识id,从1开始递增分配。只有查询时才分配,一经分配不会重复发放相同数字。
  • 执行状态id。通过Task.Status属性查询。
  • 父任务引用、回调方法引用、回调方法需要参数的引用
  • 一个CancellationToken、ContinueWithTask对象集合
  • etc.

Task的生命周期

可通过Task.Status属性查询执行状态id。只展开聊这个,透过它能理解Task的生命周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public enum TaskStatus {
// -----------这里只贴出生命周期内的状态-----------

Created, // 任务已显式创建:可以手动 Start()这个任务
WaitingForActivation, // 任务已隐式创建:会自动开始

WaitingToRun, // 任务已调度,但尚末运行
Running, // 任务正在运行

// 任务正在等待它的子任务完成,子任务完成后它才完成
WaitingForChildrenToComplete,

// 任务的最终状态是以下3个之一
RanToCompletion,
Canceled,
Faulted,
}

下面贴出微软文档的这个类,补全一下,看的话看上面的就行。

枚举 详细
Canceled 6 该任务已通过对其自身的 CancellationToken 引发 OperationCanceledException 对取消进行了确认,此时该标记处于已发送信号状态;或者在该任务开始执行之前,已向该任务的 CancellationToken 发出了信号。 有关详细信息,请参阅任务取消
Created 0 该任务已初始化,但尚未被计划。
Faulted 7 由于未处理异常的原因而完成的任务。
RanToCompletion 5 已成功完成执行的任务。
Running 3 该任务正在运行,但尚未完成。
WaitingForActivation 1 该任务正在等待 .NET 基础结构在内部将其激活并进行计划。
WaitingForChildrenToComplete 4 该任务已完成执行,正在隐式等待附加的子任务完成。
WaitingToRun 2 该任务已被计划执行,但尚未开始执行。

任务调度器

就是TaskScheduler,它负责执行被调度的任务,同时向VS调试器公开任务信息。

官方提供2种TaskScheduler:

  • 线程池任务调度器(thread pool task scheduler)
  • 同步上下文任务调度器(synchronization context task scheduler)

默认情况,应用程序使用线程池任务调度器。它将任务调度给线程池的工作者线程。调用TaskScheduler.Default方法获取线程池任务调度器引用。

同步上下文任务调度器将借助画面线程!调用TaskScheduler.FromCurrentSynchronizationContext方法获取同步上下文任务调度器引用。

同步上下文任务调度器

同步上下文任务调度器适合提供了图形用户界面的应用程序,比如wpf。它将所有任务都调度给应用程序的GUI线程,使所有任务代码都能成功更新UI组件。

该调度器不使用线程池,它调度的还是画面线程。如果用Default调度器的策略,线程池线程执行更新UI组件,会抛InvalidOperationException。

用Task更新UI例子 ♥wpf

wpf与unity一样,都是单ui线程的,其他线程没有办法调用它的api来修改ui。

所以下面可以看到,用上下文同步给Task从而实现更新UI,**实际上仍然是借调了主线程(ui线程)**,这个Task执行的时候你可以明显感觉到画面卡顿,因为画面线程在干活。

从来就没有什么魔法…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public partial class MainWindow : Window
{
private TaskScheduler taskScheduler;

public MainWindow()
{
taskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
InitializeComponent();

Console.WriteLine("GUI thread id:" + Thread.CurrentThread.ManagedThreadId);

Task<int> t = Task.Run(() =>
{
Thread.Sleep(2000);
Console.WriteLine("timing 1 thread id:" + Thread.CurrentThread.ManagedThreadId);
return 100;
});

t.ContinueWith(o =>
{
Console.WriteLine("timing 2 thread id:" + Thread.CurrentThread.ManagedThreadId);
myTextBox.Text = t.Result.ToString();
}, taskScheduler); // 注意!!! 如果这里不使用上下文同步的taskScheduler,会抛InvalidOperationException且更新UI失败
}

}

// output:
// GUI thread id:1
// timing 1 thread id:3
// timing 2 thread id:1

其他自定义调度器

![image-20220113173939283](E:\My Github\hexo\blog\source_posts\Tech\CSharp\CLR-Via-CSharp\cp7.assets\image-20220113173939283.png)

并行执行

Parallel

静态 System.Threading.Tasks.Parallel 类,内部使用Task对象,可以用线程池的线程来并行执行操作。

要注意,并行执行对应顺序执行,它是无法保证顺序的。

For、ForEach、Invoke

1
2
3
4
5
6
7
8
9
// 线程池的线程并行完成
Parallel.For(0, 1000, i=> DoSth(i));
Parallel.ForEach(collection, item => DoSth(item)); // ForEach要用迭代器遍历,比For稍慢

Parallel.Invoke(
() => Method1(),
() => Method2(),
() => Method3(),
);

另外,Invoke有一个可控性极强的重载,指定 localInit、body、localFinally委托

1
2
3
4
5
6
Parallel.ForEach<TSource,TLocal> (IEnumerable<TSource> item, Func<> localInit, Func<> body, Func<> localFinally);

// item 是集合的每个子元素。
// localInit(初始化委托) 为参与工作的每个任务都调用一次该委托,在body前调用。
// body(主体委托) 为参与工作的每个任务都调用一次该委托。
// localFinally(终结委托) 为参与工作的每个任务都调用一次该委托,在body后调用。

返回值ParallelLoopResult

For、ForEach都返回一个ParallelLoopResult实例。

1
2
3
4
5
6
public struct ParallelLoopResult {
// 如果操作提前终止,以下方法返回 false
public bool IsCompleted { get; }
// 保证得到处理的最低一项的索引
public Int64? LowestBreakIteration { get; }
}

如果 IsCompleted == true 说明全部运行完成,

如果 IsCompleted == false、LowestBreakIteration == null 说明某个线程调用了Stop,

如果 IsCompleted == false、LowestBreakIteration != null 说明某个线程break了。

并行语言继承查询(PLINQ)

当只用一个线程来顺序执行调用LINQ时,我们称之为LINQ;当用并行的方式执行LINQ时,我们称之为**PLINQ(Parallel LINQ)**。

使用扩展方法AsParallel实现。

1
2
public static ParallelQuery<TSource> AsParallel<TSource> (this IEnumerable<TSource> source);
public static ParallelQuery Asparallel (this IEnumerable source);

执行定时计算限制操作

Timer类

定时器,在System.Threading中定义的。

Timer的构造函数需要传入一个TimerCallback委托 回调函数,在时间到达后使用线程池线程调用这个回调函数。

TimerCallback委托

1
delegate void TimerCallback(Object state);

Timer内部实现

在内部,线程池为所有Timer对象只打算使用一个线程。这个线程知道下一个Timer对象在什么时候到期(计时器还有多久触发)。下一个Timer对象到期时,线程就会唤醒,在内部调用 Threadpool的 QueueUserWorkItem,将一个工作项添加到线程池的队列中,使你的回调方法得到调用。

那么如果new了很多个Timer,那么有可能出现并行执行的情况,这个时候会用新的线程来帮你同时执行回调方法。

想避免用新线程,可以在构造Timer时传入period参数指定Timeout.Infinite,这样,计时器就只触发一次。然后,在你的回调方法中,调用 Change方法来指定一个新的dueTime,dueTime告诉CLR在首次调用回调方法之前需要等待多少毫秒,可以传递0立刻执行。

例子、await允许线程返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static Timer timer;

static void Main(string[] args)
{
/// Timer(回调, 回调参数, dueTime, period)
timer = new Timer(Status, null, 0, Timeout.Infinite);
Console.Read();
}

static void Status(object state)
{
Console.WriteLine(DateTime.Now + " " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);

/// Timer.Change(dueTime, period)
timer.Change(2000, Timeout.Infinite); // 返回前让Timer在2秒后再次触发
// 这个方法返回后,线程回归池中,等待下一个工作项
}

// output:
// 2022/01/14 11:35:32 4
// 2022/01/14 11:35:35 4
// 2022/01/14 11:35:38 4

然后书中用了await/async写一个等效的实现。我这里打印了线程号,可以观察出来其实两者并不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void Main(string[] args)
{
Status(null);
Console.WriteLine(DateTime.Now + " main " + Thread.CurrentThread.ManagedThreadId);
Console.Read();
}

static async void Status(object state)
{
while (true)
{
Console.WriteLine(DateTime.Now + " " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);

// 模拟做2秒的事
await Task.Delay(2000);
// 2秒之后,某个线程池的线程会在await之后介入并继续循环
}
}

// output:
// 2022/01/14 11:30:33 1
// 2022/01/14 11:30:34 main 1
// 2022/01/14 11:30:36 4
// 2022/01/14 11:30:39 4

观察输出理解await做了什么:

画面线程走到await之前都是正常顺序执行的,但是遇到了await之后立刻退出了方法并继续执行了;而Status方法借由线程池线程来继续执行

所以画面既没卡死,也在不断更新UI。我估计await这里的做法就是起了个Task把回调包进去,然后传递了同步上下文给它让他可以更新UI。

大章28:I/O限制的异步操作⭐

讲述如何异步执行I/O限制的操作,允许将任务交由硬件设备处理,期间完全不占用线程和CPU资源。当然这还是离不开线程池得处理。

Windows如何执行I/O操作

从硬件开始理解

图中展示的是连接了几个硬件设备的OS。

其实每个硬件都有自己的电路板,知道如何驱动自己的设备(寻道、读写数据、与计算机内存交换数据),所以其实不需要计算机线程来处理这些事。

程序请求I/O流程⭐

当你使用FileStream fs = new FileStream(...); var bytes = fs.Read();之后发生了这些事:

① 线程从托管代码转变为本机代码,Read内部调用Win32 ReadFile函数

② ReadFile分配一个小数据结构,称作I/O请求包也就是IRP (I/O Request Packet)。

IRP结构初始化后包括:文件句柄,文件中的偏移量(字节读取开始位置)、一个Byte[]数组地址(用于写入读取的字节)等等。

④ 之后ReadFile将线程从 本机/用户模式代码=>本机/内核模式代码,从而调用Windows内核并向其传递IRP数据包。Windows内核根据IRP中得设备句柄,传递给对应硬件设备的驱动程序的IRP队列

⑤ 对应设备驱动软件在自己的IRP队列拿到IRP数据包,将信息传给物理硬件设备得电路板,执行I/O操作。

⑥ 设备执行I/O期间,发出请求的线程将无事可做,所以Windows将其变成睡眠状态。

⑦ 最终,硬件完成I/O操作。Windows会唤醒你的线程,并调度给一个CPU使它从内核模式返回用户模式,再返回至托管代码

异步请求I/O流程⭐

上面演示了正常(同步)请求I/O流程,可以看⑥中你的线程在等待I/O期间是无所事事的。这是极大的浪费。

当你以异步方式发送I/O请求(用ReadAsync而不是Read) FileStream fs = new FileStream(...,FileOptions.Asynchronous); Task<int> task = fs.ReadAsync(); 时发生了这些事:

① ReadAsync内部分配一个Task<int>对象来代表用于完成读取操作的代码。

② 和同步一样的方式,初始化IRP并发送到对应驱动软件IRP队列。

③ 驱动软件根据IRP数据包让硬件执行I/O,此刻,你的线程并不再睡眠,而是直接返回你的代码

④ 从代码上看,你的线程会从ReadAsync的调用中很快返回,但是IRP可能还未处理好,所以不能在ReadAsync之后的代码中访问获取的Byte[]。

那么什么时候可以访问结果Byte[]呢?答案是为①中返回的Task<int>对象添加延续任务ContinueWith来执行回调方法。C#的异步函数可以简化这部分代码,从而以顺序方式写代码让你感觉像写同步I/O代码一样。

I/O异步对比同步优势

资源上来说,同步阻塞线程,会导致CLR线程池不得不开辟更多线程来继续操作,增大开销;当I/O结束时线程又会回到线程池,导致过饱和,下场就是CPU同步上下文轮询花费更久。

效率上来说,一个线程去同步执行10个下载任务,每个5秒就得50秒;异步执行就只需要5秒。

C#的异步函数⭐

前面聊过了使用 ReadAsync + Task.ContinueWith 来实现异步,但是那么写代码比较复杂,于是微软推出了 异步函数 + Task 的方案,使开发者能够以同步顺序写代码实现异步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void Main()
{
Console.WriteLine(DateTime.Now + " before " + "tid:" + Thread.CurrentThread.ManagedThreadId);
Test(3);
Console.WriteLine(DateTime.Now + " after " + "tid:" + Thread.CurrentThread.ManagedThreadId);

Console.Read();
}

static async void Test(int i)
{
await Task.Delay(3000);
var result = i * i;
Console.WriteLine(DateTime.Now + " body " + result + " tid:" + Thread.CurrentThread.ManagedThreadId);
}

// output:
// 17:16:35 before tid:1
// 17:16:35 after tid:1
// 17:16:38 body 9 tid:4

照这个示例来一步一步说明。

async => 状态机

我们很容易分辨Test是异步函数,因为他用async来修饰。

一旦方法被标记为async,编译器就会将方法的代码转换成实现了状态机的一个类型。具体下节讨论。

await =>Task.ContinueWith

await Task.Delay(3000);这一句,Delay方法会在内部分配一个Task并实现task.ContinueWith(await之后所有的代码);等价的效果。

最后,状态机恢复

当把后续代码包到Task的延续任务之后,线程不再等待Task的执行完毕,而是直接回到调用方法的地方Test(3);执行它之后的代码…

等到Task内部方法运行结束后,一个线程池线程会通知Task对象,开始激活ContinueWith回调方法(线程恢复状态机)。

编译器如何将异步函数转换成状态机⭐

通过实现一个简单的异步函数使用案例,然后将IL代码反编译成C#代码来理解编译器到底做了些什么。

源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
internal sealed class Type1 { }
internal sealed class Type2 { }

private static async Task<Type1> Method1Async()
{
// 异步执行一些操作,最后返回一个Type1对象
}

private static async Task<Type2> Method2Async()
{
// 异步执行一些操作,最后返回一个Type2对象
}

private static async Task<string> MyMethodAsync(int arg)
{
int local = arg;

try
{
Type1 result1 = await Method1Async();
for (int x = 0; x < 3; x++)
{
Type2 result2 = await Method2Async();
}
}
catch (Exception e)
{
Console.WriteLine("Catch");
}
finally
{
Console.WriteLine("Finally");
}

return "Done";
}

反编译代码

IL反编译成C#并精简后的代码:

要点是观察状态机的状态m_state在什么地方变化。

然后再观察m_x怎么实现的await循环for(){ await ...};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// AsyncStateMachine特性指出这是一个异步方法(对使用反射的工具有用)
// 类型指出实现状态机的是哪个结构
[DebuggerStepThrough, AsyncStateMachine(typeof(StateMachine))]
private static Task<string> MyMethodAsync(Int32 arg) {

// 创建状态机实例并初始化它
StateMachine stateMachine = new StateMachine() {
// 创建 builder,从这个存根方法返回Task<String>
// 状态机访问builder来设置Task完成/异常
m_builder = AsyncTaskMethodBuilder<string>.Create(),

m_state = -1, // 初始化状态机位置
m_arg = arg, // 将实参拷贝到状态机字段
};

// 开始执行状态机
stateMachine.m_builder.Start(ref statemachine):
return stateMachine.m_builder.Task; // 返回状态机的Task
}

// 这是状态机结构
[CompilerGenerated, StructLayout(LayoutKind.Auto)]
private struct StateMachine : IAsyncStateMachine {

// 代表状态机 builder(Task)及其位置的字段
public AsyncTaskMethodBuilder<string> m_builder;
public int m_state;

// 实参和局部变量现在成了字段
public int m_argument, m_local, m_x;
public Type1 m_resultType1;
public Type2 m_resultType2;

// 每个 awaiter 类型一个字段
// 任何时候这些字段只有一个是重要的,那个字段引用最近执行的、以异步方式完成的 await
private TaskAwaiter<Type1> m_awaiterType1;
private TaskAwaiter<Type2> m_awaiterType2;

// 这是状态机方法本身
void IAsyncStateMachine.MoveNext() {
string result = null; // Task的结果值

// 编译器插入txy块来确保状态机的任务完成
try {
bool executeFinally = true; // 先假定逻辑上离开try块
if(m_state == -1) { // 如果第一次在状态机方法中,
m_ local = m_argument; // 原始方法就从头开始执行
}

// 原始代码中的try块
try{
TaskAwaiter<Type1> awaiterType1;
TaskAwaiter<Type2> awaiterType2;

switch(m_state) {
case -1: // 开始执行源代码中try块中的代码
// 调用 Method1Async并获得它的 awaiter
awaiterType1 = Method1Async().GetAwaiter();
if(!awaiterType1.IsCompleted){
m_state = 0; // Method1Async要以异步方式完成
m_awaiterType1 = awaiterType1; // 保存awaiter以便将来返回

// 告诉awaiter在操作完成时调用MoveNext
m_builder.AwaitUnsafeOnCompleted(ref awaiterType1, ref this);
// 上述代码调用 awaiterType1 的 OnCompleted,它会在被等待的任务上调用 ContinueWith(t=> MoveNext())
// 任务完成后ContinueWith调用MoveNext

executeFinally = false; // 逻辑上不离开try块
return; // 线程返回至调用者
}
// Method1Async以同步方式完成了
break;

case 0: // Method1Async 以异步方式完成了
awaiterType1 = m_awaiterType1; // 恢复最新的awaiter
break;
case 1: // Method2Async 以异步方式完成了
awaiterType2 = m_awaiterType2; // 恢复最新的awaiter
goto ForLoopEpilog;
}

// 在第一个await后,我们捕捉结果并启动for循环
m_resultType1 = awaiterType1.GetResult(); // 获取awaiter的结果

//序幕
ForLoopPrologue:
m_x = 0; // for循环初始化
goto ForLoopBody;
//收场
ForLoopEpilog:
m_resultType2 = awaiterType2.GetResult();
m_x ++;
// 循环主体
ForLoopBody:
if(m_x < 3) {
// 调用 Method2Async并获取它的 awaiter
awaiterType2 = Method2Async().GetAwaiter();

if(!awaiterType2.IsCompleted){
m_state = 1; // Method2Async要以异步方式完成
m_awaiterType2 = awaiterType2; // 保存 awaiter以使将来返回

// 告诉 awaiter在操作完成时调用 MoveNext
m_builder.AwaitUnsafeOnCompleted(ref awaiterType2, ref this);
executeFinally = false; // 逻辑上不离开try块
return; // 线程返回至调用者
}

// Method2Async以同步方式完成了
goto ForLoopEpilog; // 如果走这个goto,意味着以同步方式完成就再次循环
}

}
catch (Exception) {
Console.WriteLine("Catch"); // 源代码的catch块
}
finally {
// 本来,只要线程物理上离开try就会执行finally,
// 但我们希望在线程逻辑上离开try时才执行这些代码
if(executeFinally){
Console.WriteLIne("Finally"); // 源代码的finally块
}
}
result = "Done"; // 源代码最后一句
}
catch (Exception e){
// 未处理的异常:通过设置异常来完成状态机的Task
m_builder.SetException(e);
return;
}
// 无异常:通过返回结果来完成状态机的Task
m_builder.SetResult(result):
}
}

梳理一下

当编译器遇到async时,会尝试将方法编译成一个new状态机

当编译器遇到await时,编译器会在它上面调用GetAwaiter方法。调用该方法所返回的对象为awaiter,正是它将被等待的对象与状态机粘合起来。

状态机获得awaiter后会查询其IsCompleted属性:如果操作以同步方式执行就返回true,如果以异步方式完成就返回false。如果为false,状态机调用awaiter的OnCompleted方法并传递一个委托(内含原任务和MoveNext)。此刻,状态机允许它的线程回到原地以执行其他代码。

将来某个时候,awaiter会在完成任务时调用委托以执行MoveNext,状态机往下运行,从而使得方法也回到当初离开的位置继续执行。之后,方法内其他的awaiter得以以同样的方式执行。

异步函数扩展性

在扩展性方面,只要是能用Task对象包装的操作,都可以用await来等待。

TaskLogger

clr作者自制的类,可用它打印尚未完成的异步操作。会影响性能,可以只在调试的时候启用。

// TODO

await支持void

异步函数的返回类型一般是TaskTask<Result>,它们代表函数的状态机完成。

但是异步函数是支持返回void的,毕竟所有事件签名几乎都是这么写的:

1
void EventHandlerCallback(Object sender, EventArgs e);

C#编译器仍然会给返回void的异步函数创建状态机,但不再创建Task对象(因为创建了也没法使用)。所以你无法知道返回void的异步函数状态机在什么时候运行完毕。

但这一般不致命,因为一般不需要知道它什么时候结束,只要可以在它执行结束完再执行后续代码就行了。

异步函数和异常处理

如果硬件驱动收到IRP后,在进行I/O时虽然完成了但是内部出现错误,他就会向CLR的线程池post已完成的IRP。一个线程池线程会完成Task对象并设置异常,当你的状态机恢复后,await操作符会发现操作失败并引发该异常。

前面提过,当你用Task.Start()这种方式时,通常抛出一个AggregateException;但是如果你用await someTask的方式,编译器会优化体验,直接抛出第一个内部异常

线程处理模型:GUI线程⭐

简单聊下线程处理模型,.NET framework 支持很多种不同的应用程序模型,每种模型有自己的线程处理模型。比如说控制台应用程序没有任何线程处理模型。

GUI线程

而GUI应用程序比如wpf引入了一个线程处理模型,这个模型中UI元素只能由创建它的线程更新,而这个线程我们叫做GUI线程

如果通过线程池线程来更新UI元素,就会抛出异常。

await优化 = 恢复上下文

System.Threading.SynchronizationContext 类的派生对象将应用程序模型连接到它的线程处理模型。
我们不和这个类打交道,只要知道它可以让你获得正确的同步上下文就行了。

await someTask;的await关键字发生等待前,它会传递一个SynchronizationContext对象过去。之后,异步发生,会抽一个线程池线程来执行Task。当线程池线程完成Task后,会使用该SynchronizationContext对象,确保为应用程序模型使用正确的线程处理模型,换成人话就是会调用wpf的GUI线程来做事

这就是await做的优化:await someTask;之后的代码如果有更新UI元素也能正常实现。但是注意,Task是由线程池线程来执行的,所以你在Task里面更新UI元素仍然会抛错。

来个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public MainWindow()
{
InitializeComponent();
Test();

Console.WriteLine("GUI thread id:" + Thread.CurrentThread.ManagedThreadId);

}

async void Test()
{
var t = Task.Run(() =>
{
Console.WriteLine("task thread id:" + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(3000);
});
await t;

Console.WriteLine("after task thread id:" + Thread.CurrentThread.ManagedThreadId);
myTextBox.Text = "123"; // 3秒后成功更新UI
}

// output:
// GUI thread id: 1
// task thread id: 3
// after task thread id: 1

I/O请求优先级

Windows允许线程在发出I/O请求时指定优先级。但是只允许指定本进程的,不允许指定其他进程的。

1
2
3
4
5
public static void Main() {
using (ThreadIO.BeginBackgroundProcessing()){
// 在这里执行低优先级I/O请求(例如:调用 ReadAsync/ WriteAsync)
}
}

大章29:基元线程同步构造

锁的存在是为了实现线程同步、避免多个线程同时修改一个数据源从而造成数据损坏。锁的存在意义是一次只允许一个线程访问资源。什么时候不需要锁?多个线程如果只访问不修改一个数据源,是不需要锁的。

锁的缺点很多,

即便是最快的锁,也会损害性能,需要时间开销;

一次只允许一个线程访问资源是锁的存在意义,但是也就意味着某个线程会被锁阻塞,导致线程池不得不创建更多的线程来处理其他问题。

类库和线程安全

FCL(Framework Class Library)确保其所有静态方法都是线程安全的。比如Console类就有一个静态字段,类的许多方法都要释放获取这个字段上的锁,从而保证一次只有一个线程访问控制台。

线程安全

如果你的代码在进程中有多个线程同时运行一段代码,如果每次执行的结果都和单线程运行时的结果一致, 那么就是线程安全的。

线程安全方法并不意味着一定需要线程同步锁。

比如System.Math的静态Max方法:

1
2
3
public static int Max(int val1, int val2) {
return (val1 < val2) ? val2 : val1;
}

这个方法是线程安全的,它没有获取任何锁。多个线程可以同时调用Max方法,每个线程都处理的是自己的栈上数据,互不干扰。

使所有实例方法非线程安全,使静态方法线程安全

FCL并不保证实例方法是线程安全的。并不是不保证,而是没必要。正如标题,FCL是遵守这个原则的。

为什么说不保证实例方法线程安全?首先是没必要,线程在构造对象时,只有这个线程才拥有对象引用,其他线程都不能访问那个对象。其次是要了不好,线程同步锁会严重影响性能。

然而这句话也不是绝对的,如果线程随后公开了这个对象引用(比如把它放到一个静态字段中),传给了一个ThreadPool或Task的线程,那么在多个线程可能同时进行非只读访问的前提下,就需要线程同步

基元用户模式和内核模式构造

基元 (primitive)

基元,是指可以在代码中使用的最简单的构造。有两种基元构造:用户模式(user-mode)和内核模式( kernel-mode)。

用户模式的速度显著快于内核模式,但是这个速度的优势是由于协调是在硬件中发生的,这也同样意味着Windows操作系统永远检测不到一个线程在基元用户模式的构造上阻塞了。

而内核模式是由Windows操作系统自身提供的,所以它们在获取其他线程资源时,Windows会阻塞线程以避免它浪费CPU时间,直到资源可用才恢复线程。

线程在 用户模式 和 内核模式 之间转换,是一个巨大的开销。

死锁活锁

当一个线程遇到资源等待的问题时,

如果它是用户模式的,那么线程就会一直在一个CPU上运行,称为 活锁(livelock);

如果它是内核模式的,那么线程就会一直阻塞,称为 死锁(deadlock)。

死锁永远优于活锁,要知道,死锁只是卡死一个线程那也只是浪费了内存,而活锁线程除了内存,还一直在占用CPU的时间片。

用户模式构造

原子性⭐

CLR保证对以下数据类型变量的读写是原子性的:bool、char、(S)Byte、Int16、Int32、IntPtr、Single以及引用类型。这意味着变量中的所有字节都一次性读取或写入

什么意思呢,比如执行以下代码时,

1
2
3
4
5
int x;
x = 0x01234567;

Int64 y;
y = 0x0123456789abcdef;

x变量会一次性(原子性)地从0x00000000变成0x01234567。另一个线程不可能看到处于中间状态的值。

y变量就不同了,其他线程查询到它的值可能是0x0000000000000000 到 0x0123456789abcdef的值,因为读取和写入操作不是原子性的。这称为一次 torn read(撕裂读取:一次读取被撕成两半,或者在机器级别上要2个MOV指令才能读完)。

有两种基元用户模式线程同步构造:易变构造和互锁构造。

易变构造:在特定的时间,它在包含一个简单数据类型的变量上执行原子性的读或写操作。

互锁构造:在特定的时间,它在包含一个简单数据类型的变量上执行原子性的读和写操作。

易变构造 (volatile construct)

说的就是由于代码是由 C#编译器转换为IL语言、再由JIT将IL语言转换成本机CPU指令,最后再执行的,所以其中包括很多的编译器优化,导致你最终生成的代码运行顺序与你书写的不同。

可以使用两个System.Threading.Volatile类(易变构造)的静态方法来确保代码顺序执行而不被优化得偏离你的意图:

Volatile.Write 方法执行一次原子性的写入操作。强迫location中的值在调用时写入。按照编码顺序,之前的加载和存储操作必须在调用Volatile.Write之前发生。

Volatile.Read 方法执行一次原子性的读取操作。强迫location中的值在调用时读取。按照编码顺序,之后的加载和存储操作必须在调用Volatile.Read之后发生。

易变构造volatile 例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 顺序可能被优化到偏离意图的执行案例
class Before {
private int flag = 0;
private int val = 0;

// 这个方法由一个线程执行
public void Thread1() {
val = 5;
flag = 1;
}

// 这个方法由另一个线程执行
public void Thread2() {
// 注意:value可能先于flag读取
if(flag == 1) {
Console.WriteLine(val);
}
}
}

// 修改后,确保按照意图执行
class After {
private int flag = 0;
private int val = 0;

public void Thread1() {
// flag的写入 必然在所有前面代码之前
val = 5;
Volatile.Write(ref flag, 1);
}

public void Thread2() {
// flag的读取 必然在所有后续代码之前
if(Volatile.Read(ref flag) == 1) {
Console.WriteLine(val);
}
}
}

互锁构造

互锁,说的是System.Threading.Interlocked类提供的方法,Interlocked类中的每个方法都执行一次原子读取以及写入操作。Interlocked的所有方法都建立了完整的内存栅栏(memory fence):调用某个Interlocked方法之前的任何变量写入都在这个Interlocked方法之前执行;这个调用之后的任何变量读取都在这个调用之后读取。

Interlocked类

展示对int处理的接口,来理解他能做哪些事。

1
2
3
4
5
6
7
8
9
10
public static class Interlocked {
// return (++location)
public static Int32 Increment (ref Int32 location);
// return (--location)
public static Int32 Decrement (ref Int32 location);

public static Int32 Add (ref Int32 location1, Int32 value);
public static Int32 Exchange (ref Int32 location1, Int32 value);
public static Int32 CompareExchange (ref Int32 location1, Int32 value, Int32 comparand);
}

// TODO 后面的都略了,太需要实战应用,光看不够

大章30:混合线程同步构造

双检锁技术

双检锁(Double- Check Locking)是一个非常著名的技术,开发人员用它将单实例(singleton)对象的构造推退到应用程序首次请求该对象时进行。也就是懒加载(也叫延迟初始化 lazy init)

单例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public sealed class Singleton {
// s_lock对象是实现线程安全所需要的。定义这个对象时,我们假设创建单实例对象的
// 代价高于创建一个System.Object对象,并假设可能根本不需要创建单实例对象
// 否则,更经济、更简单的做法是在一个类构造器中创建单实例对象

private static object s_lock = new object();

private static Singleton instance = null;

private Singleton() {
}

public static Singleton GetSingleton() {
if(instance != null) return instance;

Monitor.Enter(s_lock); // Monitor.Enter: 获取指定对象上的排他锁
if(instance == null) {
Singleton temp = new Singleton();
// 将引用保存到instance中(参见正文的详细讨论)
Volatile.Write(ref instance, temp);
}
Monitor.Exit(s_lock);

return instance;
}
}

内存栅栏

由于CLR对任何锁方法的调用都构成了一个完整的内存栅栏,在柵栏之前写入的任何变量都必须在栅栏之前完成;在栅栏之后的任何变量读取都必须在栅栏之后开始。所以第二个if中instance的值必须等待锁结束后,其他线程才能获取到,而不是什么缓存到寄存器中的东西。

放到JAVA就不一样了,它因为不保证完整的内存栅栏,导致第二个if可能读取到缓存到寄存器中的instance数值,从而返回true。

发布(publishing)

Volatile.Write(ref instance, temp);是解决了什么问题呢?

使一个值对其他线程可见称为发布。如果按照常规思路写instance = new Singleton();,编译器可能并不会按照你顺序写的代码来执行。它可能会先为Singleton分配内存,将引用发布到instance,然后再调用构造器,这导致了其他线程看到的instance不为null也不为正确的构造好的Singleton对象,而是一个正在构造的Singleton对象

简化版本

1
2
3
4
5
6
7
8
9
public sealed class Singleton {
private static Singleton instance = new Singleton();

private Singleton() { }

public static Singleton GetSingleton() {
return instance;
}
}

这个版本与前面的效果是一样的!即使它没用“双检锁”也能保证线程安全。具体原因看下面。

在首次有线程访问这个Singleton的时候,CLR就会自动调用类构造器,创建并返回一个对象实例。

线程安全⭐

关于线程安全需要特別说明一下,由于程序可能在多线程环境下运行,也就是可能出现同时多个线程准备执行静态构造函数的情況。CLR确保这个过程是安全的,实际上调用静态构造函数的线程需要先获得一个互斥线程同步锁,如果有多个线程试图执行类型的静态构造函数,只有一个线程能获得该锁;获得锁的线程完成初始类型初始化操作,其它线程只能等待;当初始化完成,等待的线程被唤醒,然后发现静态构造函数已经被执行过,就不会再执行,此时获得的字段也是初始化完成后的状态。

下面献上我自己的测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Program
{
private static bool stop = false;

static void Main(string[] args)
{
Task.Run(() =>
{
Utils.WriteLine(" Task1 Started");
Utils.WriteLine(" Task1 " + Singleton.Instance().counter.ToString());
Utils.WriteLine(" Task1 End");
});
Utils.WriteLine( "------------------");
Task.Run(() =>
{
Utils.WriteLine(" Task2 Started");
Utils.WriteLine(" Task2 " + Singleton.Instance().counter.ToString());
Utils.WriteLine(" Task2 End");
});

Console.Read();
}



}

public sealed class Singleton
{
public int counter;

private static Singleton instance = new Singleton();

private Singleton()
{
counter++;
Utils.WriteLine(" ctor bf" + counter);
Thread.Sleep(5000);
counter += 10;
Utils.WriteLine(" ctor af" + counter);
}

public static Singleton Instance()
{
return instance;
}
}

public class Utils
{
public static void WriteLine(string txt)
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + txt);
}
}

静态构造、实例构造、内联执行顺序

引用:https://www.cnblogs.com/xiaoxiaotank/p/11155886.html

一般情况下是 静态字段内联 > 静态构造 > 实例字段内联 > 实例构造

如果加上基类就是:

Child静态字段内联 > Child静态构造 > Child实例字段内联 > Base静态字段内联 > Base静态构造 > Base实例字段内联 > Base实例构造 > Child实例构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
static void Main(string[] args)
{
Console.WriteLine("---------------一般初始化顺序---------------");
var child1 = new Child1();
Console.WriteLine("\n---------------子类静态字段初始化需要使用父类静态字段时初始化顺序---------------");
var child2 = new Child2();
Console.WriteLine("\n---------------子类静态构造函数中使用父类静态字段时初始化顺序---------------");
var child3 = new Child3();

Console.ReadKey();
}

public class Child1 : Base1
{
// 静态构造器
public static Display ChildStatic = new Display("Child static filed");

private Display _childFiled = new Display("Child filed");
// 静态构造器
static Child1() => Console.WriteLine("Child static ctor");
// 实例构造器
public Child1() => Console.WriteLine("Child ctor");
}

public class Child2 : Base2
{
/// <summary>
/// 子类静态字段初始化需要使用父类静态字段
/// </summary>
public static Display ChildStatic = new Display("Child static filed", () => BaseStatic);

private Display _childFiled = new Display("Child filed");

static Child2() => Console.WriteLine("Child static ctor");

public Child2() => Console.WriteLine("Child ctor");
}

public class Child3 : Base3
{
public static Display ChildStatic = new Display("Child static filed");

private Display _childFiled = new Display("Child filed");

/// <summary>
/// 子类静态构造函数中使用父类静态字段
/// </summary>
static Child3()
{
Console.WriteLine("Child static ctor");
var baseStatic = BaseStatic;
}

public Child3() => Console.WriteLine("Child ctor");
}

/// <summary>
/// 3个Base类相同,这里是为了演示静态成员的初始化
/// </summary>
public class Base1
{
// 静态字段
public static Display BaseStatic = new Display("Base static filed");
// 实例字段
private Display _baseFiled = new Display("Base filed");
// 静态构造器
static Base1() => Console.WriteLine("Base static ctor");
// 实例构造器
public Base1() => Console.WriteLine("Base ctor");
}

public class Base2
{
public static Display BaseStatic = new Display("Base static filed");

private Display _baseFiled = new Display("Base filed");

static Base2() => Console.WriteLine("Base static ctor");

public Base2() => Console.WriteLine("Base ctor");
}

public class Base3
{
public static Display BaseStatic = new Display("Base static filed");

private Display _baseFiled = new Display("Base filed");

static Base3() => Console.WriteLine("Base static ctor");

public Base3() => Console.WriteLine("Base ctor");
}

public class Display
{
public Display(string msg, Func<Display> displayFunc = null)
{
Console.WriteLine(msg);
var display = displayFunc?.Invoke();
}
}

result:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
---------------一般初始化?序---------------
Child static filed
Child static ctor
Child filed
Base static filed
Base static ctor
Base filed
Base ctor
Child ctor

---------------子?静?字段初始化需要使用父?静?字段?初始化?序---------------
Child static filed
Base static filed
Base static ctor
Child static ctor
Child filed
Base filed
Base ctor
Child ctor

---------------子?静??造函数中使用父?静?字段?初始化?序---------------
Child static filed
Child static ctor
Base static filed
Base static ctor
Child filed
Base filed
Base ctor
Child ctor

Lazy模式

FCL有两个类型封装了上面描述的模式,也就是懒汉模式

下面是泛型System.Lazy类(方法未列完):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Lazy<T> {
public Lazy(Func<T> valueFactory, LazyThreadSafetyMode mode);
public Boolean IsValueCreated { get; }
public T Value { get; }
}

// enum
LazyThreadSafetyMode {
None, // 完全没有线程安全支持(适合GUI应用程序)
ExecutionAndPublication, // 使用双检锁技术
PublicationOnly, // 使用Interlocked.CompareExchange技术
}

// -------------------------------------------------------------------
// demo
void Main(){
// 创建一个“延迟初始化”包装器,它将Datetime的获取包装起来
Lazy<string> s = new Lazy<string>(() => DateTime.Now.ToLongTimeString(), true);

Console.WriteLine(s.IsValueCreated); // 还没查询Value,返回false
Console.WriteLine(s.Value); // 开始调用委托
Console.WriteLine(s.IsValueCreated); // 已经查询了Value,返回true
Thread.Sleep(10000);
Console.WriteLine(s.Value); // 委托没有被重复调用,显示相同结果是正常的
}

// demo output:
// False
// 2:40:42 PM
// True
// 2:40:42 PM