C#筆記 – 線程及異步(二) 任務

線程池
  • 每個CLR有一個線程池

    • 由CLR控制的所有AppDomain共享

  • 線程池維護了一個「操作請求隊列」

    • 應用程序執行一個異步操作時,就調用方法將一個記錄項追加到該隊列中

    • 線程池從該隊列提取記錄項,將其派發給一個線程池線程

    • 如果池裡沒有線程/發出請求的速度超過了線程池線程處理它們的速度,就創建新線程

  • 線程完成任務後,返回線程池

  • 當一個線程池閑置一段時間後,會自我銷毀並釋放資源

工作項
  • ThreadPool.QueueUserWorkItem

    • 向線程追加一個「工作項」以及可選的狀態數據,然後方法立即返回

    • 池中的某個線程會處理該「工作項」

    • 「工作項」的回調方法必須匹配WaitCallback委托類型

      • delegate void WaitCallback(Object state);
    • static void Main(string[] args)
      {
         ThreadPool.QueueUserWorkItem(Test, null);
      }

      static void Test(object state) { }

執行上下文
  • 每個線程都關聯了一個執行上下文的數據結構

  • 該結構包含了

    • 安全設置

    • 宿主設置

    • 邏輯調用上下文數據

  • 每當一個線程使用另一個線程執行任務時,前者的執行上下文應複製到後者,確保後者使用的是同一套的設置(安全/宿主)

  • 默認情況下,初始線程的執行上下文「流向」任何輔助線程

    • 可以通過System.Threading.ExecutionContext控制執行上下文的流動

協作式取消
  • 協作式取消

    • 要取消的操作必須顯式支持取消

    • 需要一個System.Threading.CancellationTokenSource對象

      • 該對象構建後,可將其中的Token屬性傳給操作,使操作可以取消

        • static void Main(string[] args)
          {
             CancellationTokenSource cancelSrc = new CancellationTokenSource();
             CancellationToken ct = cancelSrc.Token;
             ThreadPool.QueueUserWorkItem(o => Count(ct, 100));
          }

          static void Count(CancellationToken token, int countTo)
          {
             for (int i = 0; i < countTo; i++)
            {
                 if (token.IsCancellationRequested)
                {
                     Console.WriteLine("Operation Cancelled");
                     break;
                }
                 Console.WriteLine(i);
                 Thread.Sleep(200);
            }
             Console.WriteLine("Done");
          }

任務
  • System.Threading.Task

    • Task.Run(() => { Count(ct, 10); });
    • Task比ThreadPool.QueueUserWorkItem有更多的內建機制

      • 知道操作在甚麼時候完成

      • 操作完成時獲得返回值

  • 在Task構造時,可以傳遞TaskCreationOptions標志控制Task的執行方式

    • namespace System.Threading.Tasks
      {
        [Flags]
         public enum TaskCreationOptions
        {        
             None = 0,
             PreferFairness = 1,
             LongRunning = 2,
             AttachedToParent = 4,
             DenyChildAttach = 8,
             HideScheduler = 16,
             RunContinuationsAsynchronously = 64
        }
      }
    • 有的標志只是「提議」,TaskScheduler在調度一個Task時不一定會採納

  • 等待任務完成並獲取結果

    • static void Main(string[] args)
      {
         Task<int> t2 = new Task<int>(() => { return Sum(10); });
         t2.Start();
         t2.Wait(); //等待
         Console.WriteLine($"Result: {t2.Result}"); //.Result取得結果
      }

      static int Sum(int n)
      {
         int sum = 0;
         for(int i = 1; i < n; i++)
        {
             sum += i;
             Console.WriteLine($"Step Result: {sum}");
             Thread.Sleep(200);
        }
         return sum;
      }
    • 線程調用Wait方法時:

      • 如果要等待的Task已經開始執行,那調用Wait的線程會阻塞直到Task運行結束

      • 否則,系統可能用Wait的線程來執行Task,調用Wait的線程就不會阻塞

        • 取決於TaskScheduler

        • 如果線程在調用Wait前獲得了一個同步鎖,而Task試圖獲取同一個鎖,就會造成死鎖

    • 除了Task的對象方法Wait,Task還供了兩個靜態方法

      • WaitAny

      • WaitAll

  • 任務完成時自動啟動新任務

    • ContinueWith

    • Task<int> t2 = new Task<int>(() => { return Sum(10); });
      t2.ContinueWith(task => Console.WriteLine("Next Action Operated"));
      t2.Start();
      t2.Wait();            
      Console.WriteLine($"Result: {t2.Result}");
    • 在調用ContinueWith時,可傳遞一組TaskContinuationOptions標志

      • namespace System.Threading.Tasks
        {
          [Flags]
           public enum TaskContinuationOptions
          {
               None = 0,
               PreferFairness = 1,
               LongRunning = 2,
               AttachedToParent = 4,
               DenyChildAttach = 8,
               HideScheduler = 16,
               LazyCancellation = 32,
               RunContinuationsAsynchronously = 64,
               NotOnRanToCompletion = 65536,
               NotOnFaulted = 131072,
               OnlyOnCanceled = 196608,
               NotOnCanceled = 262144,
               OnlyOnFaulted = 327680,
               OnlyOnRanToCompletion = 393216,
               ExecuteSynchronously = 524288
          }
        }
  • 啟動子任務

    •     Task<int[]> parent = new Task<int[]>(() =>
        {
             var results = new int[3];
           
             new Task(() => results[0] = Sum(5), TaskCreationOptions.AttachedToParent).Start();
             new Task(() => results[1] = Sum(10), TaskCreationOptions.AttachedToParent).Start();
             new Task(() => results[2] = Sum(15), TaskCreationOptions.AttachedToParent).Start();
           
             return results;
        });
    • AttachedToParent標志將一個Task與創建它的Task關聯,結果是除非所有子任務結束運行,否則父任務不認為已經結束

任務內部揭秘
  • 每個Task對象都有一組字段構成了任務的狀態

    • ID

    • Task執行狀態

    • 對父任務的引用

    • 對Task創建時指定的TaskScheduler的引用

    • 對回調方法的引用

    • 對要傳給回調方法的對象的引用

    • 對ExecutionContext的引用

    • 對ManualResetEventSlim對象的引用

    • 對根據需要創建的補充狀態的引用

      • CancellationToken

      • ContinueWithTask對象集合

      • 為拋出未處理異常的子任務而準備的Task對象集合

  • 創建Task的所需內存遠比ThreadPool.QueueUserWorkItem大

任務工廠
  •     Task t3 = new Task(() =>
      {
           var cts = new CancellationTokenSource();
           var tf = new TaskFactory<int>(
               cts.Token,
               TaskCreationOptions.AttachedToParent,
               TaskContinuationOptions.ExecuteSynchronously,
               TaskScheduler.Default
          );
         
           var childTask = new[]
          {
               tf.StartNew(() => Sum(5)),
               tf.StartNew(() => Sum(10)),
          };
      });
  • 用於創建一組共享相同配置的Task對象

    • CancellationToken

    • TaskScheduler

    • TaskCreationOptions

    • TaskContinuationOptions

  • 子任務通過StartNew來創建

任務調度器
  • 負責執行被調度的任務

  • FCL提供了兩個派生自TaskScheduler的類型

    • 線程池任務調度器(默認)

      • 將任務調度給線程池的工作者線程

    • 同步上下文任務調度器

Task的優勢
  • 使用內存、創建/銷毀的時間比線程少

  • 線程池可用CPU數量自動伸縮任務規模

  • 每個任務完成一個階段,運行任務的線程回到線程池,並等待接收新任務

  • 線程池是站在整個進程的高度觀察任務,所以它能更好地調度任務,減少進程中的線程數和上下文切換

Parallel.For/ForEach/Invoke
  • Parallel.For/ForEach

    • Parallel.For(0, 1000, i => Console.WriteLine(i));
      Parallel.ForEach(new[] { 1, 2, 3 }, item => Console.WriteLine(item));
    • 使用Parallel的靜態遍歷函數,可以多個線程來輔助遍歷提升效率

      • Parallel所有方法都讓調用線程參與處理

      • 使用前提

        • 工作項必須能并行執行

        • 避免會修改任何共享數據的工作項

      • 開銷

        • 委托對象必須分配,針對每個工作項都要調用一次這些委托

          • 如果每一個工作項都涉及大量工作,效率才會有所提升

          • 如果工作項很少,或工作項本身的處理效率很快,使用Parallel反而會降低性能

并行LINQ(PLINQ)
  • Parallel LINQ

    • 一般LINQ稱為「順序查詢」

    • 并行查詢則為PLINQ

  • PLINQ內部使用Task,將集合中的數據項的處理工作分散到多個CPU上

  • 和Parallel的方法類似,如果要同時處理大量項/每項處理過程耗時長,PLINQ能獲得更大收益

  • 實際使用上,是將LINQ的指令通過ParallelEnumberable.AsParallel使調用的命令轉換成并行版本的

    • 可以使用ParallelEnumberable.AsSequential把并行版本轉換成順序版本

    • 并行查詢返回一個ParallelQuery的結果,應使用ParallelEnumerable.ForAll來處理

  • PLINQ用多個線程處理查詢,返回的結果是無序的

    • 使用ParallelEnumerable.AsOrdered,線程會成組處理數據項,然後這些組被合并回去,同時保持順序,但會損害性能

    • 可以調用WithMergeOption,傳遞ParallelMergeOption標志,控制結果的緩沖和合并方式

      • namespace System.Linq
        {
           public enum ParallelMergeOptions
          {
               Default = 0,
               NotBuffered = 1,
               AutoBuffered = 2,
               FullyBuffered = 3
          }
        }
  • PLINQ會分析一個查詢,然後決定是順序處理還是并行處理

    • 可以調用WithExecutionMode,並傳遞ParallelExecutionMode標志來強迫使用并行方式處理查詢

      • namespace System.Linq
        {
           public enum ParallelExecutionMode
          {        
               Default = 0,
               ForceParallelism = 1
          }
        }
System.Threading.Timer定時操作
  • System.Threading.Timer

    • 讓一個線程池線程定時調用一個方法

    • 構造Timer實例等同告訴線程池:在將來某個時間回調你的一個方法

    • 回調方法委托類型:TimerCallback

      • delegate void TimerCallback(Object state);
  • 內部工作

    • 線程池為所有Timer對象只使用了一個線程,該線程知道下一個Timer對象的計時器還有多久觸發

    • Timer對象到期時,線程會被喚醒,調用ThreadPool.QueueUserWorkItem,將一個工作項添加到線程池的隊列中

    • 如果回調方法執行時間很長,可能會造成多個線程池線程同時執行該回調

      • 構造Timer時,應為period參數指定Timeout.Infinite,使計時器只觸發一次

      • 回調方法中調用Timer的對象方法Change,dueTime指定Timeout.Infinite

    • 使用Dispose方法完全取消計時器

      • Timer被GC時,其終結代碼告訴線程池取消計時器,對回調方法的調用也會停止

線程池的線程管理
  • 最好是將線程池看成一個黑盒

  • 線程池永遠都不應該設置線程數上限,因為可能發生飢餓或死鎖

  • CLR團隊一直增加線程池默認擁有的最大線程數

    • 32位進程最大有2GB地址空間

      • 加載了Win32、CLR DLLs,分配了本地堆和托管堆、用戶模式棧、線程環境塊後,大約能提供1360個線程

    • 64位進程最大有8TB地址空間

  • 具體的工作者線程管理

    • ThreadPool.QueueUserWorkItem和Timer類

      • 總是將工作項放到全局隊列

      • 工作者線程採用FIFO將工作項從隊列中取出

      • 所有工作者線程會競爭一個線程同步鎖,保証同一個工作項不會被多個線程獲取

        • 可能成為瓶頸

    • TaskScheduler對Task的調度

      • 非工作者線程調度Task時,該Task被添加至全局隊列

      • 每個工作者線程有自己的本地隊列,工作者線程調度一個Task時,Task加到本地隊列

        • 使用LIFO將任務從本地隊列取出

        • 只有自己訪問自己,無需同步鎖

        • 當發現本地隊列為空,會嘗試從其他工作者線程中的本地隊列尾部「偷」Task

          • 此時需要獲取一個同步鎖

      • 當所有本地隊列都為空,使用FIFO從全局隊列提取工作項

      • 當全局隊列為空,進入睡眠,一定時間後還沒被喚醒剛自己醒來並銷毀自身

參考書目

  • 《CLR via C#》(第4版) Jeffrey Richter