Cy#の河合です。Cysharp名義でGitHub/Cysharp上で公開しているOSSの数も19個となるのですが、今回新しく .NET/Unity で使えるハイパフォーマンスなメッセージングライブラリ「MessagePipe」をリリースしました。
主にインメモリでのPublish/Subscribeパターンを支援し、統一的なインターフェイスであらゆるケースを網羅しています。例えばリアルタイム通信におけるMagicOnionやSignalRでの異なるHub間の通信、WPFなどのGUIアプリケーション開発におけるV-VM間の疎結合のためのメッセンジャーパターン(PrismライブラリにおけるEventAggregator)、サーバーアプリケーションでのCQRSアーキテクチャのためのMediatorパターンの実装(ASP.NETでのMediatR)、Unityにおけるゲームオブジェクト間の通信としてZenjectのSignalBusやUniRxのMessageBrokerなどを代替することができます。
Cy#の提供するOSSのポリシーとして、徹底的に研究し最高のものを出す、というものがあります。今回は機能の網羅と、インターフェイスの統一による使いやすさ、そして最高の性能を目指すことも欠かせませんでした。

上のグラフはSubscriberが8個ぶら下がった状態でのPublishのパフォーマンスを示すものです。C#のevent構文よりも高速で、PrismのEventAggregatorと比較すると78倍も高速に動作します。勿論、Publish毎のメモリアロケーションもゼロに抑えられています。
MessagePipeの特徴として、Dependency Injectionが前提になっています。
DIに関しては賛否両論がありますが、 .NET 5においては多くのフレームワークが.NET Generic Hostの上で構築されていることもあり(例えば ASP.NET Core, MagicOnion, ConsoleAppFramework, そしてこれからはMAUIなどのデスクトップアプリケーションも)、DIの存在を前提に作ったほうが綺麗な設計になると考えています。
そのため、セットアップに関しては以下のように一行で済みます(UnityではZenject、またはVContainerを用います(外部DIライブラリ不要なBuiltinの簡易DIも搭載されているので、詳しくはReadMeを参照ください))。
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe(); // AddMessagePipe(options => { }) for configure options
})
そして発行側はIPublisher<T>を、購読側はISubscriber<T>をILogger<T>のようにインジェクションします。Tに関してはどのような型でも可能です、例えばprimitive(int, string, etc…)、struct, class, enumなど。
using MessagePipe;
public struct MyEvent { }
public class SceneA
{
readonly IPublisher<MyEvent> publisher;
public SceneA(IPublisher<MyEvent> publisher)
{
this.publisher = publisher;
}
void Send()
{
this.publisher.Publish(new MyEvent());
}
}
public class SceneB
{
readonly ISubscriber<MyEvent> subscriber;
readonly IDisposable disposable;
public SceneB(ISubscriber<MyEvent> subscriber)
{
var bag = DisposableBag.CreateBuilder(); // composite disposable for manage subscription
subscriber.Subscribe(x => Console.WriteLine("here")).AddTo(bag);
disposable = bag.Build();
}
void Close()
{
disposable.Dispose(); // unsubscribe event, all subscription **must** Dispose when completed
}
}
この例では、型によって区別され、SceneAから送信されたメッセージがSceneBに届きます。Subscribeの戻り値はIDisposableで、これによりeventと異なりDisposeするだけで購読を解除することができます。複数のIDisposableはDisposableBag(又はRxに同梱されているCompositeDisposable)によってまとめ上げることができるため、これにより紐付けたいオブジェクトのライフサイクルと合わせて管理することができます。
WPFではメッセンジャーでの購読を弱参照によって保持することが一般的ですが、これは私はアンチパターンだと考えています。弱参照には、GCオブジェクトと紐付けるための不明瞭で暗黙的なルールや、避けられないパフォーマンスの低下があります(ベンチマークでのPrismの許容できないほどの低い性能が示しています)。明示的な購読の管理は、RxによるCompositeDisposableの発明により一括管理ができるようになり、また .NET で広く見られる一般的なパターンとなっているため、すぐに馴染むことができるでしょう。明示的に紐付けるライフサイクルを選べることも、手間ではなくメリットだと考えます。欠点としては紐付けを忘れることによるサブスクリプションリークの発生がありますが、後述する購読管理マネージャーや、DIによるスコープの一括Dispose、そしてAnalyzerによるコンパイルエラー化でカバーしています。
DIを用いることでPublisher/Subscriberオブジェクトの取得がスムーズなことと、スコープ機能を利用することで、スコープ毎にメッセージの送受信者を切り分けられること、そしてスコープ終了時の一括Disposeによるサブスクリプションリークの防止が可能となっています。
上の例では、IPublisher<T>/ISubscriber<T>を用いた、最もシンプルなものですが、MessagePipeには似たようなインターフェイスとして IPublisher<TKey, TMessage>/ISubscriber<TKey, TMessage>というキー(PubSubパターンにおけるtopic)のあるインターフェイスも用意されています。
実際の例として、現在CysharpではUnityをMagicOnionで繋げて、その情報を更にBlazor(C#のみでSPAを作れるRailsのHotwireみたいなもの)でブラウザに届けるアプリケーションを開発中です。ここで問題になったのは、Blazorのページ(ブラウザのライフサイクル)と、MagicOnionのHub(Unityとの接続のライフサイクル)の間での、データの受け渡しです。
Browser <-> Blazor <- ??? -> MagicOnion <-> Unity
これを接続IDをキーにしてMessagePipeでデータを受け渡すことにしました。
Browser <-> Blazor <- [MessagePipe] -> MagicOnion <-> Unity
以下が、コード例です。
// MagicOnion(similar as SignalR, realtime event framework for .NET and Unity)
public class UnityConnectionHub : StreamingHubBase<IUnityConnectionHub, IUnityConnectionHubReceiver>, IUnityConnectionHub
{
readonly IPublisher<Guid, UnitEventData> eventPublisher;
readonly IPublisher<Guid, ConnectionClose> closePublisher;
Guid id;
public UnityConnectionHub(IPublisher<Guid, UnitEventData> eventPublisher, IPublisher<Guid, ConnectionClose> closePublisher)
{
this.eventPublisher = eventPublisher;
this.closePublisher = closePublisher;
}
override async ValueTask OnConnected()
{
this.id = Guid.Parse(Context.Headers["id"]);
}
override async ValueTask OnDisconnected()
{
this.closePublisher.Publish(id, new ConnectionClose()); // publish to browser(Blazor)
}
// called from Client(Unity)
public Task<UnityEventData> SendEventAsync(UnityEventData data)
{
this.eventPublisher.Publish(id, data); // publish to browser(Blazor)
}
}
// Blazor
public partial class BlazorPage : ComponentBase, IDisposable
{
[Parameter]
public Guid ID { get; set; }
[Inject]
ISubscriber<Guid, UnitEventData> UnityEventSubscriber { get; set; }
[Inject]
ISubscriber<Guid, ConnectionClose> ConnectionCloseSubscriber { get; set; }
IDisposable subscription;
protected override void OnInitialized()
{
// receive event from MagicOnion(that is from Unity)
var d1 = UnityEventSubscriber.Subscribe(ID, x =>
{
// do anything...
});
var d2 = ConnectionCloseSubscriber.Subscribe(ID, _ =>
{
// show disconnected thing to view...
subscription?.Dispose(); // and unsubscribe events.
});
subscription = DisposableBag.Create(d1, d2); // combine disposable.
}
public void Dispose()
{
// unsubscribe event when browser is closed.
subscription?.Dispose();
}
}
一つの .NET ソリューションに同居している2つのアプリケーション(MagicOnion, Blazor)を、MessagePipeを通して繋げることができました。連動している型は、同一のソリューションでホストしているUnityとも共有されているため、Unityクライアントからブラウザまでを、C#で統一することによって、シンプルに、一気通貫で接続しています。
ところで、MessagePipeのIPublisher/ISusbcriberがRxのSubjectと異なるのは、OnErrorとOnCompletedが存在しないことです。つまり、OnNextのみが存在する
IObservable<T>とみなせます。このことによって「終わらないことの保証」と「エラーで購読が切れないことの保証」ができます。イベントのハンドリングという観点では、OnError/OnCompletedにより購読が終了する可能性が存在することは、再購読の必要性の判断が生じたりと、考慮事項が非常に増えます。「終わらない」という、Rxよりも表現力の低い制約のある状態にすることで、扱いやすさを向上させています。高い表現力が必要なら、適宜AsObservableしてRxで使えるよう変換することも可能です。なお、これはRxSwiftに導入されている Relay(PublishRelay, BehaivorRelay)と同様の概念です。
キー無し/キー付きのほかに、async/awaitを活用した非同期ハンドラー(IAsyncPublisher/IAsyncSubscriber)や、RxにおけるBehaviorSubjectのように直近の値を保持する(IBufferedPublisher/IBufferedSubscriber)などのバリエーションがあります。
- sync/async
- keyed/keyless
- buffered/bufferless
- broadcast/response(+many)
- in-memory/distributed
全てのインターフェイスの組み合わせは、以下のようになっています。

数が多いのですが、似たような統一的なAPIでまとめてあるので、機能の多さが学習コストの増大に繋がらないようになっているはずです。例えばsync/asyncの違いは、Publish/PublishAsyncと、ハンドラーが非同期化していることだけであり、Publishでメッセージを送信し、Subscribeで購読、それの戻り値のIDisposableを管理する、という一連のフローは全て同一です。
// keyless-sync
public interface IPublisher<TMessage>
{
void Publish(TMessage message);
}
public interface ISubscriber<TMessage>
{
IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}
// keyless-async
public interface IAsyncPublisher<TMessage>
{
ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default(CancellationToken));
}
public interface IAsyncSubscriber<TMessage>
{
IDisposable Subscribe(IAsyncMessageHandler<TMessage> asyncHandler, params AsyncMessageHandlerFilter<TMessage>[] filters);
}
Subscriptionの管理
MessagePipeにおいて戻り値のIDisposableは必ず、何らかの形でハンドリングする必要があります。無視すればリークします。しかし、どうしても漏れが発生する可能性はあります(IDisposableをライフサイクルオブジェクトに紐付けたとしても、そのライフサイクルオブジェクトのほうが適切に処理されないで購読が残ってしまうケースなどもあります)。そこで、MessagePipeでは中央で全ての購読数と、必要に応じてSubscribe時のスタックトレースを取得可能にしています。
public sealed class MessagePipeDiagnosticsInfo
{
/// <summary>Get current subscribed count.</summary>
public int SubscribeCount { get; }
/// <summary>
/// When MessagePipeOptions.EnableCaptureStackTrace is enabled, list all stacktrace on subscribe.
/// </summary>
public StackTraceInfo[] GetCapturedStackTraces(bool ascending = true);
/// <summary>
/// When MessagePipeOptions.EnableCaptureStackTrace is enabled, groped by caller of subscribe.
/// </summary>
public ILookup<string, StackTraceInfo> GetGroupedByCaller(bool ascending = true)
}
これでどのような情報が取れるかというと、Unityではエディタ拡張によって上記の管理クラスの結果を、簡単に可視化するMessagePipe Diagnostics Windowが付属しています。

これにより仮にリークがあっても一目瞭然で、すぐに対処することができるでしょう。
そもそもSubscribeの戻り値は必ずハンドリングしなければならないのなら、絶対に無視してはいけない(コンパイルエラーになる)と、より良いはずです。そこで MessagePipe.Analyzer というRoslyn Analyzerを公開しています。

Subscribeの戻り値をハンドリングしないとエラーになるため、これにより戻り値無視によるリークが100%防ぐことができます。
なお、AnalyzerはUnity 2020.2以降であればUnityでも利用可能です。

ただし現状ではUnity-IDEでのAnalyzerの連携が不足しているため、それを補う拡張 Cysharp/CsprojModifierを同時に公開しました。CsprojModifierを使うことでUnityでも確実にMessagePipe.Analyzerを使用することができます。
まとめ
今回紹介しませんでしたが、AsyncPublisherは、全てのSubscriberの非同期処理をawaitによって待つことができたり(これは通常のeventやRxでは不可能)、IDistributedPublisher/Subscriberによってインメモリだけではなくネットワーク越しのPubSubも実現できたり、Filterによって全ての送受信の前後をsync/asyncともにフックしたりできます。
// Filter機能による、同じ値が連続していたら値を送らないという例(RxのDistinctUntilChanged)
public class ChangedValueFilter<T> : MessageHandlerFilter<T>
{
T lastValue;
public override void Handle(T message, Action<T> next)
{
if (EqualityComparer<T>.Default.Equals(message, lastValue))
{
return;
}
lastValue = message;
next(message);
}
}
また、Publisher/Subscriberインターフェイスとは少し異なる IRequestHandler<TRequest, TResponse>では、MediatRのようにmediator patternの実装を支援します。
public interface IRequestHandler<in TRequest, out TResponse>
{
TResponse Invoke(TRequest request);
}
public interface IAsyncRequestHandler<in TRequest, TResponse>
{
ValueTask<TResponse> InvokeAsync(TRequest request, CancellationToken cancellationToken = default);
}
.NET では勿論のこと、Unityでも非常に有益なライブラリとなっていますので、ぜひとも試してみてください。
