【C#】System.Threading.Channels<T>の使い方

System.Threading.Channels の使い方を紹介したいと思います。

Channels は一定量のデータを格納するコンテナとして使用でき、入れた内容は取り出すまで保存されます。Channels は非同期操作に対応している FIFO(キュー)構造でスレッドセーフを保証しつつ、入れた順にデータを取り出すことができます。

キューといえば、複数スレッドから安全にデータを出し入れできる ConcurrentQueue クラスもありますが、こちらは非同期操作をサポートしていないので、async/await を使ってデータの出し入れをしたい場合 Queue/ConcurrentQueue ではなく Channels を使用します。

補足ですが、このクラスは、デザインパターンのプロデューサー・コンシューマーパターンを実現するための実装です。データを生成・追加するプロデューサー、データを取り出して処理するコンシューマーがそれぞれ別のスレッドで動作し追加・取り出しはデータをスレッドセーフで行うというパターンを実装します。

確認環境

  • VisualStudio2022
  • .NET 8(C#12)
  • Windows11

コンソールアプリで動作を確認しています。

概要

Channels の動作イメージは以下の図の通りです。

読み書きは Channel クラスのメソッドに直接実装されていません。以下のように書き込みは Writer、読み取りは Reader というプロパティで取得できるクラスを経由して使用します。

// (1) インスタンスの作成
Channel<string> ch = new Channel.CreateUnbounded<string>();

// (2) データを書き込むときに使用するプロパティ
Channel<string>.Writer

// (3) データを読み取るときに使用するプロパティ
Channel<string>.Reader

使用例

使い方は以下の通りです。

チャンネルの作成は以下のメソッド経由で行います。コンストラクターは直接呼び出さず、ファクトリーメソッドを使用します

  • 最大容量あり= Channel.CreateBoundedメソッドで作成
  • 最大容量なし= Channel.CreateUnboundedメソッドで作成

また、作成時にオプションクラス(BoundedChannelOptions / UnboundedChannelOptions)が指定でき、キューが一杯になったときにどのように動作するかが指定できます。

internal class Program
{
    static async Task Main(string[] _)
    {
        BoundedChannelOptions options = new(5) // ← キューできる最大容量
        {
            // 単一スレッドからの書き込みか?
            // true : 単一書き込み / false: 複数書き込み(規定値)
            // ** trueの場合、内部ロックが省略され性能が向上するが
            //    複数スレッドから書き込むとデータ不整合が出る可能性がある
            SingleWriter = false,

            // 単一スレッドからの読み込みかどうか
            // true: 単一読み取り / false: 複数読み取り(規定値)
            SingleReader = false,

            // await後の処理が同期的に実行されることを許可するか?
            // true: 許可する / false: しない(規定値)
            // ** 基本false
            AllowSynchronousContinuations = false,

            // キューに空きが無い場合にデータを追加しようとしたときの挙動の指定
            // Wait: キューが空くまで書き込みを待機(規定値)
            // DropNewest: 一番最後に追加したデータを削除してから追加
            // DropOldest: 一番先頭(古い)データを削除してから追加
            // DropWrite: 書き込むデータを破棄(キューされない)
            // ** データが消えるのは困るが書き込み時にサブスクライバーがいない場合にWaitだと
            //    そのままハングアップしたように見える可能性がある
            FullMode = BoundedChannelFullMode.Wait,
        };

        // 容量制限のあるチャンネルを作成
        Channel<string> channel = Channel.CreateBounded<string>(options);

        // 容量無限のチャンネルを作成
        Channel<string> channel = Channel.CreateUnbounded<string>();

        WriteChannelAsync(channel); // 書き込みを非同期で開始

        await Task.Delay(2000);

        ReadChannelAsync(channel); // 一定時間待機後に読み取りを非同期で開始

        Console.ReadLine();

        // これ以降書き込みが無いことを通知
        // → Readをawaitしている処理が終了する
        channel.Writer.Complete();
        
        Console.ReadLine();
    }

    // Channelにデータを追加する
    static async Task WriteChannelAsync(Channel<string> channel
                                        CancellationToken token = default)
    {
        Console.WriteLine("[Start] WriteChannelAsync");

        // 100ミリ秒ごとにデータを連続して書き込む
        for (int i = 0; i < 20; i++)
        {
            string value = $"Item ID={i:D3}";
            
            bool mode = true;
            if (mode)
            {
                // (1) 書き込みパターン1
                // この書き方だとFullMode.Waitでもキューが一杯ならDrop
                bool isOk = channel.Writer.TryWrite(value);
                if (isOk)
                {
                    Console.WriteLine("Add value. " + value);
                }
                else
                {
                    Console.WriteLine("Drop value." + value);
                }
            }
            else
            {
                // (2) 書き込みパターン2
                // Waitの場合キューに空きが無いと空くまで待機する
                // ** この動作は何が良いかは時と場合による
                await channel.Writer.WriteAsync(value, token);
                Console.WriteLine("WriteAsync. value=" + value);
            }
            await Task.Delay(100, token);
        }

        Console.WriteLine("[End] WriteChannelAsync");
    }

    // チャンネルからメッセージを読みだす
    static async Task ReadChannelAsync(Channel<string> channel
                                       CancellationToken token = default)
    {
        Console.WriteLine("[Start] ReadChannelAsync");

        await foreach (string value in channel.Reader.ReadAllAsync(token))
        {
            Console.WriteLine("Process. value=" + value);
            await Task.Delay(50, token);
        }

        Console.WriteLine("[End] ReadChannelAsync");
    }
}