【C#】MessagePipeのプロセス間通信で戻り値を受け取る方法

MessagePipe を利用したプロセス間通信で戻り値を受け取る方法の紹介です。

注意:

本記事は MessagePipe と .NETのDI (ServiceCollection) の事をある程度知っている前提で書いているため前提説明が不足している可能性があります。ご了承ください。

前提

まず、前提として、MessagePipe のパイプを使用したプロセス間通信の実装の基本の説明では戻り値を受け取ることができません。

// 送信側の定義
public interface IDistributedPublisher<TKey, TMessage>
{
    ValueTask PublishAsync(TKey key, // awaitはできるが戻り値は受け取れない
        TMessage message,
        CancellationToken cancellationToken = default(CancellationToken));
}


// 受信側の定義
public interface IDistributedSubscriber<TKey, TMessage>
{
    ValueTask<IAsyncDisposable> SubscribeAsync(TKey key, 
        IMessageHandler<TMessage> handler, // これもreturnで値を戻せない
        CancellationToken cancellationToken = default(CancellationToken));

    ValueTask<IAsyncDisposable> SubscribeAsync(TKey key,
        IMessageHandler<TMessage> handler,
        MessageHandlerFilter<TMessage>[] filters, 
        CancellationToken cancellationToken = default(CancellationToken));

    ValueTask<IAsyncDisposable> SubscribeAsync(TKey key,
        IAsyncMessageHandler<TMessage> handler,
        CancellationToken cancellationToken = default(CancellationToken));

    ValueTask<IAsyncDisposable> SubscribeAsync(TKey key,
        IAsyncMessageHandler<TMessage> handler,
        AsyncMessageHandlerFilter<TMessage>[] filters,
        CancellationToken cancellationToken = default(CancellationToken));
}

今回は IDistributedPublisherIDistributedSubscriber 意外を使用して戻り値を受け取る方法を紹介しようと思います。

確認環境

  • .NET8
  • MessagePipe 1.8.1
  • MessagePipe.Interprocess 1.8.1
  • VisualStudio 2022

実装例

あらかじめ MessagePipeMessagePipe.Interprocess を NuGet でプロジェクトに追加しておきます。

クライアント側

まずは送信側です。クライアント側は IRemoteRequestHandler' を取得してしてメッセージをInvokeAsync` でメッセージを送信します。

// MessagePipeのセットアップ
var sc = new ServiceCollection();
IMessagePipeBuilder pb = sc.AddMessagePipe();
pb.AddNamedPipeInterprocess("pipe-name"); // Pipeを使ったプロセス間通信

// 以下でサーバー側のSubを呼び出して引数を受け取れるpubを取得できる
var pub = provider.GetRequiredService<IRemoteRequestHandler<int, int>>(); // <引数の型, 戻り値の型>

// 実際の呼び出し
int ret = await pub.InvokeAsync(10, cts.Token);

// ただし、サーバー側が存在しない場合応答が二度と帰ってこなくなるし
// TokenをCancelしてもキャンセルされない

コメントにもありますが、サーバーが居ないと await が一生戻ってこなくなります。あと固まってしまうと引数に渡した CancellationToken はいうことをきかないです。

これをキャンセルするには以下の通り元の管理オブジェクトを Dispose するしかないようです。

// タイムアウトしたらNamedPipeWorkerをDisposeする。
// CancellationTokenSource cts = new();
var __ = Task.Run(() =>
{
    Thread.Sleep(1000);
    Console.WriteLine("Canceled");
    // 効かない
    // cts.Cancel();
    // 内部の管理オブジェクトをDisposeするとInvokeAsyncの行でキャンセル例外が発生する
    provider.GetRequiredService<NamedPipeWorker>()?.Dispose();
});

停止したタスクを放っておいていいなら以下のように拡張メソッドを定義してタイムアウトしたらタスクをほったらかしにして先に進んでもいいですが、長時間実行するサービスで以下のような実装をするとリソースリークとなる可能性が高いので注意が必要です。

// 一定時間で停止する拡張メソッドの定義
public static class IRemoteRequestHandlerExtensions
{
    public static async Task<TResponse>
        InvokeAsyncWithTimeout<TRequest, TResponse>(
            this IRemoteRequestHandler<TRequest, TResponse> self, 
            TRequest request, // 送信する引数
            TimeSpan timeout) // タイムアウトの時間の指定
    {
        using var cts = new CancellationTokenSource();

        // await InvokeAsyncはキャンセルしても停止できない、対向が存在しないと無限待ちとなるため
        // ユーザー側でキャンセル処理を実装する
        Task<TResponse> task = self.InvokeAsync(request, cts.Token).AsTask();
        var delayTask = Task.Delay(timeout, cts.Token);

        // どちらか早く完了した方を待つ
        var completedTask = await Task.WhenAny(task, delayTask);
        if (completedTask == delayTask)
        {
            // 対向が存在しない、サーバーで処理に時間がかかってるを区別できないため一律Timeout例外とする
            throw new TimeoutException(
                "Request timed out(1). Service not found." +
                "or The process is taking a long time.");
        }
        else
        {
            // 例外が発生した場合に備えてタスクを完了させる
            cts.Cancel();
            return await task;
        }
    }
}

// ----

// タイムアウト指定ありのメソッド呼び出し
pub = provider.GetRequiredService<IRemoteRequestHandler<int, int>>();
Console.WriteLine(">>>");
try
{
    var ret = await pub.InvokeAsyncWithTimeout(2, 3.0);
    Console.WriteLine("Result2=" + ret);
}
catch (TimeoutException ex)
{
    // タイムアウトしたときの処理
    Console.WriteLine(ex.ToString());
}
finally
{
    Console.WriteLine("<<<");
}

サーバー側

次はサーバー側の実装です。

メッセージを受信したときに動作する型を以下の通りあらかじめ作成しておきます。

// まずクライアントで指定したIRemoteRequestHandler<,>に一致する型のクラスを作成する
public class MyAsyncHandler : IAsyncRequestHandler<int, int>
{
    public async ValueTask<int> InvokeAsync(int request,
        CancellationToken cancellationToken = default)
    {
        await Task.Delay(1);
        return request + request;
    }
}

次にこの型を DI に登録します。

var sc = new ServiceCollection();
// MessagePipeのセットアップ
IMessagePipeBuilder pb = sc.AddMessagePipe();
pb.AddNamedPipeInterprocess("pipe-name");
pb.AddAsyncRequestHandler(typeof(MyAsyncHandler)); // 作った型を登録する

// 1件ハンドラーを追加しておく
// ** なぜか追加しないとMyAsyncHandlerがメッセージを受け取れない
ServiceProvider provider = sc.BuildServiceProvider();
provider 
    .GetRequiredService<IDistributedSubscriber<int, int>>()
        .SubscribeAsync(int.MinValue, _ =>
        {
            Console.WriteLine("DummyHander");
        });

これで自動的にメッセージを受信するとサーバー側で処理が呼び出されて戻り値が受け取れます。

まだ、少々問題が残っていますが今回は以上です。