System.Threading.Channels
Channels は一定量のデータを格納するコンテナとして使用でき、入れた内容は取り出すまで保存されます。Channels
キューといえば、複数スレッドから安全にデータを出し入れできる ConcurrentQueue
補足ですが、このクラスは、デザインパターンのプロデューサー・コンシューマーパターンを実現するための実装です。データを生成・追加するプロデューサー、データを取り出して処理するコンシューマーがそれぞれ別のスレッドで動作し追加・取り出しはデータをスレッドセーフで行うというパターンを実装します。
確認環境
- VisualStudio2022
- .NET 8(C#12)
- Windows11
コンソールアプリで動作を確認しています。
概要
Channels の動作イメージは以下の図の通りです。
読み書きは Channel クラスのメソッドに直接実装されていません。以下のように書き込みは Writer、読み取りは Reader というプロパティで取得できるクラスを経由して使用します。
// (1) インスタンスの作成 Channel<string> ch = new Channel.CreateUnbounded<string>(); // (2) データを書き込むときに使用するプロパティ Channel<string>.Writer // (3) データを読み取るときに使用するプロパティ Channel<string>.Reader
使用例
使い方は以下の通りです。
チャンネルの作成は以下のメソッド経由で行います。コンストラクターは直接呼び出さず、ファクトリーメソッドを使用します
- 最大容量あり= Channel.CreateBoundedメソッドで作成
- 最大容量なし= Channel.CreateUnboundedメソッドで作成
また、作成時にオプションクラス(BoundedChannelOptions / UnboundedChannelOptions)が指定でき、キューが一杯になったときにどのように動作するかが指定できます。
internal class Program { static async Task Main(string[] _) { BoundedChannelOptions options = new(5) // ← キューできる最大容量 { // 単一スレッドからの書き込みか? // true : 単一書き込み / false: 複数書き込み(規定値) // ** trueの場合、内部ロックが省略され性能が向上するが // 複数スレッドから書き込むとデータ不整合が出る可能性がある SingleWriter = false, // 単一スレッドからの読み込みかどうか // true: 単一読み取り / false: 複数読み取り(規定値) SingleReader = false, // await後の処理が同期的に実行されることを許可するか? // true: 許可する / false: しない(規定値) // ** 基本false AllowSynchronousContinuations = false, // キューに空きが無い場合にデータを追加しようとしたときの挙動の指定 // Wait: キューが空くまで書き込みを待機(規定値) // DropNewest: 一番最後に追加したデータを削除してから追加 // DropOldest: 一番先頭(古い)データを削除してから追加 // DropWrite: 書き込むデータを破棄(キューされない) // ** データが消えるのは困るが書き込み時にサブスクライバーがいない場合にWaitだと // そのままハングアップしたように見える可能性がある FullMode = BoundedChannelFullMode.Wait, }; // 容量制限のあるチャンネルを作成 Channel<string> channel = Channel.CreateBounded<string>(options); // 容量無限のチャンネルを作成 Channel<string> channel = Channel.CreateUnbounded<string>(); WriteChannelAsync(channel); // 書き込みを非同期で開始 await Task.Delay(2000); ReadChannelAsync(channel); // 一定時間待機後に読み取りを非同期で開始 Console.ReadLine(); // これ以降書き込みが無いことを通知 // → Readをawaitしている処理が終了する channel.Writer.Complete(); Console.ReadLine(); } // Channelにデータを追加する static async Task WriteChannelAsync(Channel<string> channel CancellationToken token = default) { Console.WriteLine("[Start] WriteChannelAsync"); // 100ミリ秒ごとにデータを連続して書き込む for (int i = 0; i < 20; i++) { string value = $"Item ID={i:D3}"; bool mode = true; if (mode) { // (1) 書き込みパターン1 // この書き方だとFullMode.Waitでもキューが一杯ならDrop bool isOk = channel.Writer.TryWrite(value); if (isOk) { Console.WriteLine("Add value. " + value); } else { Console.WriteLine("Drop value." + value); } } else { // (2) 書き込みパターン2 // Waitの場合キューに空きが無いと空くまで待機する // ** この動作は何が良いかは時と場合による await channel.Writer.WriteAsync(value, token); Console.WriteLine("WriteAsync. value=" + value); } await Task.Delay(100, token); } Console.WriteLine("[End] WriteChannelAsync"); } // チャンネルからメッセージを読みだす static async Task ReadChannelAsync(Channel<string> channel CancellationToken token = default) { Console.WriteLine("[Start] ReadChannelAsync"); await foreach (string value in channel.Reader.ReadAllAsync(token)) { Console.WriteLine("Process. value=" + value); await Task.Delay(50, token); } Console.WriteLine("[End] ReadChannelAsync"); } }