C#筆記 – 線程及異步(三)異步函數

異步編程
  • 基礎

    • 異步執行充斥了後續操作。在開始做一件事情時,要告知其操作完成後應進行哪些操作

    • 在.NET中,後續操作由委托加以表示,通常為接收異步操作結果的action

      • 因此異步方法需要包裝多個事件,表示在成功或失敗的情況下應執行哪段代碼

    • C#編譯器會對所有await都構建一個後續操作

  • 基於任務的異步模式

    • 不會將後續操作傳遞給異步操作,而是在異步操作開始時返回一個token

    • 這個token用於提供後續操作

      • 它表示正在進行的操作,可能已經完成,也可能正在處理

      • 想法是:在這個操作完成之前,不能進行下一步處理

      • token的形式通常為Task或Task<TResult>

    • C#5中的異步方法執行流

      • 執行某些操作

      • 開始異步操作,記住返回的token

      • 可能會執行其他操作(一般不會)

      • 等待異步操作完成(通過token)

      • 執行其他操作

      • 完成

  • 異步模型

C#異步函數
  • 用async修飾符聲明,包含await表達式的方法/匿名函數
    • 編譯器將方法代碼轉換成實現了「狀態機」的一個類型
    • await表達式
static async Task<string> TestReqAsync(string msg)
{
   using (var pipe = new MemoryStream())
  {
       byte[] req = Encoding.UTF8.GetBytes(msg);
       await pipe.WriteAsync(req, 0, req.Length); //Call ContinueWith On Task, Return From TestReqAsync

byte[] rsp = new byte[1000];

       int bytesRead = await pipe.ReadAsync(rsp, 0, rsp.Length); //Call ContinueWith On Task, Return From TestReqAsync
       return Encoding.UTF8.GetString(rsp);
  }
}
      • 這令該方法像迭代器一樣,不需要一直執行到結束才返回

      • 當線程調用TestReqAsync時,會構造出一個MemoryStream,並開始執行using塊裡面的代碼

      • 執行至WriteAsync時,WriteAsync會在內部分配一個Task對象

        • 這個Task對象會返回給TestReqAsync

        • C#的await操作符實際上會在Task對象上調用ContinueWith,向它傳遞用於恢復狀態機的方法

        • 然後線程從TestReqAsync返回

      • 當WriteAsync任務在將來某個時候完成時,一個線程池線程會通知Task對象,激活ContinueWith回調方法,恢復狀態機

        • 線程重新進入TestReqAsync,從await位置開始繼續執行後續代碼

      • 執行至ReadAsync時,同樣會在內部分配一個Task<int>對象,其餘操作與WriteAsync類似

      • 狀態機執行完畢後,GC進行回收

    • 實際上,一旦把方法標記為async,編譯器自動生成代碼,在狀態機開始執行時創建一個Task對象

      • IL_000e:  call       valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<!0> valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::Create()
        IL_0013: stfld     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<TestReqAsync>d__1'::'<>t__builder'
      • 這個對象在狀態機執行完畢時自動完成

      • TestReqAsync返回的類型為Task<string>,該Task<string>正是由編譯器生成的代碼為這個方法的調用者而創建的對象

Await表達式
  • 負責消費異步操作
  • 如果表達式等待的值還不可用,異步函數立即返回
  • 當值可用時,異步函數將回到離開的地方繼續執行(在適當的線程上)
  • await的任務類型是Task<類型參數>;整段await表達式類型就是該類型參數

    • await表達式執行的是「拆包」(unwrap)操作

    • await的主要目的是在等待耗時操作完成時避免阻塞

  • 方法在執行到await表達式時就返回了,在此之前都是同步執行的

    • 到達await後,代碼將檢查其結果是否存在

      • 不存在,會安排一個操作完成後的後續操作

        • 後續操作:在異步操作中完成時執行的回調程序。保留了方法的控制狀態。後續操作記住了它的位置,在執行時可回到原處。

        • Task類包含一個專門用於添加後續操作的方法(Task.ContinueWith)

    • 異步函數的返回類型只能為:

      • void

      • Task<TResult>

      • Task

    • Task表示一個可能還未完成的操作

      • Task<TResult>表示一個返回值為T類型的操作,而Task不需要產生返回值

      • 即使沒有返回值,Task本身可以使調用者在返回的任務上,附加後續操作

      • void是為了和事件處理程序兼容,只調用給定的事件處理程序,沒有必要知道事件甚麼時候真正處理完畢

    • 對於一個異步方法,只有作為事件訂閱者時才應該返回void,在其他不需要特定返回值的情況下,最好是返回Task。調用者就可以「等待操作完成」。

  • 可等待模式

    • 一個await操作包含了三種操作

      • 告知是否已經完成

      • 後續操作的添加

      • 獲取結果,返回值或執行成功/失敗的信息

    • 對於一個await表達式,編譯器生成的代碼會先用GetAwaiter(),然後使用awaiter的成員來等待結果

      • C#編譯器要求awaiter必須實現INotifyCompletion接口

        • //位於System.Runtime.CompilerServices命名空間
          public interface INotifyCompletion
          {
             void OnCompleted(Action contiuation);
          }
      • IsCompleted和GetResult必須是GetAwaiter()方法返回類型上的真正成員,能由包含await表達式的代碼訪問

      • 如果GetResult()返回void,整個await表達式就沒有類型

        • 否則await表達式的類型與GetResult()的返回類型相同

        • using(var client = new HttpClient())
          {
             Task<string> task = client.GetStringAsync(...);
             string result = await task;
          }
        • 如果await表達式不返回任何類型的值,那就不能將其分配給變量、作為方法實參、執行任何將表達式作為值的相關操作

Await表達式的流
  • await後面有時是方法調用的結果,有時是屬性

    • string pageText = await new HttpClient().GetStringAsync(url);
    • await只是在操作一個值,該代碼等價於

    • Task<string> task = new HttpClient().GetStringAsync(url);
      string pageText = await task;
  • await表達式的結果(如有)可以用作實參或作為其他表達式的一部分

    • AddPayment(await employee.GetHourlyRateAsync() * await timeSheet.GetHoursWorkedAsync(employee.Id));
  • 執行過程到達await表達式後,存在兩種可能

    • 等待中的異步操作已經完成

    • 等待中的異步操作尚未完成

  • 當異步操作仍在執行時,方法異步地等待操作完成,然後執行適當的上下文

    • 「異步等待」意味著方法不再執行,把其後續的操作附加了在異步操作上,然後返回

    • 然後等待完成後,再被異步操作確保方法能在正確的線程中恢復

    • 從一個異步方法「返回」的意味

      • 如果是第一個await表達式,原始調用者還位於棧中的某個位置(在到達await前,方法都是同步執行的)

        • 在這種情況,會將Task或Task返回給調用者

      • 已經等待了其他操作,因此處於由某個操作調用的「後續操作」中

從異步方法返回
  • public static async Task<int> GetPageLengthAsync()
    {
       using(HttpClient client = new HttpClient())
      {
           Task<string> fetchTextTask = client.GetStringAsync(url);
           int length = (await fetchTextTask).Length;
           return length;
      }
    }
  • 方法中,return的類型是int,而方法簽名中的返回類型為Task

  • 代表生成的代碼為此進行了包裝,調用者將得到一個Task並最終在方法完成時得到其返回值(int length)

  • 包裝的必要性

    • 在到達return語句前,幾乎必然會發生「返回到調用者」的行為(await),我們需要某種方式向調用者傳播信息,而Task就是通知調用者,對該方法未來生成的值或拋出的異常所做出的承諾

  • 如果return語句出現在有finally的try塊中,那用來計算返回值的表達式將立即被求值,但直到所有對象清理完畢後,才會作為任務結果

    • 同理,如果finally拋出一個異常,整個代碼都會失敗

異常
  • Task表示異常的方式

    • 異步操作失敗,任務的Status變為Faulted

    • Exception屬性返回一個AggregateException,包含所有造成任務失敗的異常;沒有錯誤則返回null

    • 如果任務最終狀態為錯誤,Wait()方法拋出一個AggregateException

    • TaskResult屬性拋出AggregateException

    • 通過CancellationTokenSource和CancellationToken來實現取消操作

      • 如果任務取消了,Wait()方法和Result屬性拋出包含OperationCanceledException的AggregateException

    • 在等待任務時,任務出錯/取消都拋出AggregateException中的第一個異常

  • 在等待時拆包異常

    • awaiter的GetResult可以獲取工作中的異常,將異常從異步操作傳遞回方法中

      • 但是單個Task可以表示多個操作,並導致多個失敗

    • async Task<string> FetchFirstSuccessfulAsync(IEnumerable<string> urls)
      {
         foreach(string url in urls)
        {
             try
            {
                 using(var client = new HttpClient())
                {
                     return await client.GetStringAsync(url);
                }
            }
             catch(WebException e) { ... };
             
             throw new WebException("...");
        }
      }
    • 捕獲WebException

      • GetStringAsync只是負責啟動操作,返回一個包含WebException的任務(Task)

      • 如果調用任務的Wait,則拋出一個包含WebException的AggregateException

      • 任務awaiter的GetResult方法拋出WebException,並被以上代碼捕獲

        • 但這會導致信息丟失

        • 如果錯誤的任務中包含多個異常,GetResult只能拋出第一個異常

  • 在拋出異常時進行包裝

    • 異步方法在調用時永遠不會直接拋出異常。

      • 方法返回一個Task/Task,其中拋出的任何異常都將簡單地傳遞給任務

      • 如果調用者直接等待(Wait())任務,則得到一個包含真正異常的AggregateException

      • 如果使用await,異常則會從任務中解包

      • 一般只需捕獲嵌套的異步方法所拋出的異常即可

    • 失效的參數驗證

      • static async Task MainAsync()
        {
           Task<int> task = ComputeLengthAsync(null);
           Console.WriteLine("Fetched the task");
        int length = await task;
           Console.WriteLine($"Length: {length}");
        }

        static async Task<int> ComputeLengthAsync(string text)
        {
           if(text == null) { throw new ArgumentNullException("text"); }
           await Task.Delay(500);
           return text.Length;
        }
      • 該代碼在失敗前會先輸出Fetched the task

        • 事實上,在輸出Fetched the task前,異常就已經拋出了。因為在text == null的驗證語句前不存在await表達式

        • 調用代碼直到等待返回的任務時,才能看到異常

      • 一般來說,應該迫使異常立即拋出,停止後續的操作。迫使異常立即拋出的方法有兩種:

        • 分離參數驗證和實現

          • static async Task<int> ComputeLengthAsync(string text)
            {
               //驗證
               if(text == null) { throw new ArgumentException("text"); }
               return ComputeLengthAsyncImpl(text);
            }
            static async Task<int> ComputeLengthAsyncImpl(string text)
            {
               //異步實現
               await Task.Delay(500);
               return text.Length;
            }
        • 異步匿名函數

          • static Task<int> ComputeLengthAsync(string text)
            {
               if(text == null) { throw new ArgumentException("text"); }
               Func<Task<int>> func = async () =>
              {
                   await Task.Delay(500);
                   return text.Length;
              };
               return func;
            }
          • 將工作包裝到一個異步匿名函數中,調用委托並返回結果

  • 處理取消

    • 取消模型

      • 創建一個CancellationTokenSource

      • 向其請求一個CancellationToken

      • 傳遞給異步操作

    • 取消token的方式

      • 調用ThrowIfCancellationRequested

      • 如果取消了token,並沒有其他操作,則拋出OperationCanceledException

    • 如果異步方法拋出OperationCanceledException,返回的任務最終狀態為Canceled

    • 等待一個取消了的操作,將拋出原始的OperationCanceledException,而從異步方法返回的任務同樣會被取消

      • 如果A操作等待B操作,而B操作取消了,A操作也被認為取消了

異步函數的狀態機
  • 如有以下異步函數:

    • internal sealed class Type1 { }
      internal sealed class Type2 { }

      static async Task<Type1> Method1Async()
      {
         Type1 t = new Type1();
         await new MemoryStream().WriteAsync(null, 0, 0);
         return t;
      }

      static async Task<Type2> Method2Async()
      {
         Type2 t = new Type2();
         await new MemoryStream().WriteAsync(null, 0, 0);
         return t;
      }

      static async Task<string> MyMethodAsync(int arg)
      {
         int local = arg;
         try
        {
             Type1 result1 = await Method1Async();
             for(int i = 0; i < 3; i++)
            {
                 Type2 result2 = await Method2Async();
            }
        }
         catch (Exception)
        {
             Console.WriteLine("Catch");
        }
         finally
        {
             Console.WriteLine("Finally");
        }
         return "Done";
      }
    • 查看IL代碼,可發現其方法所依賴的狀態機結構

  • 在MyMethodAsync類中

.method private hidebysig static class [System.Runtime]System.Threading.Tasks.Task`1<string> 
MyMethodAsync(int32 arg) cil managed
{
.custom instance void [System.Runtime]System.Runtime.CompilerServices.AsyncStateMachineAttribute::.ctor(class [System.Runtime]System.Type) = ( 01 00 24 43 4C 52 5F 43 68 32 38 2E 50 72 6F 67   // ..$CLR_Ch28.Prog
72 61 6D 2B 3C 4D 79 4D 65 74 68 6F 64 41 73 79   // ram+<MyMethodAsy
6E 63 3E 64 5F 5F 35 00 00 )                     // nc>d__5..
.custom instance void [System.Diagnostics.Debug]System.Diagnostics.DebuggerStepThroughAttribute::.ctor() = ( 01 00 00 00 )
// 程式碼大小       56 (0x38)
.maxstack 2
.locals init (class CLR_Ch28.Program/'<MyMethodAsync>d__5' V_0)
IL_0000: newobj     instance void CLR_Ch28.Program/'<MyMethodAsync>d__5'::.ctor()
IL_0005: stloc.0
IL_0006: ldloc.0
IL_0007: ldarg.0
IL_0008: stfld     int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::arg
IL_000d: ldloc.0
IL_000e: call       valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<!0> valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::Create()
IL_0013: stfld     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
IL_0018: ldloc.0
IL_0019: ldc.i4.m1
IL_001a: stfld     int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>1__state'
IL_001f: ldloc.0
IL_0020: ldflda     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
IL_0025: ldloca.s   V_0
IL_0027: call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::Start<class CLR_Ch28.Program/'<MyMethodAsync>d__5'>(!!0&)
IL_002c: ldloc.0
IL_002d: ldflda     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
IL_0032: call       instance class [System.Runtime]System.Threading.Tasks.Task`1<!0> valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::get_Task()
IL_0037: ret
} // end of method Program::MyMethodAsync
  • 使用AysncTaskMethodBuilder構造了builder(狀態機訪問builder來設置Task完成/異常),並將state(狀態機位置)初始化

    • 然後調用Start啟動狀態機

    • 返回狀態機的Task(get_task)

  • MoveNext函數中,包含了整個狀態機的流程

    • 首先是初始化

.locals init (int32 V_0,
string V_1,
valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1> V_2,
class CLR_Ch28.Program/'<MyMethodAsync>d__5' V_3,
valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type2> V_4,
int32 V_5,
bool V_6,
class [System.Runtime]System.Exception V_7)
IL_0000: ldarg.0
IL_0001: ldfld     int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>1__state' //初始化位置
IL_0006: stloc.0
.try
{
IL_0007: ldloc.0
IL_0008: ldc.i4.1
IL_0009: ble.un.s   IL_000d
IL_000b: br.s       IL_000f
IL_000d: br.s       IL_001c
IL_000f: nop
IL_0010: ldarg.0
IL_0011: ldarg.0
IL_0012: ldfld     int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::arg //將實參Copy到狀態機字段
IL_0017: stfld     int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<local>5__1'
IL_001c: nop
//...
}
  • 然後狀態機中有著以下字段

  • 值得注意的是,每個await都有一個TaskAwaiter類型的字段

    • 在任何時候,都只有其中一個是重要的,這一個引用了最近執行、以異步方式完成的await

  • 然後這是異步方法的核心流程概要IL

.method private hidebysig newslot virtual final 
      instance void MoveNext() cil managed
{
.override [System.Runtime]System.Runtime.CompilerServices.IAsyncStateMachine::MoveNext
// 程式碼大小       448 (0x1c0)
//...
  .try
  {
    .try
    {
      //調用Method1Async並獲得其Awaiter
      IL_0030: call       class [System.Runtime]System.Threading.Tasks.Task`1<class CLR_Ch28.Program/Type1> CLR_Ch28.Program::Method1Async()
      IL_0035: callvirt   instance valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<!0> class [System.Runtime]System.Threading.Tasks.Task`1<class CLR_Ch28.Program/Type1>::GetAwaiter()
      //...
      //如果還沒完成,使用異步方式完成
      IL_003d: call       instance bool valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1>::get_IsCompleted()
      IL_0042: brtrue.s   IL_0087
      IL_0048: stfld     int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>1__state'
      //...
      //保存awaiter以便將來返回
      IL_004f: stfld     valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>u__1'
      //...
      IL_0057: ldflda     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
      //...
      //await操作完成時調用MoveNext
      IL_0060: call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::AwaitUnsafeOnCompleted<valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1>,class CLR_Ch28.Program/'<MyMethodAsync>d__5'>(!!0&,!!1&)
      //...
      //第一個await的結果捕捉
      IL_008a: call       instance !0 valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1>::GetResult()
      //...
      //啟動循環
      IL_00ae: br         IL_0148
      //內部邏輯,與第一個await流程類似
      //...
      //結束循環
      IL_00ec: leave     IL_01bf
      //第二個await的結果
      IL_010f: ldloca.s   V_4
      IL_0111: call       instance !0 valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type2>::GetResult()
      //...
    } // end .try
    catch [System.Runtime]System.Exception
    {
      //...
      IL_0166: ldstr     "Catch"
      //...
      } // end handler
    IL_0174: leave.s   IL_0188
  } // end .try
  finally
  {
    //...
    IL_017b: ldstr     "Finally"
    //...
    IL_0187: endfinally
  } // end handler
  IL_0188: ldstr     "Done"
  //...
} // end .try
catch [System.Runtime]System.Exception
{
//狀態機異常處理部分
  //...
  IL_01a2: call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::SetException(class [System.Runtime]System.Exception)
  IL_01a7: nop
  IL_01a8: leave.s   IL_01bf
} // end handler
//...
//無異常,返回結果
IL_01b9: call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::SetResult(!0)
IL_01be: nop
IL_01bf: ret
} // end of method '<MyMethodAsync>d__5'::MoveNext
  • 書中把IL編譯回C#的MoveNext概要

異步函數的限制
  • 不能將Main方法轉變為異步函數

    • 構造器、屬性訪問器、事件訪問器都不能

    • 如果Main方法中使用了await操作符,進程的主線程會在遇到第一個await操作符時立即從Main返回

      • 由於Main返回void,導致調用Main的代碼無法獲得一個可進行監視並等待完成的Task,所以進程直接終止

        • 異步函數不僅可以返回Task,也可以返回void

        • 對於返回void的異步函數,我們沒有辦法知道其狀態機在甚麼時候執行完畢

  • 不能使用任何out/ref參數

    • 原因參考:https://social.msdn.microsoft.com/Forums/en-US/d2f48a52-e35a-4948-844d-828a1a6deb74/why-async-methods-cannot-have-ref-or-out-parameters?forum=async

      • As for why async methods don’t support out-by-reference parameters? (or ref parameters?) That’s a limitation of the CLR. We chose to implement async methods in a similar way to iterator methods — i.e. through the compiler transforming the method into a state-machine-object. The CLR has no safe way to store the address of an “out parameter” or “reference parameter” as a field of an object. The only way to have supported out-by-reference parameters would be if the async feature were done by a low-level CLR rewrite instead of a compiler-rewrite. We examined that approach, and it had a lot going for it, but it would ultimately have been so costly that it’d never have happened.

  • 不能在catch、finally、unsafe、lock塊和非異步匿名函數中使用await操作符

    • catch和finally在C# 6.0時已經可以使用await操作符

  • 不能在await操作得之前獲得一個支持線程所有權或遞歸的鎖(lock或者Monitor.Enter/Exit),並在await操作符之後釋放它

    • await之前的代碼與await之後的代碼不一定由同一個線程執行

  • 在查詢表達式中,await操作符只能在初始from子句的第一個集合表達式/join子句的集合表達式中使用

參考書目

  • 《CLR via C#》(第4版) Jeffrey Richter
  • 《深入理解C#》(第3版) Jon Skeet