線程池
每個CLR有一個線程池
由CLR控制的所有AppDomain共享
線程池維護了一個「操作請求隊列」
應用程序執行一個異步操作時,就調用方法將一個記錄項追加到該隊列中
線程池從該隊列提取記錄項,將其派發給一個線程池線程
如果池裡沒有線程/發出請求的速度超過了線程池線程處理它們的速度,就創建新線程
線程完成任務後,返回線程池
當一個線程池閑置一段時間後,會自我銷毀並釋放資源
工作項
ThreadPool.QueueUserWorkItem
向線程追加一個「工作項」以及可選的狀態數據,然後方法立即返回
池中的某個線程會處理該「工作項」
「工作項」的回調方法必須匹配WaitCallback委托類型
delegate void WaitCallback(Object state);
static void Main(string[] args)
{
ThreadPool.QueueUserWorkItem(Test, null);
}
static void Test(object state) { }
執行上下文
每個線程都關聯了一個執行上下文的數據結構
該結構包含了
安全設置
宿主設置
邏輯調用上下文數據
每當一個線程使用另一個線程執行任務時,前者的執行上下文應複製到後者,確保後者使用的是同一套的設置(安全/宿主)
默認情況下,初始線程的執行上下文「流向」任何輔助線程
可以通過System.Threading.ExecutionContext控制執行上下文的流動
協作式取消
協作式取消
要取消的操作必須顯式支持取消
需要一個System.Threading.CancellationTokenSource對象
該對象構建後,可將其中的Token屬性傳給操作,使操作可以取消
static void Main(string[] args)
{
CancellationTokenSource cancelSrc = new CancellationTokenSource();
CancellationToken ct = cancelSrc.Token;
ThreadPool.QueueUserWorkItem(o => Count(ct, 100));
}
static void Count(CancellationToken token, int countTo)
{
for (int i = 0; i < countTo; i++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine("Operation Cancelled");
break;
}
Console.WriteLine(i);
Thread.Sleep(200);
}
Console.WriteLine("Done");
}
任務
System.Threading.Task
Task.Run(() => { Count(ct, 10); });
Task比ThreadPool.QueueUserWorkItem有更多的內建機制
知道操作在甚麼時候完成
操作完成時獲得返回值
在Task構造時,可以傳遞TaskCreationOptions標志控制Task的執行方式
namespace System.Threading.Tasks
{
[Flags]
public enum TaskCreationOptions
{
None = 0,
PreferFairness = 1,
LongRunning = 2,
AttachedToParent = 4,
DenyChildAttach = 8,
HideScheduler = 16,
RunContinuationsAsynchronously = 64
}
}有的標志只是「提議」,TaskScheduler在調度一個Task時不一定會採納
等待任務完成並獲取結果
static void Main(string[] args)
{
Task<int> t2 = new Task<int>(() => { return Sum(10); });
t2.Start();
t2.Wait(); //等待
Console.WriteLine($"Result: {t2.Result}"); //.Result取得結果
}
static int Sum(int n)
{
int sum = 0;
for(int i = 1; i < n; i++)
{
sum += i;
Console.WriteLine($"Step Result: {sum}");
Thread.Sleep(200);
}
return sum;
}線程調用Wait方法時:
如果要等待的Task已經開始執行,那調用Wait的線程會阻塞直到Task運行結束
否則,系統可能用Wait的線程來執行Task,調用Wait的線程就不會阻塞
取決於TaskScheduler
如果線程在調用Wait前獲得了一個同步鎖,而Task試圖獲取同一個鎖,就會造成死鎖
除了Task的對象方法Wait,Task還供了兩個靜態方法
WaitAny
WaitAll
任務完成時自動啟動新任務
ContinueWith
Task<int> t2 = new Task<int>(() => { return Sum(10); });
t2.ContinueWith(task => Console.WriteLine("Next Action Operated"));
t2.Start();
t2.Wait();
Console.WriteLine($"Result: {t2.Result}");在調用ContinueWith時,可傳遞一組TaskContinuationOptions標志
namespace System.Threading.Tasks
{
[Flags]
public enum TaskContinuationOptions
{
None = 0,
PreferFairness = 1,
LongRunning = 2,
AttachedToParent = 4,
DenyChildAttach = 8,
HideScheduler = 16,
LazyCancellation = 32,
RunContinuationsAsynchronously = 64,
NotOnRanToCompletion = 65536,
NotOnFaulted = 131072,
OnlyOnCanceled = 196608,
NotOnCanceled = 262144,
OnlyOnFaulted = 327680,
OnlyOnRanToCompletion = 393216,
ExecuteSynchronously = 524288
}
}
啟動子任務
Task<int[]> parent = new Task<int[]>(() =>
{
var results = new int[3];
new Task(() => results[0] = Sum(5), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[1] = Sum(10), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[2] = Sum(15), TaskCreationOptions.AttachedToParent).Start();
return results;
});AttachedToParent標志將一個Task與創建它的Task關聯,結果是除非所有子任務結束運行,否則父任務不認為已經結束
任務內部揭秘
每個Task對象都有一組字段構成了任務的狀態
ID
Task執行狀態
對父任務的引用
對Task創建時指定的TaskScheduler的引用
對回調方法的引用
對要傳給回調方法的對象的引用
對ExecutionContext的引用
對ManualResetEventSlim對象的引用
對根據需要創建的補充狀態的引用
CancellationToken
ContinueWithTask對象集合
為拋出未處理異常的子任務而準備的Task對象集合
創建Task的所需內存遠比ThreadPool.QueueUserWorkItem大
任務工廠
Task t3 = new Task(() =>
{
var cts = new CancellationTokenSource();
var tf = new TaskFactory<int>(
cts.Token,
TaskCreationOptions.AttachedToParent,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default
);
var childTask = new[]
{
tf.StartNew(() => Sum(5)),
tf.StartNew(() => Sum(10)),
};
});用於創建一組共享相同配置的Task對象
CancellationToken
TaskScheduler
TaskCreationOptions
TaskContinuationOptions
子任務通過StartNew來創建
任務調度器
負責執行被調度的任務
FCL提供了兩個派生自TaskScheduler的類型
線程池任務調度器(默認)
將任務調度給線程池的工作者線程
同步上下文任務調度器
Task的優勢
使用內存、創建/銷毀的時間比線程少
線程池可用CPU數量自動伸縮任務規模
每個任務完成一個階段,運行任務的線程回到線程池,並等待接收新任務
線程池是站在整個進程的高度觀察任務,所以它能更好地調度任務,減少進程中的線程數和上下文切換
Parallel.For/ForEach/Invoke
Parallel.For/ForEach
Parallel.For(0, 1000, i => Console.WriteLine(i));
Parallel.ForEach(new[] { 1, 2, 3 }, item => Console.WriteLine(item));使用Parallel的靜態遍歷函數,可以多個線程來輔助遍歷提升效率
Parallel所有方法都讓調用線程參與處理
使用前提
工作項必須能并行執行
避免會修改任何共享數據的工作項
開銷
委托對象必須分配,針對每個工作項都要調用一次這些委托
如果每一個工作項都涉及大量工作,效率才會有所提升
如果工作項很少,或工作項本身的處理效率很快,使用Parallel反而會降低性能
并行LINQ(PLINQ)
Parallel LINQ
一般LINQ稱為「順序查詢」
并行查詢則為PLINQ
PLINQ內部使用Task,將集合中的數據項的處理工作分散到多個CPU上
和Parallel的方法類似,如果要同時處理大量項/每項處理過程耗時長,PLINQ能獲得更大收益
實際使用上,是將LINQ的指令通過ParallelEnumberable.AsParallel使調用的命令轉換成并行版本的
可以使用ParallelEnumberable.AsSequential把并行版本轉換成順序版本
并行查詢返回一個ParallelQuery的結果,應使用ParallelEnumerable.ForAll來處理
PLINQ用多個線程處理查詢,返回的結果是無序的
使用ParallelEnumerable.AsOrdered,線程會成組處理數據項,然後這些組被合并回去,同時保持順序,但會損害性能
可以調用WithMergeOption,傳遞ParallelMergeOption標志,控制結果的緩沖和合并方式
namespace System.Linq
{
public enum ParallelMergeOptions
{
Default = 0,
NotBuffered = 1,
AutoBuffered = 2,
FullyBuffered = 3
}
}
PLINQ會分析一個查詢,然後決定是順序處理還是并行處理
可以調用WithExecutionMode,並傳遞ParallelExecutionMode標志來強迫使用并行方式處理查詢
namespace System.Linq
{
public enum ParallelExecutionMode
{
Default = 0,
ForceParallelism = 1
}
}
System.Threading.Timer定時操作
System.Threading.Timer
讓一個線程池線程定時調用一個方法
構造Timer實例等同告訴線程池:在將來某個時間回調你的一個方法
回調方法委托類型:TimerCallback
delegate void TimerCallback(Object state);
內部工作
線程池為所有Timer對象只使用了一個線程,該線程知道下一個Timer對象的計時器還有多久觸發
Timer對象到期時,線程會被喚醒,調用ThreadPool.QueueUserWorkItem,將一個工作項添加到線程池的隊列中
如果回調方法執行時間很長,可能會造成多個線程池線程同時執行該回調
構造Timer時,應為period參數指定Timeout.Infinite,使計時器只觸發一次
回調方法中調用Timer的對象方法Change,dueTime指定Timeout.Infinite
使用Dispose方法完全取消計時器
Timer被GC時,其終結代碼告訴線程池取消計時器,對回調方法的調用也會停止
線程池的線程管理
最好是將線程池看成一個黑盒
線程池永遠都不應該設置線程數上限,因為可能發生飢餓或死鎖
CLR團隊一直增加線程池默認擁有的最大線程數
32位進程最大有2GB地址空間
加載了Win32、CLR DLLs,分配了本地堆和托管堆、用戶模式棧、線程環境塊後,大約能提供1360個線程
64位進程最大有8TB地址空間
具體的工作者線程管理
ThreadPool.QueueUserWorkItem和Timer類
總是將工作項放到全局隊列中
工作者線程採用FIFO將工作項從隊列中取出
所有工作者線程會競爭一個線程同步鎖,保証同一個工作項不會被多個線程獲取
可能成為瓶頸
TaskScheduler對Task的調度
非工作者線程調度Task時,該Task被添加至全局隊列
每個工作者線程有自己的本地隊列,工作者線程調度一個Task時,Task加到本地隊列
使用LIFO將任務從本地隊列取出
只有自己訪問自己,無需同步鎖
當發現本地隊列為空,會嘗試從其他工作者線程中的本地隊列尾部「偷」Task
此時需要獲取一個同步鎖
當所有本地隊列都為空,使用FIFO從全局隊列提取工作項
當全局隊列為空,進入睡眠,一定時間後還沒被喚醒剛自己醒來並銷毀自身
參考書目
- 《CLR via C#》(第4版) Jeffrey Richter