VB のたまご

作成日: 2017/10/01, 更新日: 2017/10/01


非同期処理その1(2017年10月版)

  •  .NET Framework 4.5 で Async / Await + Task の組み合わせによる非同期処理ができるようになりました。 Async / Await 構文は、Task 型の非同期処理を簡単に扱えるようにした簡易構文と言えます。 これにより、同期処理と非同期処理の境目が自然なつながりになり、見た目的にも分かりやすいソースコードになります。

  •  ちょっと見てみましょうか。以下は、WPF アプリケーションにおける同期処理と非同期処理の比較です。
  • Imports System.Threading
    
    Private Sub Button_Click(sender As Object, e As RoutedEventArgs)
    
        Thread.Sleep(3000)
        Dim button1 = DirectCast(sender, Button)
        button1.Content = "3 秒後!"
    
    End Sub
    

  •  ボタンを押してから、3秒間待機、その後ボタン名が変わるという処理なのですが、3秒待機の処理により画面が応答なしになってしまいます。 これは、お客様が見たら、あれ、バグ?壊れた?と思ってしまうかもしれません。ほんのちょっとの待たされ時間が、お客様を苛立ちと不安にさせてしまうのです。

  •  あれ?反応が無い。(マウスクリックの連打)カチッカチッカチッカチッカチッカチッカチッカチッ・・・と、お客様は画面上の反応だけで(見える部分での評価だけで)判断されます。 よく見るサンプルですが、このように、アプリケーションにとって、ユーザー体験というのは非常に重要な部分と言えます。

  •  このプログラムを非同期処理に書き換えたのが、以下のサンプルです。
  • Imports System.Threading
    
    Private Async Sub Button_Click(sender As Object, e As RoutedEventArgs)
    
        Await Task.Run(Sub() Thread.Sleep(3000)) ' 非同期処理と完了待機
        Dim button1 = DirectCast(sender, Button) ' 以降、同期処理
        button1.Content = "3 秒後!"
    
    End Sub
    

  •  今度は、3秒待機中も画面の応答性があります。【処理中もサクサク動いてくれると使う側は安心】しますね。

  •  お客様がイメージする「応答なし」というのは、画面の裏で全身全霊をかけて処理中ですよ(つまり正常動作ですよ)。と思うわけが無く、 チーン!ナンマイダー・・・と、プログラムが停止した、何かおかしくなった状態を指します。 そして、もう一回ボタンを押せば、今度はうまく動くのではと考えて(どういう根拠?)、高橋名人張りにカチカチ連打して、トドメを刺すわけですね。

  •  まぁそれは置いておいて。このように、Async / Await を付けただけで、元のソースコードとそう大差なくユーザーを安心させることができるプログラムが簡単にできあがります。 一見、Async / Await に注目が集まってしまいますが、真に覚えるべきは非同期処理を提供する Task クラスです。 また、似たような非同期処理に Parallel クラスを使ったものもあります。違いを覚えて、非同期処理をマスターしましょう!

  •  環境
  •  ・Windows 8.1
  •  ・Visual Studio Community 2015 Update3
  •  ・.NET Framework4.5.2

  •  参考
  • Microsoft Reference Source
    ParallelLoopState.cs
    https://referencesource.microsoft.com/#mscorlib/system/threading/Tasks/ParallelLoopState.cs,09b84ab9267d2efa
    
    xin9le.net
    TPL入門
    http://blog.xin9le.net/entry/tpl-intro
    


コレクションデータの並列化(Parallel)

  •  以降の処理で登場する Thread.Sleep メソッドは、System.Threading 名前空間を使いますので、あらかじめインポートしておきます。 都度、各メソッドには書きませんので注意してください。
  • Imports System.Threading
    

  •  コレクションデータを非同期で処理したい場合は、Parallel クラスの For メソッド、ForEach メソッドを使います。
  • ' Parallel.For
    Public Sub Test1()
    
        Dim items = Enumerable.Range(0, 10).Select(Function(x) CStr(x) & "個")
    
        ' 同期処理(1人だけで、1つずつ順番にデータを扱う → 後ろで待っているデータに待ち時間ができる)
        For i As Integer = 0 To items.Count() - 1
            Console.Write($"{i} = {items(i)}, ")
        Next
    
        ' 非同期処理(複数人で手分けして、データを同時に扱う → 後ろで待っているデータの待ち時間が少ない)
        ' (別々に動く各処理が全て完了するまで、For メソッド処理が待機されて、それ以降の処理には進まない)
        Parallel.For(0, items.Count(), Sub(i)
                                           Console.Write($"{i} = {items(i)}, ")
                                       End Sub)
    
        ' 0 = 0個, 1 = 1個, 2 = 2個, 3 = 3個, 4 = 4個, 5 = 5個, 6 = 6個, 7 = 7個, 8 = 8個, 9 = 9個, 
        ' 0 = 0個, 1 = 1個, 3 = 3個, 2 = 2個, 6 = 6個, 5 = 5個, 8 = 8個, 7 = 7個, 9 = 9個, 4 = 4個,
    
    End Sub
    

  •  For ループを非同期で実行したい場合は、Parallel.For メソッドを使います。 渡す引数は、開始インデックス、終了インデックス(未満として扱われるので、ー1を明示的に記載するのは不要)、ループ内で実行する処理内容、を渡します。 ループするのに必要な物を渡しただけという、まさにそのままですね。

  •  非同期処理で大事な事は、どのように動くかをイメージできることです。 第三引数で渡した(ジェネリックデリゲートに包まれた)ラムダ式は、ソースコードを見ると1つのメソッドにしか見えませんが、実行時は、いろんな人がこのメソッドを同時に呼び出すことになります。 同時に動くということは、他の人・相手を気遣いながら実装していかなければいけないということです。

  •  これは、SQL Server への同時データ登録(トランザクションしっかりね)や、 タイマーイベントを扱う時の、イベント処理中に次のイベントが発生して入ってきた時の考慮(処理中は、処理が重ならないように冒頭でフラグ制御するとかね) と似ています。

  •  自分の事だけ考えればいい同期処理と違い、チームプレーとして動く非同期処理は、この点が扱いづらさを引き起こしています。 だから、処理結果がバラバラに出力されているんですね。と言ってもそんなに難しい話ではなく、慣れちゃえば簡単ですよ。

  •  続いては、ForEach バージョンです。
  • ' Parallel.ForEach
    Public Sub Test2()
    
        Dim items = Enumerable.Range(0, 10).Select(Function(x) CStr(x) & "個")
    
        ' 同期処理(1人だけで、1つずつ順番にデータを扱う → 後ろで待っているデータに待ち時間ができる)
        For Each item In items
            Console.Write($"{item}, ")
        Next
    
        ' 非同期処理(複数人で手分けして、データを同時に扱う → 後ろで待っているデータの待ち時間が少ない)
        ' (別々に動く各処理が全て完了するまで、ForEach メソッド処理が待機されて、それ以降の処理には進まない)
        Parallel.ForEach(items, Sub(item)
                                    Console.Write($"{item}, ")
                                End Sub)
    
        ' 0個, 1個, 2個, 3個, 4個, 5個, 6個, 7個, 8個, 9個, 
        ' 0個, 1個, 5個, 2個, 9個, 3個, 6個, 8個, 7個, 4個,
    
    End Sub
    

  •  こちらもそのままですね。コレクションデータと、ループ内で実行する処理を渡して使います。

  •  続いては、コレクションデータの合計を計算してみます。
  • ' Parallel.For / コレクションデータの合計
    Public Sub Test3()
    
        Dim sw = New Stopwatch
        Dim items = Enumerable.Range(0, 10000)
        Dim total = 0
    
        ' 同期処理
        sw.Start()
        total = 0
        For i As Integer = 0 To items.Count() - 1
            total += items(i)
        Next
        sw.Stop()
        Console.WriteLine($"{total} - {sw.Elapsed.TotalSeconds} 秒")
    
    
    
        ' 非同期処理
        ' (複数の処理が、1つの変数に同時アクセスしても壊れないようにロックを書けて、値更新)
        sw.Restart()
        total = 0
        Parallel.For(0, items.Count(), Sub(i)
                                           'total += items(i) ' total は同時書き込みが保証されていない
                                           Interlocked.Add(total, items(i))
                                       End Sub)
        sw.Stop()
        Console.WriteLine($"{total} - {sw.Elapsed.TotalSeconds} 秒")
    
    
    
        ' 非同期処理
        ' (スレッドローカル変数を利用した、処理速度向上のための奥義。うまく説明できずすみません)
        ' 引数
        '  開始インデックス
        '  終了インデックス(未満)
        '  各タスクのローカル データの初期状態を返す関数デリゲート
        '  1 回の反復処理につき 1 回呼び出されるデリゲート
        '  各タスクのローカル状態に対して最後の操作を行うデリゲート
    
        ' 意味不明だと思いますので、以下複数のコメントアウトを外して、どう動くのかを見てください。
        'items = Enumerable.Range(0, 10)
        sw.Restart()
        total = 0
        Parallel.For(0,
                     items.Count(),
                     Function()
                         'Console.WriteLine("localInit()")
                         Return 0
                     End Function,
                     Function(i, state, local)
                         'Console.WriteLine("body()")
                         local += items(i)
                         Return local
                     End Function,
                     Sub(local)
                         'Console.WriteLine("localFinally()")
                         Interlocked.Add(total, local)
                     End Sub)
    
        sw.Stop()
        Console.WriteLine($"{total} - {sw.Elapsed.TotalSeconds} 秒")
    
        ' 1000 個の場合
        ' 499500 - 0.0142173 秒
        ' 499500 - 0.0845307 秒
        ' 499500 - 0.0251626 秒
    
        ' 10000 個の場合(データ量が多いほど、非同期が強くなる。非同期処理全般に言える事)
        ' 49995000 - 0.6904793 秒
        ' 49995000 - 0.2388322 秒
        ' 49995000 - 0.1745578 秒
    
    End Sub
    

  •  同期処理は簡単にできますが、非同期処理はちょっとひと手間必要になります。 今回は1つの変数 total に対して、複数人が(ほぼ)同時に値を足そうとします。非同期処理の場合、これでは正しい答えにはなりません。 このことは、SQL Server にある DB のあるテーブルのある行へ、複数のアプリケーションが同時に更新しようとする場面に似ています。これのメモリ上の値版です。

  •  このような場合は、Interlocked クラスの適切なメソッドを利用して、トランザクション的な役割をしてくれる(同時アクセス時に調整してくれる)機能を経由して、足し算をします。 非同期処理を SQL Server に重ねて見ると、どのように動くものなのかが分かりやすいのではないでしょうか。

  •  2つ目の Parallel.For メソッドに関しては、私の理解不足によりうまく説明できないのですが(すみません)、 スレッドローカルという各スレッド内にローカル変数を準備したものを特殊な空間上で扱うことで、同期的に扱うためのオーバーヘッドを軽減することができ、 これが速度向上につながるという仕組みらしいです。

  •  それぞれの引数は左から順に、開始インデックス、終了インデックス(未満扱い)、スレッドローカル変数の初期状態、ループ内で実行する処理、 スレッドローカル変数の最後に実行する処理、になります。スレッドとかスレッドローカルとか、言葉が難しい上に、アーキテクチャの全体イメージ、全体の動きが、 全然見えてこないと思います。この部分は理解でき次第書き換える予定です。

  •  次は、ForEach ループバージョンです。
  • ' Parallel.ForEach
    ' 計測は省略
    Public Sub Test4()
    
        Dim items = Enumerable.Range(0, 10000)
        Dim total = 0
    
        ' 同期処理
        total = 0
        For Each item In items
            total += item
        Next
        Console.WriteLine(total)
    
    
    
        ' 非同期処理(簡略、遅い版)
        total = 0
        Parallel.ForEach(items, Sub(item)
                                    Interlocked.Add(total, item)
                                End Sub)
        Console.WriteLine(total)
    
    
    
        ' 非同期処理(複雑、速い版)
        total = 0
        Parallel.ForEach(items,
                         Function()
                             Return 0
                         End Function,
                         Function(item, state, local)
                             local += item
                             Return local
                         End Function,
                         Sub(local)
                             Interlocked.Add(total, local)
                         End Sub)
        Console.WriteLine(total)
    
    End Sub
    

  •  続いては、非同期処理中の途中終了を見ていきます。途中終了には種類があり、キャンセル(ユーザー側の意思による途中終了)、 中断・停止(プログラム側の都合による途中終了)があります。

  •  最初は、キャンセル処理からです。
  • ' 以降は簡略のため、For のみ
    
    ' 途中でキャンセル(キャンセルのタイミングは、ユーザーが決める)
    Public Sub Test5()
    
        Dim source = New CancellationTokenSource
        Dim options = New ParallelOptions
        options.CancellationToken = source.Token
    
        Try
    
            ' 3秒後にユーザーがキャンセルしたという設定
            Task.Run(Sub()
                         Thread.Sleep(3000)
                         source.Cancel()
                     End Sub)
    
            ' 1つのデータを1秒かけて処理x100個
            Parallel.For(0,
                         100,
                         options,
                         Sub(i, state)
    
                             options.CancellationToken.ThrowIfCancellationRequested()
                             Console.Write($"{i}, ")
                             Thread.Sleep(1000)
    
                         End Sub)
    
        Catch ex As OperationCanceledException
            ' キャンセルした
            Console.WriteLine("OperationCanceledException: キャンセル")
    
        Catch ex As AggregateException
            ' その他
            Console.WriteLine("AggregateException: その他")
    
        End Try
    
        ' キャンセルしたという通知として例外エラーを投げるというやり方
        ' ただし、Catch 内では通常の例外エラー対処をするのではなく、正常処理の一部としてキャンセル時の任意処理を行うべき
        ' 0, 12, 24, 36, 48, ~, OperationCanceledException: キャンセル
    
    End Sub
    

  •  キャンセル処理は、CancellationTokenSource クラスと ParallelOptions クラスの組み合わせでおこないます。 CancellationTokenSource クラスは、ユーザーのキャンセル指示を受ける仕事を、ParallelOptions クラスは非同期処理に、キャンセルを伝える仕事を担当します。 そのため、ParallelOptions クラスのインスタンスを非同期処理に渡して使います。2つのクラスは Token プロパティを共有することで共同動作することができます。

  •  このサンプルでは、非同期処理が始まってから3秒後にキャンセル要求を出して途中終了しています。 キャンセルするという表現を、例外エラーを投げるという動作としておこないます。一般的にキャンセル処理は、異常処理ではなく通常処理です。 Catch 内で補足してキャンセルされた事を識別しますが、この中ではただの例外エラー対処をするのではなく、正常動作としてキャンセル時の処理を書いてください。 キャンセルはエラーではありません。

  •  続いて、中断と停止です。
  • ' 途中で中断、停止(プログラム処理内で決める)
    Public Sub Test6()
    
        Dim ret = Parallel.For(0,
                               10000,
                               Sub(i, state)
    
                                   If i = 12 Then
                                       Console.Write("12が出た中断!, ")
                                       state.Break()
                                   End If
    
                                   'If i = 12 Then
                                   '    Console.Write("12が出た停止!, ")
                                   '    state.Stop()
                                   'End If
    
                                   ' Stop() の場合、直接呼んだ処理はもちろん、他で動作中の処理からも停止確認を検知できる
                                   ' プログラマーによる自主的な停止向け処理が可能
                                   If state.IsStopped Then
    
                                   End If
    
                                   Console.Write($"{i}, ")
    
                               End Sub)
    
        If ret.IsCompleted Then
            ' 完了した
            Console.WriteLine("完了")
    
        ElseIf ret.LowestBreakIteration.HasValue Then
            ' 中断した
            Console.WriteLine("中断")
    
        Else
            ' 停止した
            Console.WriteLine("停止")
    
        End If
    
        ' Break() は、現在動作中の他の処理はそのまま実行させて、まだ開始していない他の処理は実行させない
        ' さらには、Break() を実行したインデックスよりも小さいインデックスを全て実行させてから停止する
        ' 例えば、インデックスが12の場合、0-12 までのインデックスを実行してから停止する
        ' →どういう想定での動作なのか不明(うまく説明できずすみません)
        ' 0, 1, 2, 3, 4, 5, 6, 36, 24, 12が出た中断!, 7, 8, 9, 10, 11, 12, 37, 25, 48, 中断
    
        ' Stop() は、現在動作中の他の処理はそのまま実行させて、まだ開始していない他の処理は実行させない
        ' 0, 1, 24, 25, 26, 27, 28, 29, 12が出た停止!, 2, 30, 12, 36, 48, 停止
    
    End Sub
    

  •  中断や停止を行いたい場合は、ループの処理内容となるラムダ式に、ParallelLoopState クラスを指定します。 このクラスを使うことで、同時に動く他の複数の処理と対話できるようになります。

  •  中断と停止は、動作結果がよく似ていて区別がつきにくいので混乱しやすいです。 非同期処理は自分だけではなく複数人が同時に動いているので、あるタイミングで途中終了した場合は、周りで動いている実行中の処理は止められない場合があります。 ただし、まだ実行していない開始待ちの処理は動かさないように指示することができます。これは、いくら同時に複数の処理を動かせると言っても、現実的には、 CPU のコアやスレッド数分しか同時に動かせないからです。このような途中終了の仕方をするのが停止(Stop メソッド)です。

  •  対して、中断(Break メソッド)は、停止時の動作に加えて、中断を実行したインデックスよりも小さいインデックスへの処理を、(例えまだ未実施であっても)完了するのを待機してから途中終了します。 このサンプルでは、12 というインデックスが来たら Break() を呼び出していますが、12よりも小さいインデックス(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)を全て実行させてから停止しています。 これについても、どういう場面で使うことがあるのか理解できずうまく説明できません、すみません。

  •  また、停止だけにある機能に、ParallelLoopState クラスの Isstopped プロパティを参照することができます。 自動停止を待つだけではなく、ユーザーコード側でも、停止命令が他の処理から出されたら感知して、自分のところの処理で停止する時の専用処理を組むことができます。

  •  続いては、非同期処理中に発生した例外エラーの補足方法です。
  • ' 並列処理中の例外エラーを捕捉する
    Public Sub Test7()
    
        Try
    
            Parallel.For(0, 10, Sub(i)
    
                                    If i Mod 3 = 0 Then
                                        Throw New ArgumentNullException("aaa")
                                    ElseIf i Mod 2 = 0 Then
                                        Throw New IndexOutOfRangeException("bbb")
                                    Else
                                        Throw New InvalidOperationException("ccc")
                                    End If
    
                                End Sub)
    
        Catch ex As AggregateException
            ' 各処理から発生した例外エラーを全て補足するため、
            ' 各例外エラーをまとめて AggregateException に含めて投げられる
            ' よって、詳しく見る場合は、例外エラーのコレクションを取り出して見る
            For Each inner In ex.InnerExceptions
                Console.WriteLine($"{inner.Message}: {inner.GetType()}")
            Next
    
        End Try
    
        ' ccc: System.InvalidOperationException
        ' 値を Null にすることはできません。
        ' パラメーター名:aaa: System.ArgumentNullException
    
    End Sub
    

  •  非同期処理は複数同時に実行します。そのため、発生する例外エラーはタイミングによっては1つだけではなく、複数の例外エラーが発生することもありえます。 そのため、それぞれの例外エラーをコレクションデータとしてまとめておき、AggregateException という1つの例外エラーとしてスローする仕組みになっています。

  •  最後を締めくくるのは、複数メソッドの並列実行です。Parallel.Invoke メソッドにジェネリックデリゲートを渡して実行させます。 ただし、戻り値無し・引数無しのメソッドが対象という制限があります。以下のサンプルでは、引数を受け取るメソッドは、ラムダ式で包んで渡すことで実行させています。
  • ' メソッド実行の並列化
    Public Sub Test8()
    
        Dim method = Sub()
                         Thread.Sleep(2000)
                         Console.WriteLine("Completed!")
                     End Sub
    
        ' 引数無しで戻り値無しのメソッドを並列実行できる
        ' 引数があるメソッドを実行したい場合、引数無しで戻り値無しのラムダ式に包んで渡すことで実行可能
        Parallel.Invoke(method, method, AddressOf Test8Core, method, Sub() Test8Main("aaa"))
        Console.WriteLine("OK")
    
        ' Completed!
        ' Completed!
        ' Completed!
        ' Asyncronous!
        ' Parallel!
        ' OK
    
    End Sub
    
    Private Sub Test8Core()
        Thread.Sleep(3000)
        Console.WriteLine("Parallel!")
    End Sub
    
    Private Sub Test8Main(s As String)
        Thread.Sleep(2500)
        Console.WriteLine("Asyncronous!")
    
    End Sub
    

  •  Invoke() も For() や ForEach() 同様、全ての処理が完了するまで待機するので、それまでは以降の処理には進みません。 つまり、渡したジェネリックデリゲートのうち、一番遅いメソッドの処理時間分の待機と考えることができます。

  •  非同期処理その2(2017年10月版)に続きます。