【C#】QueueとConcurrentQueueの使い方

C# で キューというデータ構造を扱う Queue<T> クラスと、スレッド排他制御機能付きの ConcurrentQueue<T> クラスの使い方の紹介をしたいと思います。

2つのQueue

まず「Queue」ですが一言で言うと、入れたデータが入れた順番に取り出せる入れ物の事を指します。以下のような1本のパイプのイメージです。

こういったデータ構造を先入れ先出し(First In First Out)を略して FIFO と呼んだりします。C# にはこの FIFO をサポートする Queue<T> というクラスがありその使い方の紹介になります。

また「ConcurrentQueue<T>」はこの Queue が複数のスレッドから同時にアクセスしても安全なスレッドセーフという特徴を持っています。まとめるとこんな感じです。

クラス 説明
Queue<T> 通常の Queue
ConcurrentQueue スレッドセーフな Queue

ちなみに操作方法は「ほぼ」同じです。

尚この記事では System.Linq で定義されている Linq の拡張メソッドについては範囲が広大になるため言及しません。

確認環境

この記事は以下環境で動作を確認しています。

  • VisualSturio2019
  • .NET 5
  • C# 9.0

Queue<T>の使い方

宣言

Queue<T> は完全名が「System.Collections.Generic.Queue<T>」のため最初に以下のように最初に using を宣言します。

// 最初に宣言する
using System.Collections.Generic;

// アセンブリ
// System.Collections.dll
// 
// クラスの宣言
// public class Queue<T> : IEnumerable<T>, IEnumerable, IReadOnlyCollection<T>, ICollection

基本的な操作

生成と値の出し入れは以下の通りです。List<T> と違って FIFO と用途が限定されているため利用できるメソッドが少なめです。

//
// (1)生成
// - - - - - - - - - - - - - - - - - - - -

// Queueのオブジェクトを新規作成
Queue<int> queue = new();

// 初期容量を指定してオブジェクトを新規作成
Queue<int> queue2 = new(256);

//
// (2)値の出し入れ
// - - - - - - - - - - - - - - - - - - - -

// ★★値を入れる

queue.Enqueue(0);
queue.Enqueue(10);
queue.Enqueue(100);
queue.Enqueue(1000);
queue.Enqueue(10000);
// この時点でのqueueの中身=[0, 10, 100, 1000, 10000]

// ★★値を取り出す

int a = queue.Dequeue();
// a=0, queueの中身=[10, 100, 1000, 10000]
int b = queue.Dequeue();
// b=10, queueの中身=[100, 1000, 10000]
int c = queue.Dequeue();
// c=100, queueの中身=[1000, 10000]
int d = queue.Dequeue();
// d=1000, queueの中身=[10000]
int e = queue.Dequeue();
// d=10000, queueの中身=[(カラ)]

int f = queue.Dequeue();
// 空のQueueから更に取り出そうとすると
// System.InvalidOperationException: 'Queue empty.' が発生する

// ★★値の安全な取り出し

// ★Queueが空でも例外を出さずに安全に値を取り出す
if (queue.TryDequeue(out int f2))
{
    // 値が取得できればここに入る
}
else if (queue.Count != 0)
{
    int f3 = queue.Dequeue();  // この処理方法でも同じ結果が得られる
}

//
// (3)その他の操作
// - - - - - - - - - - - - - - - - - - - -

// ★指定した要素が存在するか確認する
bool contains = queue.Contains(100);
// > contains=true
// true: 存在する / false: 存在しない

// ★保持しているすべての要素を消去する
queue.Clear();

// ★内部バッファーを縮小する
// 大量の要素を入れた場合最大要素数分+αのメモリ領域が確保されっぱなしになるのを開放できる
queue.TrimExcess();

特殊な操作

値の出し入れ以外に出来る操作は以下の通りです。

// 準備
Queue<int> queue = new();
queue.Enqueue(0);
queue.Enqueue(10);
queue.Enqueue(100);
queue.Enqueue(1000);
queue.Enqueue(10000);

//
// 特殊な操作
// - - - - - - - - - - - - - - - - - - - -

// ★★中身は減らさず先頭の値を取得する

// ★減らさずに取り出す
int a = queue.Peek();
// a=0, queueの中身=[10, 100, 1000, 10000]
int b = queue.Peek();
// b=0, queueの中身=[10, 100, 1000, 10000]
int c = queue.Peek();
// c=0, queueの中身=[10, 100, 1000, 10000]

// ★Queueが空でも例外を出さずに安全に値を取り出す
if (queue.TryPeek(out int f1))
{
    // 値が取得できればここに入る
}
else if (queue.Count != 0)
{
    int f3 = queue.Peek(); // この処理方法でも同じ結果が得られる
}

// (2)内容物を配列に変換する
int[] queueArray = queue.ToArray();
// queueArray=[10, 100, 1000, 10000]

// (3)Queue内の全要素を列挙する(取り出さない)
foreach (int item in queue)
{
    Console.WriteLine(item); // 10 > 100 > 1000...
}

ConcurrentQueue<T>の使い方

ConcurrentQueue ですが Queue と「ほぼ」同じです。ただ取り出すときの「Dequeue」と「Peek」が存在せず取り出すときは「TryDequeue」と「TryPeek」のみが存在します。これはこの Queue を使用するときは常に自分以外のスレッドから値が取り出されて、直前までは値があったのに自分が取り出すときに存在しないことがあるため「安全に中身を取り出す」ために Try~ 系で取り出すことになります。

宣言

ConcurrentQueue<T> は完全名が「System.Collections.Generic.ConcurrentQueue<T>」のため最初に以下のように最初に using を宣言します。

// 最初に宣言する
using System.Collections.Generic;

// アセンブリ
// System.Collections.dll
// 
// クラスの宣言
// public class ConcurrentQueue<T> :
//     IProducerConsumerCollection<T>, IEnumerable<T>,
//     IEnumerable, ICollection, IReadOnlyCollection<T>

なんか色々インターフェースを継承していますが、スレッドセーフですよーの目印の「IProducerConsumerCollection」を継承しています。まぁでもこれに大した意味はないです。

各種操作

Queue とほぼ同じなのでざっくり以下の通りになります。

ConcurrentQueue<int> queue = new();

queue.Enqueue(0); // 値を入れるときは同じ
queue.Enqueue(1);

// 値を取り出すときはTryXXXを使う
if (queue.TryDequeue(out int a))
{
    Console.WriteLine($"a={a}"); // 取り出せた時だけ処理する
}
if (queue.TryPeek(out int b))
{
    Console.WriteLine($"b={b}"); // 取り出せた時だけ処理する
}

スレッドセーフとは?

最後に Queue と ConcurrentQueue をマルチスレッドで使用したときの挙動の違いを確認します。まずは以下のコードとコメントを確認してください。

private static void Main(string[] args)
{

    QueueMultiThreadTest();
    ConcurrentQueueMultiThreadTest();
}

// (1) ConcurrentQueueをマルチスレッドで動かす
private static void QueueMultiThreadTest()
{
    Queue<int> queue = new();

    // 0~5までの6個の数字をマルチスレッドでQueueに入れる
    Parallel.For(0, 6, i =>
    {
        queue.Enqueue(i);
        // ★例外が出たり,
        // > System.ArgumentException: 'Destination array was not long enough.
        // Check the destination index, length, and the array's lower bounds.Arg_ParamName_Name'
    });

    foreach (var item in queue)
    {
        Console.WriteLine(item);
        // > 0
        // > 0
        // > 1
        // > 5
        // > 2
        // > 4
        // ★同じ値が複数入ったりする
        // ** 出力が順不同なのは仕様
    }

    Console.WriteLine();

    Queue<int> queue2 = new();
    queue2.Enqueue(0);
    queue2.Enqueue(1);
    queue2.Enqueue(2);
    queue2.Enqueue(3);
    queue2.Enqueue(4);
    queue2.Enqueue(5);

    Parallel.For(0, queue2.Count, i =>
    {
        int cnt = queue2.Dequeue();
        Console.WriteLine(cnt);
        // > 0
        // > 1
        // > 2
        // > 4
        // > 3
        // > 2
        // ★同じ値が複数取得されてることがある
        // ** 出力が順不同なのは仕様
    });
}

// (2) ConcurrentQueueをマルチスレッドで動かす
private static void ConcurrentQueueMultiThreadTest()
{
    ConcurrentQueue<int> queue = new();

    // 0~5までの6個の数字をマルチスレッドでConcurrentQueueに入れる
    Parallel.For(0, 6, i =>
    {
        queue.Enqueue(i); // エラーは出ない
    });

    foreach (var item in queue)
    {
        Console.WriteLine(item);
        // > 0
        // > 2
        // > 1
        // > 3
        // > 4
        // > 5
        // 順不同だが全て操作が完了する
        // ★★同じ値が複数回入ったりしない
    }

    Console.WriteLine();

    ConcurrentQueue<int> queue2 = new();
    queue2.Enqueue(0);
    queue2.Enqueue(1);
    queue2.Enqueue(2);
    queue2.Enqueue(3);
    queue2.Enqueue(4);
    queue2.Enqueue(5);

    Parallel.For(0, queue2.Count, i =>
    {
        if (queue2.TryDequeue(out int cnt)) // Try系で取り出す
        {
            Console.WriteLine(cnt);
        }
        // > 0
        // > 1
        // > 3
        // > 2
        // > 4
        // > 5
        // 順不同だが正常に処理が完了する
        // ★★同じ値が複数回取れたりはしない
    });
}

コード中のコメントに書きましたが Queue はマルチスレッドで使用すると内容が滅茶苦茶になります。途中で例外が出ることもあります。逆に ConcurrentQueue は内容に一貫性がある状態を保っています。複数のスレッドから同時に操作しようとしたときに安全かそうでないかが確認できました。

最後に

Queueでできない事

余談ですが Queue でできない事を以下に紹介します。

FIFO で最初に入れたものが最初に取れるという順序が保証できなくなる操作は提供されていません。

//
// Queueでできない事
// - - - - - - - - - - - - - - - - - - -

// ★★初期値を指定してオブジェクトの初期化はできない
Queue<int> queue = new Queue<int>()
{
    0, 10, 100
    // CS1061:
    // 'Queue<int>' に 'Add' の定義が含まれておらず、型 'Queue<int>'
    // の最初の引数を受け付けるアクセス可能な拡張メソッド 'Add' が見つかりませんでした。
};

// ★★インデックスアクセスはできない
int a = queue[1];
// CS0021:
// 角かっこ[] 付きインデックスを 'Queue<int>' 型の式に適用することはできません

// ★★ソートはできない
queue.Sort();
// CS1061:
// 'Queue<int>' に 'Sort' の定義が含まれておらず、型 'Queue<int>'
// の最初の引数を受け付けるアクセス可能な拡張メソッド 'Sort' が見つかりませんでした。

List<T>との使い分け

Queue の機能ですが基本的に List<T> でも同じことができます。List の部分的な機能が Queue と言ってもいいかもしれません。

// 以下のようにすると Queue と同じことができる
List<int> list = new()
{
    0, 10, 100
};

// 末尾に追加
list.Add(1000); 

// 先頭から取り出し & 削除
int i = list[0];
list.RemoveAt(0);

さて、List でも同じようなことができるのに Queue を使用する意義ですが、「ここは FIFO で順序を保証します」という設計意図が保証できます。この制限を他人に強制できてクラス自体余計な操作ができなので List のような柔軟な操作を禁止できます。

この制限により間に値を挿入したり先頭に値を追加することができません。List クラスは柔軟で動的な操作が可能かつ Linq も組み合わせると多様な機能が提供されている反面、自由すぎて実装意図を読み取るのは結構難しいケースがあるため実装の意図の明確化として有用なのではないかと思います。

ただ、最初は Queue で操作を制限していても途中で割り込み挿入が入ったり、ソートが必要等で仕様が変わると結局 List になってしまう事も多いので純粋な Queue が最後まで維持される事があまりないのも印象的です。

以上です。