【C#】コマンド実行用のバッファリングキューを実装する

バッファーがいっぱいになるまではデータをバッファリングをしながらバックグランドで1つずつ順番にデータを処理して、バッファーがいっぱいになったら空きができるまで待機となるコマンド実行用のデータキューイングクラスの実装例の紹介です。

確認環境

  • .NET Framwork 4.8.1
  • C# 7.3
  • Visual Studio 2022

IDE上のデバッグ実行で動作を確認。

少し古めの環境で作成したので .NET のほぼすべての環境で使用できると思います。

使い方

先に使い方を紹介したいと思います。このクラスの使用方法は、Init で初期化した後、MaxCapacity でバッファー最大値を指定、ProcItem に要素の処理方法を指定してから EnqueueAsync でデータをバッファリングしながら処理するように指示をします。

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

internal class AppMain
{
    static readonly BufferdQueue<int> _queue = new BufferdQueue<int>();

    private static void Main(string[] args)
    {
        // 使う前に初期化する
        _queue.Init();

        // バッファーの最大容量を50に変更
        _queue.MaxCapacity = 50;

        // バッファリングしたデータに対する処理の登録
        _queue.ProcItem += item =>
        {
            Thread.Sleep(10); // ちょっと遅い処理
            Trace.WriteLine($"Proc={item}");
        };

        RunFireAndforget();

        Console.ReadLine();

        // バッファリング処理を終了するときに呼び出す
        _queue.Terminate();
    }

    // 非同期で処理を実行する
    private static void RunFireAndforget()
    {
        Task.Run(async () =>
        {
            for (int i = 0; i < 500; i++)
            {
                // キューが一杯になるまではどんどんバッファリングされていく
                // いっぱいになると空きができるまで待機になる
                await _queue.EnqueueAsync(i);
            }
        });
    }
}

実装コード

実装コードは以下の通りです。2つの待機ハンドルを使ってバックグラウンドのスレッドの進行制御とユーザーがデータを追加したときの進行制御を行っています。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// ユーザー側の処理速度のほうがバックエンドの処理速度より速いときに
// いちいちメソッドをブロックキングで待機すると
// 全体の処理速度が低下するのをバッファリングすることで緩和する事を目的としたクラス
//
// メモ:
// バッファーがいっぱいになると同期実行に切り替わって実行速度が低下するので
// 処理に局所性があるもに対して十分なキャパシティを確保して使用すること
public class BufferdQueue<T>
{
    readonly Queue<T> _queue = new Queue<T>();
    // キューの排他アクセス用
    readonly object _lockObject = new object();
    // バックエンドのスレッド用の待機ハンドル
    readonly EventWaitHandle _ProcActionHandle 
        = new EventWaitHandle(false, EventResetMode.ManualReset);
    // フロントのキュー用の待機ハンドル
    readonly EventWaitHandle _EnqueueHandle
        = new EventWaitHandle(true, EventResetMode.ManualReset);

    // Queueに蓄積できる最大容量
    int _maxCapacity = 1000;
    // Queueがバッファリングを再開する閾値
    int _threthold;
    // 終了要求を受け付けたかどうか
    // true: 受け付けた / false: それ以外
    bool _isTerminate;
    // バックエンドのバッファを処理するスレッド
    Thread _procThread;
    // 中断を受け付けたかどうかのフラグ
    // true: 受け付けた / false: それ以外
    bool _isAbort;

    // キューの最大容量
    public int MaxCapacity
    {
        get => _maxCapacity;
        set
        {
            if (_maxCapacity < 16) // あまりに小さい数値は受け付けない
            {
                throw new InvalidOperationException("Less than 16 cannot be specified.");
            }
            lock (_lockObject)
            {
                _threthold = value - 1;
                _maxCapacity = value;
            }
        }
    }

    // キューから取り出したデータに対する処理を行う時に発生する
    public event Action<T> ProcItem;

    // Constructors - - - - - - - - - -

    public BufferdQueue()
    {
        _threthold = _maxCapacity - 1;
    }

    // Public Methods - - - - - - - - - -

    // 使う前に呼び出すこと
    public void Init()
    {
        _procThread = new Thread(new ThreadStart(ProcAction))
        {
            IsBackground = true
        };
        _procThread.Start();
    }

    // バッファーに余裕があるときはitemをキューイングして即座に終了
    // or
    // バッファーがいっぱいならバッファーに空きが出るまでブロックして待機する
    public void Enqueue(T item)
    {
        CheckInitializedIfThrowException();

        lock (_lockObject)
        {
            _queue.Enqueue(item);
            _ProcActionHandle.Set();
            if (_queue.Count > MaxCapacity)
            {
                _EnqueueHandle.Reset();
                //Trace.WriteLine("Max capacity1");
            }
        }
        _EnqueueHandle.WaitOne(); // バックグラウンドで処理がはけるまで待機
    }

    public async Task EnqueueAsync(T item)
    {
        CheckInitializedIfThrowException();

        await Task.Run(() =>
        {
            lock (_lockObject)
            {
                _queue.Enqueue(item);
                _ProcActionHandle.Set();
                if (_queue.Count >= MaxCapacity)
                {
                    _EnqueueHandle.Reset();
                }
            }
            _EnqueueHandle.WaitOne(); // バックグラウンドで処理がはけるまで待機
        });
    }

    public void Terminate()
    {
        CheckInitializedIfThrowException();
        _isAbort = true;
        _EnqueueHandle.Set();

        while (!_isTerminate)
        {
            Thread.Sleep(1);
        }
    }

    // Private Methods - - - - - - - - - -

    private void CheckInitializedIfThrowException()
    {
        if (_procThread == null)
        {
            throw new InvalidOperationException("Object not initialized.");
        }
    }

    private void ProcAction()
    {
        try
        {
            //Trace.WriteLine("ProcAction Start");

            while (true)
            {
                _ProcActionHandle.WaitOne();

                if (_isAbort)
                {
                    lock (_lockObject)
                    {
                        _queue.Clear();
                    }
                    break;
                }

                T item;
                lock (_lockObject)
                {
                    if (_queue.Count == 0)
                    {
                        _ProcActionHandle.Reset();
                        continue;
                    }
                    else
                    {
                        item = _queue.Dequeue();
                    }

                    if (_queue.Count <= _threthold)
                    {
                        _EnqueueHandle.Set();
                        //Trace.WriteLine($"Free capacity={_queue.Count}");
                    }
                }
                
                try
                {
                    ProcItem?.Invoke(item);
                }
                catch (Exception) { }
            }
            _queue.Clear(); // abortで抜けたら要素は全て解放
        }
        finally
        {
            _isTerminate = true;
            //Trace.WriteLine("ProcAction End");
        }
    }
}