20210411のC#に関する記事は4件です。

【C#】C#のバージョンと.NETのバージョンを確認する方法

C#のバージョン確認方法 コードのどこかに#error versionを入力する ↓こんな感じで入力すると ↓こんな風に、Visual Studio の警告に出てくる これだと私の場合はC#8.0 .NETのバージョン確認方法 System.Runtime.InteropServices をusingして RuntimeInformation.FrameworkDescriptionを参照すれば良い using System; using System.Runtime.InteropServices; class Class1 { static void Main(string[] args) { Console.WriteLine(RuntimeInformation.FrameworkDescription); } } 結果: .NET Core 3.1.8 参考
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Microsoft Orleans チュートリアル:最小の Orleans アプリケーションを作成する

堅牢でスケーラブルな分散アプリケーションのためのクロスプラットフォームフレームワーク Microsoft Orleans のチュートリアル Tutorial One - Creating a Minimal Orleans Application に従ってアプリケーションを実装する手順をメモっとく。 検証環境 Visual Studio 2019 16.9.3 インストール済みワークロード:.NET Core クロスプラットフォームの開発 Orleans 3.4.2 プロジェクトをセットアップする 下記に示す4つのプロジェクトを作成する。 Grain のインターフェイスを含む GrainInterfaces クラスライブラリプロジェクト Grain の実装を含む Grains クラスライブラリプロジェクト Silo をホストする Silo コンソールプロジェクト クライアントになる Client コンソールプロジェクト 本節のセットアップが完了するとソリューションは最終的に下記のようになる。 構造を作成する Silo コンソールアプリケーションを作成する。ソリューション名は Orleans Basics にする。ターゲットフレームワークは .NET Core 3.1 にする。 ソリューションを右クリックして、「追加」から「新しいプロジェクト」を選択し、Client コンソールアプリケーションを作成する。ターゲットフレームワークは .NET Core 3.1 にする。 同様に、GrainInterfaces クラスライブラリを作成する。ターゲットフレームワークは .NET Standard 2.1 にする。 同様に、Grains クラスライブラリを作成する。ターゲットフレームワークは .NET Standard 2.1 にする。 デフォルトソースファイルを削除する GrainInterfaces プロジェクトの Class1.cs ファイルと Grains プロジェクトの Class1.cs ファイルを削除する。 参照を追加する Grains プロジェクトの依存関係を右クリックし、「プロジェクト参照の追加」を選択する。Grains プロジェクトが GrainInterfaces プロジェクトを参照するように設定する。 同様に、Silo プロジェクトが GrainInterfaces プロジェクトと Grains プロジェクトを参照するように設定する。 同様に、Client プロジェクトが GrainInterfaces プロジェクトを参照するように設定する。 NuGet パッケージを追加する 下記の表にあるとおり各プロジェクトに NuGet パッケージを追加する。 プロジェクト NuGet パッケージ Silo Microsoft.Orleans.Server Silo Microsoft.Extensions.Logging.Console Client Microsoft.Extensions.Logging.Console Client Microsoft.Orleans.Client GrainInterfaces Microsoft.Orleans.Core.Abstractions GrainInterfaces Microsoft.Orleans.CodeGenerator.MSBuild Grains Microsoft.Orleans.CodeGenerator.MSBuild Grains Microsoft.Orleans.Core.Abstractions Grains Microsoft.Extensions.Logging.Abstractions 例えば、Silo プロジェクトに Microsoft.Orleans.Server パッケージを追加するときは、Silo プロジェクトを右クリックし、「NuGet パッケージの管理」を選択する。「参照」タブを選択し、検索キーワードとして Microsoft.Orleans.Server を入力し、Microsoft.Orleans.Server パッケージを選択し、インストールをクリックする。 Grain インターフェイスを定義する GrainInterfaces プロジェクトを右クリックし、「追加」から「新しい項目」を選択する。IHello インターフェイスを IHello.cs という名前で追加する。 IHello.cs を下記のように修正する。 using System.Threading.Tasks; namespace OrleansBasics { public interface IHello : Orleans.IGrainWithIntegerKey { Task<string> SayHello(string greeting); } } Grain クラスを定義する Grains プロジェクトを右クリックし、「追加」から「新しい項目」を選択する。Grains プロジェクトに HelloGrain クラスを HelloGrain.cs という名前で追加する。 HelloGrain.cs を下記のように修正する。 using Microsoft.Extensions.Logging; using System.Threading.Tasks; namespace OrleansBasics { public class HelloGrain : Orleans.Grain, IHello { private readonly ILogger logger; public HelloGrain(ILogger<HelloGrain> logger) { this.logger = logger; } Task<string> IHello.SayHello(string greeting) { logger.LogInformation($"\n SayHello message received: greeting = '{greeting}'"); return Task.FromResult($"\n Client said: '{greeting}', so HelloGrain says: Hello!"); } } } Silo を作成する Silo プロジェクトの Program.cs を下記のように修正する。 using System; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Orleans; using Orleans.Configuration; using Orleans.Hosting; namespace OrleansBasics { public class Program { public static int Main(string[] args) { return RunMainAsync().Result; } private static async Task<int> RunMainAsync() { try { var host = await StartSilo(); Console.WriteLine("\n\n Press Enter to terminate...\n\n"); Console.ReadLine(); await host.StopAsync(); return 0; } catch (Exception ex) { Console.WriteLine(ex); return 1; } } private static async Task<ISiloHost> StartSilo() { // define the cluster configuration var builder = new SiloHostBuilder() .UseLocalhostClustering() .Configure<ClusterOptions>(options => { options.ClusterId = "dev"; options.ServiceId = "OrleansBasics"; }) .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(HelloGrain).Assembly).WithReferences()) .ConfigureLogging(logging => logging.AddConsole()); var host = builder.Build(); await host.StartAsync(); return host; } } } Client を作成する Client プロジェクトの Program.cs を下記のように修正する。 using Microsoft.Extensions.Logging; using Orleans; using Orleans.Configuration; using System; using System.Threading.Tasks; namespace OrleansBasics { public class Program { static int Main(string[] args) { return RunMainAsync().Result; } private static async Task<int> RunMainAsync() { try { using (var client = await ConnectClient()) { await DoClientWork(client); Console.ReadKey(); } return 0; } catch (Exception e) { Console.WriteLine($"\nException while trying to run client: {e.Message}"); Console.WriteLine("Make sure the silo the client is trying to connect to is running."); Console.WriteLine("\nPress any key to exit."); Console.ReadKey(); return 1; } } private static async Task<IClusterClient> ConnectClient() { IClusterClient client; client = new ClientBuilder() .UseLocalhostClustering() .Configure<ClusterOptions>(options => { options.ClusterId = "dev"; options.ServiceId = "OrleansBasics"; }) .ConfigureLogging(logging => logging.AddConsole()) .Build(); await client.Connect(); Console.WriteLine("Client successfully connected to silo host \n"); return client; } private static async Task DoClientWork(IClusterClient client) { // example of calling grains from the initialized client var friend = client.GetGrain<IHello>(0); var response = await friend.SayHello("Good morning, HelloGrain!"); Console.WriteLine($"\n\n{response}\n\n"); } } } アプリケーションを実行する Silo プロジェクトを右クリックし、「デバッグ」から「新しいインスタンスの開始」を選択する。そうすると、下記のようなコンソールが表示され、大量の起動ログと共に Silo アプリケーションが起動する。 Silo アプリケーションを実行した状態で、折りたたまれているソリューションエクスプローラーにある Client プロジェクトを右クリックし、「デバッグ」から「新しいインスタンスの開始」を選択する。そうすると、下記のようなコンソールが表示され、Client アプリケーションが起動する。起動すると即座に Silo アプリケーションで動作している HelloGrain に SayHello メッセージを送信する。Client は "Good morning, HelloGrain!" をメッセージの引数として送信し、HelloGrain は "Hello!" と返信している。 Silo アプリケーションのコンソールにもログが記録され、"Good morning, HelloGrain!" という引数を伴う SayHello メッセージが届いたことを記録している。 今回は堅牢でスケーラブルな分散アプリケーションのためのクロスプラットフォームフレームワーク Microsoft Orleans のチュートリアル Tutorial One - Creating a Minimal Orleans Application に従ってアプリケーションを実装する手順について説明した。Orleans の有用性を示す実用的な例ではないので、今後は Orleans の実用性について理解を深めたい。 参考文献 https://github.com/dotnet/orleans https://dotnet.github.io/orleans/
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

C# EventStoreDB で始めるイベントソーシング

このドキュメントの内容 EventStoreDB で簡単なイベントデータの読み書きを実装してみます。 EventStoreDB とは EventStoreDB は EventStore Ltd. がリリースしているオープンソースのイベントソーシングデータベースです。 サポートされているプラットフォーム 公式のダウンロードページでは次のバイナリが提供されています。 Windows 64-bit (.NET Core 3.1) Linux Ubuntu 16.04 64-bit Ubuntu 18.04 64-bit Ubuntu 20.04 64-bit 今回は Windows のバージョン 20.10 を使用します。 サポートされている開発言語 公式のダウンロードページでは次のクライアントが提供されています。 .NET Java Go Node.js Rust Hashell バージョン 20.10 で gRPC を用いたクライアント API が追加されたようです。 今回は .NET クライアントの gRPC API を使用します。 サーバー実行環境の構築 公式のダウンロードページからバイナリをダウンロードします。Windows の場合は ZIP 圧縮ファイルです。 https://www.eventstore.com/downloads 実行ディレクトリを作ります。おそらくディレクトリ構成に決まりはありません。 E:\ProgramFiles\EventStoreDB : ルートディレクトリ bin : 実行ファイルを配置するディレクトリ cert : 証明書ファイルを配置するディレクトリ ca : ルート証明書ファイルを配置するディレクトリ data : データファイルが出力されるディレクトリ index : インデックスファイルが出力されるディレクトリ log : ログファイルが出力されるディレクトリ SSL を有効にする場合、サーバー証明書(とルート証明書)を準備します。gRPC で SSL を有効にするときと同じです。cert ディレクトリにサーバー証明書を配置します。cert\ca ディレクトリにルート証明書を配置します。 ZIP ファイルを解凍し、実行ファイルを bin ディレクトリに配置します。 構成ファイルを作成します。公式サイトにウィザードがあります。ウィザードに沿って値を設定していくと、構成ファイルの中身を作成してくれます。作成された内容をコピーしたファイルをルートディレクトリに配置します。 https://developers.eventstore.com/server/v20.10/docs/installation/ SampleEventStore.conf --- # Paths Db: E:\ProgramFiles\EventStoreDB\Data Index: E:\ProgramFiles\EventStoreDB\Index Log: E:\ProgramFiles\EventStoreDB\Log # Certificates configuration CertificateFile: E:\ProgramFiles\EventStoreDB\cert\TestServer.crt CertificatePrivateKeyFile: E:\ProgramFiles\EventStoreDB\cert\TestServer.pem TrustedRootCertificatesPath: E:\ProgramFiles\EventStoreDB\cert\ca # Network configuration IntIp: 127.0.0.1 ExtIp: 127.0.0.1 HttpPort: 2113 IntTcpPort: 1112 ExtTcpPort: 1113 EnableExternalTcp: true EnableAtomPubOverHTTP: true # Projections configuration RunProjections: All コマンドプロンプトで次のコマンドを実行すると、サーバーが起動します。 ルートディレクトリで実行するときの例 .\bin\EventStore.ClusterNode.exe --config SampleEventStore.conf SSLを無効にする場合(insecureを指定する) .\bin\EventStore.ClusterNode.exe --config SampleEventStore.conf --insecure ["127.0.0.1:2113"] Sub System '"Projections"' initialized. のようなメッセージが表示されれば起動完了です。終了させる場合は CTRL+C キーを押します。 クライアントアプリケーションの実装 NuGet で次のパッケージをインストールします。 EventStore.Client EventStore.Client.Grpc.Streams 必要に応じて JSON シリアライザをインストールします。今回は Utf8Json を使用することにしました。 イベントデータ 次のコマンドを定義しました。 特定のインターフェースを実装する必要はありませんが、プロダクトではロギングなどの共通処理を透過的に実装できるようにするためにインターフェースを実装することが多いと思います。 public interface IEventCommand { Guid ID { get; } DateTimeOffset CreateAt { get; } } public sealed class SampleEventCommand : IEventCommand { private SampleEventCommand(Guid id, DateTimeOffset createAt, string message) { ID = id; CreateAt = createAt; Message = message; } /// <summary> /// 新規インスタンスを生成します。 /// </summary> /// <param name="message">メッセージ</param> /// <returns></returns> public static SampleEventCommand Create(string message) { return new SampleEventCommand(Guid.NewGuid(), DateTimeOffset.UtcNow, message); } public Guid ID { get; private set; } public DateTimeOffset CreateAt { get; private set; } public string Message { get; set; } public override string ToString() { return $"{CreateAt}: {Message}"; } } シリアライザ Utf8Json を使用しています。 特定のシリアライザへの依存性を軽減させるため、インターフェースを実装しています。 using Utf8Json; using Utf8Json.Resolvers; public interface IEventDataSerializer { ReadOnlyMemory<byte> Serialize<T>(T obj); T Deserialize<T>(ReadOnlyMemory<byte> data); } public class SampleSerializer : IEventDataSerializer { public readonly static SampleSerializer Default = new SampleSerializer(); public ReadOnlyMemory<byte> Serialize<T>(T obj) { return JsonSerializer.Serialize(obj, StandardResolver.AllowPrivate); } public T Deserialize<T>(ReadOnlyMemory<byte> data) { return JsonSerializer.Deserialize<T>(data.ToArray(), StandardResolver.AllowPrivate); } } クライアントプロキシの生成 EventStoreDB クライアントプロキシを生成するメソッドです。 using System.IO; using Grpc.Core; /// <summary> /// EventStoreDB クライアントを生成します。 /// </summary> /// <returns></returns> private EventStoreClient CreateEventStoreClient() { // gRPC で接続するときの接続文字列 var settings = EventStoreClientSettings.Create(@"esdb://127.0.0.1:2113?tls=true"); // ルート証明書を明示的に設定する(自己証明書の場合のみ?) // これを設定しない場合、gRPC のエラー "failed to connect to all addresses." が発生した settings.ChannelCredentials = new SslCredentials(File.ReadAllText(@".\Certs\TestCA.crt")); return new EventStoreClient(settings); } イベントデータの書き込み EventStoreDB にイベントデータを書き込むメソッドです。 イベントタイプとストリーム名を指定する必要があります。 using EventStore.Client; /// <summary> /// 指定されたコマンドをイベントソースに書き込みます。 /// </summary> /// <param name="command">コマンド</param> /// <returns></returns> private Task<IWriteResult> WriteCommandAsync(SampleEventCommand command) { // TODO: コマンドの型からイベントタイプとストリーム名を特定できるようにするのが望ましい return WriteCommandAsync("sampleEvent", "sampleStream", command); } /// <summary> /// 指定されたコマンドをイベントソースに書き込みます。 /// </summary> /// <typeparam name="TCommand">コマンドの型</typeparam> /// <param name="eventType">イベントタイプ</param> /// <param name="streamName">ストリーム名</param> /// <param name="command">コマンド</param> /// <returns>書き込み結果</returns> private Task<IWriteResult> WriteCommandAsync<TCommand>(string eventType, string streamName, TCommand command) where TCommand : IEventCommand { return WriteCommandsAsync(eventType, streamName, new[] { command }); } /// <summary> /// 指定されたコマンドをイベントソースに書き込みます。 /// </summary> /// <typeparam name="TCommand">コマンドの型</typeparam> /// <param name="eventType">イベントタイプ</param> /// <param name="streamName">ストリーム名</param> /// <param name="commands">コマンド</param> /// <returns>書き込み結果</returns> private async Task<IWriteResult> WriteCommandsAsync<TCommand>(string eventType, string streamName, IEnumerable<TCommand> commands) where TCommand : IEventCommand { // コマンドを格納したイベントデータを列挙するメソッド static IEnumerable<EventData> ToEventData(string eventType, IEnumerable<TCommand> commands) { foreach (var obj in commands) { yield return new EventData( Uuid.NewUuid() , eventType , m_Serializer.Serialize(obj) ); } } // 頻繁に書き込みを行うアプリケーションの場合、クライアントの生成と破棄を繰り返さないほうがよいと思われる await using EventStoreClient client = CreateEventStoreClient(); return await client.AppendToStreamAsync( streamName , StreamState.Any , ToEventData(eventType, commands) ).ConfigureAwait(false); } イベントデータの読み込み gRPC クライアントではイベントデータを読み込む二つのメソッドが提供されています。 指定したストリームからイベントデータを読み込む ReadStreamAsync メソッド 全てのストリームから全てのイベントデータを読み込む ReadAllAsync メソッド EventStoreClient.ReadStreamAsync メソッド 特定のイベントデータを読み込むときに使用します。 読み込み開始位置を指定する場合、シーケンシャルに付番されるイベント番号を指定します。 今回は読み込んだ際に取得された読み込み位置をアプリケーション内のフィールドに保存しています。プロダクトではデータベースなどに永続化することになります。 最後に読み込んだ位置から次の読み込み位置を取得する using EventStore.Client; /// <summary> /// ストリーム名と最後に読み込んだ位置の組み合わせ /// </summary> private Dictionary<string, StreamPosition> LastStreamPosition { get; } = new Dictionary<string, StreamPosition>(); /// <summary> /// 指定されたストリームの次の読み込み位置を取得します。 /// </summary> /// <param name="streamName">ストリーム名</param> /// <returns>位置</returns> private StreamPosition GetNextStreamPosition(string streamName) { if (LastStreamPosition.TryGetValue(streamName, out StreamPosition position)) { return position.Next(); } else { return StreamPosition.Start; } } ストリームからコマンドを読み込む using EventStore.Client; private static readonly IEventDataSerializer Serializer = SampleSerializer.Default; /// <summary> /// 指定されたストリームからコマンドを読み込みます。 /// </summary> /// <typeparam name="TCommand">コマンドの型</typeparam> /// <param name="streamName">ストリーム名</param> /// <returns>非同期ストリーム</returns> private IAsyncEnumerable<TCommand> ReadCommandsAsync<TCommand>(string streamName) where TCommand : IEventCommand { // 次の読み込み位置を指定する return ReadCommandsAsync<TCommand>(streamName, GetNextStreamPosition(streamName)); } /// <summary> /// 指定されたストリームからコマンドを読み込みます。 /// </summary> /// <typeparam name="TCommand">コマンドの型</typeparam> /// <param name="streamName">ストリーム名</param> /// <param name="startPosition">読み込み開始位置</param> /// <returns>非同期ストリーム</returns> private async IAsyncEnumerable<TCommand> ReadCommandsAsync<TCommand>(string streamName, StreamPosition startPosition) where TCommand : IEventCommand { // 頻繁に書き込みを行うアプリケーションの場合、クライアントの生成と破棄を繰り返さないほうがよいと思われる await using var client = CreateEventStoreClient(); await using var stream = client.ReadStreamAsync(Direction.Forwards, streamName, startPosition); if (await stream.ReadState == ReadState.Ok) { while (await stream.MoveNextAsync().ConfigureAwait(false)) { yield return Serializer.Deserialize<TCommand>(stream.Current.Event.Data); // 最後に読み込んだ位置を格納する LastStreamPosition[streamName] = stream.Current.Event.EventNumber; } } } EventStoreClient.ReadAllAsync メソッド 全てのイベントデータを読み込むときに使用します。読み込まれたイベントタイプからイベントを特定してハンドリングすることになります。 読み込み開始位置を指定する場合、トランザクションログ上のバイト位置を指定します。次のバイト位置を取得する方法はわかりませんでした。最後に読み込んだ位置を開始位置に指定し、取得された位置が最後に読み込んだ位置と一致する場合はスキップしています。イベントデータの削除や再配置を行ったときにバイト位置が変わる可能性があり、この方法が正しいかどうかは検証が必要です。 今回は読み込んだ際に取得された読み込み位置をアプリケーション内のフィールドに保存しています。プロダクトではデータベースなどに永続化することになります。 読み込まれるイベントデータには管理イベントも含まれるため、管理者権限が要求されます。 最後に読み込んだ位置を取得する using EventStore.Client; /// <summary> /// 最後に読み込んだ位置 /// </summary> private Position? LastTransactionPosition = null; /// <summary> /// 最後に読み込んだ位置を取得します。 /// </summary> /// <returns></returns> private Position GetLastTransactionPosition() { return LastTransactionPosition ?? Position.Start; } トランザクションログから全てのデータを読み込んでコマンドを列挙する using EventStore.Client; private static readonly IEventDataSerializer Serializer = SampleSerializer.Default; /// <summary> /// トランザクションログからコマンドを読み込みます。 /// </summary> /// <returns>非同期ストリーム</returns> private IAsyncEnumerable<IEventCommand> ReadAllCommandsAsync() { // 最後の読み込み位置を指定する return ReadAllCommandsAsync(GetLastTransactionPosition()); } /// <summary> /// トランザクションログからコマンドを読み込みます。 /// </summary> /// <param name="lastPosition">最後の読み込み位置</param> /// <returns>非同期ストリーム</returns> private async IAsyncEnumerable<IEventCommand> ReadAllCommandsAsync(Position lastPosition) { // 頻繁に書き込みを行うアプリケーションの場合、クライアントの生成と破棄を繰り返さないほうがよいと思われる await using EventStoreClient client = CreateEventStoreClient(); await foreach (var result in client.ReadAllAsync( Direction.Forwards , lastPosition , userCredentials: new UserCredentials("admin", "changeit"))) { // TODO: 最後に読み込んだときの位置と同じ場合はスキップする。位置が不変であるのか調査が必要。 if (lastPosition.CommitPosition >= result.Event.Position.CommitPosition) { continue; } // イベントからコマンドを取得して列挙する if (TryGetCommand(@event, out IEventCommand cmd)) { yield return cmd; } // 最後に読み込んだ位置を格納する LastTransactionPosition = result.Event.Position; } } /// <summary> /// 指定されたイベントからコマンドを取得します。 /// </summary> /// <param name="event">イベント</param> /// <param name="command">コマンド</param> /// <returns>取得できた場合、true を返します。</returns> private bool TryGetCommand(ResolvedEvent @event, out IEventCommand command) { // TODO: イベントタイプからコマンドの型を特定できるようにすることが望ましい if (@event.Event.EventType == "sampleEvent") { command = Serializer.Deserialize<SampleEventCommand>(@event.Event.Data); return true; } command = null; return false; } /// <summary> /// 指定されたイベントからコマンドを取得します。 /// </summary> /// <param name="event">イベント</param> /// <param name="command">コマンド</param> /// <returns>取得できた場合、true を返します。</returns> private bool TryGetCommand<TCommand>(ResolvedEvent @event, out TCommand command) where TCommand : IEventCommand { // TODO: イベントタイプからコマンドの型を特定できるようにすることが望ましい if (@event.Event.EventType == "sampleEvent") { command = (TCommand)(IEventCommand)Serializer.Deserialize<SampleEventCommand>(@event.Event.Data); return true; } command = default; return false; } イベントデータの購読 Pub/Sub メッセージングモデルの購読側です。Reactive に処理を行う必要がある場合は、読み込みよりも購読が適しています。 イベントデータに対する処理自体は読み込みとほぼ同じです。 前述の読み込みと同様、gRPC クライアントではイベントデータを購読する二つのメソッドが提供されています。 指定したストリームからイベントデータを購読する ReadStreamAsync メソッド 全てのストリームから全てのイベントデータを購読する SubscribeToAllAsync メソッド 購読の状態を管理するオブジェクトを用いて、購読の終了処理をカプセル化しました。クライアントとキャンセルトークンは購読開始メソッド内のローカル変数としています。 EventStoreClient.SubscribeToAllAsync メソッド 購読の開始と終了 // 購読の状態を管理するオブジェクト private StreamSubscriptionState m_SubcribeSampleCommandState; /// <summary> /// コマンドの購読を開始します。 /// </summary> /// <returns></returns> private async Task StartSubcribeSampleCommandAsync() { // コマンドを受け取ったときの処理 Task onReceiveCommandAsync(SampleEventCommand command) { System.Diagnostics.Debug.WriteLine($"onReceiveCommandAsync: {command}"); return Task.CompletedTask; } m_SubcribeSampleCommandState = await SubcribeCommandAsync<SampleEventCommand>("sampleStream", onReceiveCommandAsync).ConfigureAwait(false); } /// <summary> /// コマンドの購読を停止します。 /// </summary> private void StopSubcribeSampleCommand() { m_SubcribeSampleCommandState?.Dispose(); } 主処理 /// <summary> /// 指定されたコマンドの購読を開始します。 /// </summary> /// <typeparam name="TCommand">コマンドの型</typeparam> /// <param name="streamName">ストリーム名</param> /// <param name="startPosition">読み込み開始位置</param> /// <param name="onReceiveAsync">コマンドを受信したときに実行するメソッド</param> /// <returns>購読の状態を管理するオブジェクト</returns> private async Task<StreamSubscriptionState> SubcribeCommandAsync<TCommand>(string streamName, StreamPosition startPosition, Func<TCommand, Task> onReceiveAsync) where TCommand : IEventCommand { // 購読をキャンセルするためのトークン CancellationTokenSource cancellation = new CancellationTokenSource(); EventStoreClient client = CreateEventStoreClient(); // イベントを受け取ったときの処理 async Task OnEventAsync(StreamSubscription subscription, ResolvedEvent @event, CancellationToken cancellation) { // イベントからコマンドを取得する // SubscribeToStreamAsync メソッドの引数にはイベントタイプがない // 購読対象でない型のコマンドである可能性がある if (TryGetCommand(@event, out TCommand cmd)) { await onReceiveAsync(cmd).ConfigureAwait(false); } } var subscription = await client.SubscribeToStreamAsync( streamName , startPosition , OnEventAsync , cancellationToken: cancellation.Token ).ConfigureAwait(false); // 購読に関連するオブジェクトをまとめた状態オブジェクトを返す return new StreamSubscriptionState(client, subscription, cancellation); } EventStoreClient.SubscribeToAllAsync メソッド 購読の開始と終了 // 購読の状態を管理するオブジェクト private StreamSubscriptionState m_SubcribeAllCommandState; /// <summary> /// 全てのコマンドの購読を開始します。 /// </summary> /// <returns></returns> private async Task StartSubcribeAllCommandAsync() { // コマンドを受け取ったときの処理 Task onReceiveAllCommandAsync(IEventCommand command) { System.Diagnostics.Debug.WriteLine($"onReceiveAllCommandAsync: {command}"); return Task.CompletedTask; } m_SubcribeAllCommandState = await SubcribeAllCommandAsync(onReceiveAllCommandAsync).ConfigureAwait(false); } /// <summary> /// 全てのコマンドの購読を停止します。 /// </summary> private void StopSubcribeAllCommand() { m_SubcribeAllCommandState?.Dispose(); } 主処理 /// <summary> /// 全てのコマンドの購読を開始します。 /// </summary> /// <param name="lastPosition">最後の読み込み位置</param> /// <param name="onReceiveAsync">コマンドを受信したときに実行するメソッド</param> /// <returns>購読の状態を管理するオブジェクト</returns> private async Task<StreamSubscriptionState> SubcribeAllCommandAsync(Position lastPosition, Func<IEventCommand, Task> onReceiveAsync) { // 購読をキャンセルするためのトークン CancellationTokenSource cancellation = new CancellationTokenSource(); EventStoreClient client = CreateEventStoreClient(); // イベントを受け取ったときの処理 async Task OnEventAsync(StreamSubscription subscription, ResolvedEvent @event, CancellationToken cancellation) { // TODO: 最後に読み込んだときの位置と同じ場合はスキップする。位置が不変であるのか調査が必要。 if (lastPosition.CommitPosition >= @event.Event.Position.CommitPosition) { return; } // イベントからコマンドを取得する if (TryGetCommand(@event, out IEventCommand cmd)) { await onReceiveAsync(cmd).ConfigureAwait(false); } } var subscription = await client.SubscribeToAllAsync( lastPosition , OnEventAsync , cancellationToken: cancellation.Token , userCredentials: new EventStore.Client.UserCredentials("admin", "changeit") ).ConfigureAwait(false); // 購読に関連するオブジェクトをまとめた状態オブジェクトを返す return new StreamSubscriptionState(client, subscription, cancellation); } 購読の状態を管理するオブジェクト IDisposable インターフェースを実装し、Dispose メソッドで購読に関連する各オブジェクトの破棄を行っています。 /// <summary> /// 購読中の状態を管理します。 /// </summary> public class StreamSubscriptionState : IDisposable { public StreamSubscriptionState( EventStoreClient client , StreamSubscription subscription , CancellationTokenSource cancellation , bool clientDisposable = true) { m_Client = client; m_Subscription = subscription; m_CancellationToken = cancellation; m_ClientDisposable = clientDisposable; } private readonly EventStoreClient m_Client; private readonly StreamSubscription m_Subscription; private readonly CancellationTokenSource m_CancellationToken; private readonly bool m_ClientDisposable; public void Dispose() { if (!m_CancellationToken.IsCancellationRequested) { m_CancellationToken.Cancel(); } m_Subscription.Dispose(); if (m_ClientDisposable) { m_Client.Dispose(); } m_CancellationToken.Dispose(); } } イベントデータの内容 読み込みや購読で取得されるイベントデータには、EventRecord オブジェクトが格納されています。 プロパティ 内容 ContentType 常に "application/json" ? Created 生成された日時。UTC。 Data コマンドなどをシリアライズしたバイト配列。 EventId 書き込み時に指定したイベントID。UUID は時系列にソート可能なID。 EventNumber シーケンシャルに設定されるイベント番号。Stream 系の読み込み/購読メソッドで開始位置を指定する場合はこの型の値で指定する。 EventStreamId 書き込み時に指定したストリーム名。 EventType 書き込み時に指定したイベントタイプ。 Metadata メタデータをシリアライズしたバイト配列。 Position トランザクションログ上の位置。おそらく何バイト目かを表している。All 系の読み込み/購読メソッドで開始位置を指定する場合はこの型の値で指定する。 参考:Kafka との比較 次のコラムによると 読み書きの機能は EventStoreDB が優れている。 スループットとスケーラビリティは Kafka が優れている。 併用することによって両者のメリットを得られる。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

検索ワードの人気からみる C# と VB.net

C#とVB.netといえば同じ.NET(Framework)で動き、相互変換できるので 兄弟のような言語です。 ビルドすると、どちらもDLLという実行ファイルになったり プロジェクトの中にC#とVB.netが混在できたりと 非常に密接な関係になってます。 VB.netは独特な機能や文法ゆえに、C#と比べると大変不評ですが、、 Googleトレンドで比較 Googleトレンドとは Googleトレンド って何や? という人向けに簡単に説明すると Google先生でどのくらい検索されているかが見れる、Googleが提供しているサービスです。 期間とか地域とかで色々見れちゃいます! 過去1年を比較 全世界 青がC#、赤がvb.net 平均:C# = 87, VB.net = 6 あ、あれ( ;゚д゚) こんなに差があるのか、、 平均値で見ると、C#はVB.netの14.5倍の人気という結果に これは全世界なので、日本だけにしてみます。 日本 平均:C# = 78, VB.net = 9 全世界よりも少しだけグラフが近づいて、平均でC#が8.6倍となりましたね 世界よりも日本はVB.netの人気があるのかもしれません お隣の最大人口を誇る国家も見てみます。 中国 平均:C# = 59, VB.net = 1 中国でVB.netはほぼ使われてないみたいです 笑 アメリカ 平均:C# = 83, VB.net = 5 アメリカさんは世界のトレンドとほぼ変わらないようですね。 さすがはアメリカです。 過去17年を比較 過去1年だと正直良く分からん! ということで 一番古いデータの2004年から現在までを見てみます。 全世界 平均:C# = 62, VB.net = 14 正直17年間の平均とかあまり意味がない気がしますが 上で書いた手前一応書いてます。 どちらも人気が下降していて、2004~2005付近が一番人気が高かったようです。 2004~2005と現在を比べてみると C#は 1/3 VB.netは 1/15 程下がっていて、VB.netの下降具合は顕著でした。 日本 平均:C# = 55, VB.net = 11 先程の全世界比較すると、日本では下降し始めるのが遅いですね。 C#は2006~2009、VB.netは2005~2009付近が高かったようです。 高かった時期と現在だと C#は 1/3近く VB.netは 1/6 ~ 1/7 程度は下がっていますね。 やはり、日本はC#とVB.netの差が世界よりは少ないです。 感想 予想通りというか当然というか 圧倒的C#が人気でしたね。 VB.netとC#両方書いている身としては、C#の方が読みやすくて使いやすいので C#を推していきたいです。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む