- 投稿日:2021-10-22T15:21:26+09:00
Couchbase Server Java SDK解説:分散トランザクション〜トランザクションライフサイクル
はじめに Couchbase Serverの分散トランザクションにおけるトランザクションライフサイクルは、基本的にRDB/SQLと同様のものとして理解することができますが、アプリケーション開発において利用するにあたって、独自の考慮点もあります。 Couchbase Serverにおけるトランザクションは、クラスターへの処理リクエスト実行レベルで選択することのできるオプションです。つまり、アプリケーションには、トランザクションアクターと非トランザクションアクターが存在します。 また、このことを別の視点から言い換えると、Couchbase Serverにおけるトランザクションは、SDK/クライアントが主体となって実現される機能であるといえます(中央集権的なプロセスは存在せず、そこから起因する性能影響もありません)。 トランザクションライフサイクル トランザクションの初期化 Transactionsオブジェクトの構築により、バックグラウンドプロセスが自動的に実行され、スレッドプールを含むいくつかのリソースが使用されます。 そのため、アプリケーションでは、Transactionsオブジェクトがひとつだけ作成されるようにすることが非常に重要です。 // Initialize the Couchbase cluster Cluster cluster = Cluster.connect("localhost", "username", "password"); Bucket bucket = cluster.bucket("travel-sample"); Scope scope = bucket.scope("inventory"); Collection collection = scope.collection("airport"); // Create the single Transactions object Transactions transactions = Transactions.create(cluster); 複数のTransactionsオブジェクト利用ケース 通常、アプリケーションに必要なTransactionsオブジェクトは1つだけです。ライブラリーは、ふたつ以上のオブジェクトが作成されると警告を発します。 アプリケーションが複数のTransactionsオブジェクトを作成する必要があるというまれな例外がひとつあります。それは、カスタムメタデータコレクションを利用する場合です。 コミット コミットは自動的に行われます。トランザクションロジックコールバックの最後にctx.commit()の明示的な呼び出しがなく、例外がスローされない場合、コミットされます。 非同期APIでは、commit()の明示的な呼び出しを省略した場合、メソッドチェーンの結果として、.then()を呼び出して、必要なMono<Void>タイプの戻り値に変換する必要がある場合があります。 Mono<TransactionResult> result = transactions.reactive().run((ctx) -> { return ctx.get(collection.reactive(), "anotherDoc").flatMap(doc -> { JsonObject content = doc.contentAs(JsonObject.class); content.put("transactions", "are awesome"); return ctx.replace(doc, content); }).then(); }); トランザクションがコミットされるとすぐに、そのトランザクションのすべての変更が他のトランザクションからの読み取りに対して、アトミックに表示されます。変更はコミット(または「アンステージング」)されるため、非トランザクションのアクターに結果整合性のある方法で表示されます。 コミットは最終的なものであり、トランザクションがコミットされた後は、ロールバックできず、それ以上の操作は許可されません。 トランザクションがコミットポイントに既に到達している場合、直後にアプリケーションがクラッシュした場合でも、非同期クリーンアッププロセスにより、トランザクションが事後的にコミットされます。 非トランザクション書き込みとの同時実行時の考慮 アプリケーションは、同じドキュメントに対して、非トランザクション書き込みがトランザクション書き込みと同時に実行されないようにする必要があります。 この要件は、Key-Value操作のパフォーマンスが損なわれないようにするためのものです。Couchbase Serverのトランザクションにおける重要な哲学は、「使用したものに対してのみ支払う」ということです。 もしも、このような2つ種類の書き込みが競合した場合、トランザクション書き込みは「勝ち」、非トランザクション書き込みを上書きします。 これは書き込みにのみ適用されることに注意してください。トランザクションと同時に行われる非トランザクション読み取りはすべて問題なく、リードコミットレベルの分離性でコントロールされます。 上記の協調に関する要件が満たされていることを検出するために、アプリケーションはクライアントのイベントロガーをサブスクライブし、次のようにIllegalDocumentStateイベントをチェックできます。 cluster.environment().eventBus().subscribe(event -> { if (event instanceof IllegalDocumentState) { // log this event for review } }); このイベントは、非トランザクション書き込みが、トランザクション書き込みにオーバーライドされたことが検出された場合に発生します。イベントには、アプリケーションのデバッグを支援するために、関連するドキュメントのキーが含まれています。 ロールバック トランザクションラムダブロックから(トランザクションまたはアプリケーションライブラリーのいずれかによって)例外がスローされた場合、その試行はロールバックされます。トランザクションロジックは、例外に応じて再試行される場合とされない場合があります。 トランザクションが再試行されない場合、TransactionFailed例外がスローされ、そのgetCauseメソッドを使用して失敗の詳細を確認できます。 アプリケーションはこの例外を使用して、ロールバックをトリガーした理由を次のように通知できます。 class BalanceInsufficient extends RuntimeException { } try { transactions.run((ctx) -> { TransactionGetResult customer = ctx.get(collection, "customer-name"); if (customer.contentAsObject().getInt("balance") < costOfItem) { throw new BalanceInsufficient(); } // else continue transaction }); } catch (TransactionCommitAmbiguous e) { // This exception can only be thrown at the commit point, after the // BalanceInsufficient logic has been passed, so there is no need to // check getCause here. System.err.println("Transaction possibly committed"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } catch (TransactionFailed e) { if (e.getCause() instanceof BalanceInsufficient) { // Re-raise the error throw (RuntimeException) e.getCause(); } else { System.err.println("Transaction did not reach commit point"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } } トランザクションは明示的にロールバックすることもできます。 transactions.run((ctx) -> { TransactionGetResult customer = ctx.get(collection, "customer-name"); if (customer.contentAsObject().getInt("balance") < costOfItem) { ctx.rollback(); } // else continue transaction }); この場合、ctx.rollback()に達すると、トランザクションは正常にロールバックされたと見なされ、TransactionFailed例外はスローされません。 トランザクションがロールバックされた後は、コミットできず、それ以上の操作は許可されず、ライブラリはコードブロックの最後でトランザクションを自動的にコミットしようとしません。 関連する論点 非同期APIによる並行操作 非同期APIを使用すると、トランザクション内で操作を同時に実行できるため、パフォーマンスを向上させることができます。アプリケーションが従う必要のある2つのルールがあります。 最初のミューテーションは、単独で、シリアルに、実行する必要があります。これは、最初のミューテーションがトランザクションのためのメタデータの作成をトリガーするためです。 トランザクションライブラリーが、障害が発生した場合にロールバックする必要のある操作を追跡できるように、すべての同時操作は、完全に完了できる必要があります。つまり、アプリケーションはエラーを「飲み込む」必要がありますが、エラーが発生したことを記録し、並行操作の最後にエラーが発生した場合は、エラーをスローしてトランザクションを再試行します。 これらのルールの実例は、以下のコードに示されています: List<String> docIds = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5"); ReactiveCollection coll = collection.reactive(); TransactionResult result = transactions.reactive((ctx) -> { // Tracks whether all operations were successful AtomicBoolean allOpsSucceeded = new AtomicBoolean(true); // The first mutation must be done in serial, as it also creates a metadata // entry return ctx.get(coll, docIds.get(0)).flatMap(doc -> { JsonObject content = doc.contentAsObject(); content.put("value", "updated"); return ctx.replace(doc, content); }) // Do all other docs in parallel .thenMany(Flux.fromIterable(docIds.subList(1, docIds.size())) .flatMap(docId -> ctx.get(coll, docId).flatMap(doc -> { JsonObject content = doc.contentAsObject(); content.put("value", "updated"); return ctx.replace(doc, content); }).onErrorResume(err -> { allOpsSucceeded.set(false); // App should replace this with logging err.printStackTrace(); // Allow other ops to finish return Mono.empty(); }), // Run these in parallel docIds.size()) // The commit or rollback must also be done in serial ).then(Mono.defer(() -> { // Commit iff all ops succeeded if (allOpsSucceeded.get()) { return ctx.commit(); } else { throw new RuntimeException("Retry the transaction"); } })); }).block(); カスタムメタデータコレクション 前述のように、トランザクションは自動的にメタデータドキュメントを作成して使用します。デフォルトでは、これらはトランザクションで最初に変更されたドキュメントのバケットのデフォルトコレクションに作成されます。オプションとして、デフォルト以外のコレクションを指定してメタデータドキュメントを保存できます。ほとんどのユーザーはこの機能を使用する必要はなく、デフォルトの動作を引き続き使用できます。これらは、次のユースケース向けに提供されています。 メタデータドキュメントには、各トランザクションに関係するドキュメントについて、ドキュメントのキーとバケットの名前、スコープ、およびそれが存在するコレクションが含まれています。これらに機密データが含まれ、コレクションレベルのアクセス制御を行う必要がある可能性があります。 デフォルトのコレクションを削除したい場合。実行する前に、デフォルトのコレクションでメタデータドキュメントを使用する既存のすべてのトランザクションが終了していることを確認する必要があります。 使用法 カスタムメタデータコレクションは、次の方法で有効になります。 Collection metadataCollection = null; // this is a Collection opened by your code earlier Transactions transactions = Transactions.create(cluster, TransactionConfigBuilder.create().metadataCollection(metadataCollection)); コレクションを指定した場合: このTransactionsオブジェクトから作成されたトランザクションはすべて、そのコレクションにメタデータを作成して使用します。 このTransactionsオブジェクトによって開始された非同期クリーンアップは、このコレクションでのみ期限切れのトランザクションを検索します。 アプリケーションにRBACデータの読み取りおよび書き込み権限があることを確実する必要があります。また、既存のトランザクションに干渉する可能性があるため、コレクションを削除してはなりません。既存のコレクションを使用することも、新しいコレクションを作成することもできます。 複数のTransactionsオブジェクトを利用する正当性 通常、アプリケーションに必要なTransactionsオブジェクトは1つだけです。ただし、アプリケーションが複数のカスタムメタデータコレクションを持つ必要がある場合が考えられます。複数のTransactionsオブジェクトを作成することは、これを可能にする唯一の方法であるため、この場合は、複数のTransactionsオブジェクトを利用するのは妥当です。 ライブラリーは、このシナリオに基づいて作成されている複数のTransactionsオブジェクトについて警告しません。 遅延コミット 遅延コミット機能は現在アルファ版であり、APIは変更される可能性があります。 遅延コミットにより、コミットポイントの直前でトランザクションを一時停止できます。そのことにより、トランザクションを完了するために必要なすべてのものをコンテキストにバンドルして、文字列またはバイト配列にシリアル化し、他の場所(たとえば、別のプロセス)で逆シリアル化することができます。その後、トランザクションをコミットまたはロールバックできます。 この機能の背後にある目的は、複数のデータベースにまたがる可能性のある複数のトランザクションをコミットポイントの直前に移動し、すべてを一緒にコミットできるようにすることです。 最初のコミットを延期し、トランザクションをシリアル化する例を次に示します。 try { TransactionResult result = transactions.run((ctx) -> { JsonObject initial = JsonObject.create().put("val", 1); ctx.insert(collection, "a-doc-id", initial); // Defer means don't do a commit right now. `serialized` in the result will be // present. ctx.defer(); }); // Available because ctx.defer() was called assert (result.serialized().isPresent()); TransactionSerializedContext serialized = result.serialized().get(); // This is going to store a serialized form of the transaction to pass around byte[] encoded = serialized.encodeAsBytes(); } catch (TransactionFailed e) { // System.err is used for example, log failures to your own logging system System.err.println("Transaction did not reach commit point"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } そして、後でトランザクションをコミットします。 TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded); try { TransactionResult result = transactions.commit(serialized); } catch (TransactionFailed e) { // System.err is used for example, log failures to your own logging system System.err.println("Transaction did not reach commit point"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } または、トランザクションをロールバックすることもできます。 TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded); try { TransactionResult result = transactions.rollback(serialized); } catch (TransactionFailed e) { // System.err is used for example, log failures to your own logging system System.err.println("Transaction did not reach commit point"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } トランザクションの有効期限タイマーは、トランザクションが開始されるとスタートし、トランザクションが遅延状態にある間は一時停止されません。 参考情報 トランザクションのドキュメントには、Couchbaseでトランザクションがどのように機能するかについての説明がたくさんあります。 トランザクションの例のリポジトリで、さらにコード例を見つけることができます。
- 投稿日:2021-10-22T14:45:21+09:00
Couchbase Server Java SDK解説:分散トランザクション〜ログとトレース
はじめに Couchbase Serverの分散トランザクションライブラリーにおける、ログとトレースについて、解説します。 ログ トラブルシューティングを支援するために、各トランザクションはログエントリのリストを維持します。そして、次のようにエラー発生時にログに記録できます。 同期APIの場合: } catch (TransactionCommitAmbiguous e) { // The application will of course want to use its own logging rather // than System.err System.err.println("Transaction possibly committed"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } catch (TransactionFailed e) { System.err.println("Transaction did not reach commit point"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } 非同期APIの場合: }).doOnError(err -> { if (err instanceof TransactionCommitAmbiguous) { System.err.println("Transaction possibly committed: "); } else { System.err.println("Transaction failed: "); } for (LogDefer err : ((TransactionFailed) e).result().log().logs()) { // System.err is used for example, log failures to your own logging system System.err.println(err.toString()); } }); 失敗したトランザクションには数十行、場合によっては数百行のログが含まれる可能性があります。そのため、アプリケーションは失敗したトランザクションを通常のエラーログとは別のファイルに書き込むことが望ましい場合があります。 便宜上、トランザクション関連のログを、他のエラーログと同じ扱いとする構成オプションもあります。この場合、失敗したトランザクションのすべてのログが次のようにWARNレベルでログに記録されます。 .logOnFailure(true, Event.Severity.WARN) Couchbase Server Java SDKのデフォルトでは、ロギングイベントバスは、クラスパスでSLF4J / logback、log4j1、およびlog4j2を検索して使用し、java.util.Loggingにフォールバックするように設定されています。 詳細については、Java SDK loggingドキュメントを参照してください。 java.util.Loggingを使用した例を示します final Logger LOGGER = Logger.getLogger("transactions"); try { TransactionResult result = transactions.run((ctx) -> { // ... transactional code here ... }); } catch (TransactionCommitAmbiguous err) { // The transaction may or may not have reached commit point LOGGER.info("Transaction returned TransactionCommitAmbiguous and" + " may have succeeded, logs:"); err.result().log().logs().forEach(log -> LOGGER.info(log.toString())); } catch (TransactionFailed err) { // The transaction definitely did not reach commit point LOGGER.info("Transaction failed with TransactionFailed, logs:"); err.result().log().logs().forEach(log -> LOGGER.info(log.toString())); } トレース テレメトリは、パフォーマンスの監視に特に役立ちます。 CouchbaseJava SDKがトレース用に構成されている場合、トランザクションを監視するために、それ以上の作業は必要ありません。トランザクションスパンが自動的に出力されます。これを構成する方法については、Couchbase Java SDK Request Tracing ドキュメンテーションを参照してください。 親スパン(Parent Span) トランザクションスパンを使用/出力するには上記で十分ですが、アプリケーションは、トランザクションがより大きなスパンの一部であることを示したい場合があります(たとえば、ユーザー要求)。これは、親スパンとして渡すことで実行できます。 既存のOpenTelemetryスパンがある場合は、それをCouchbase ServerのRequestSpanに簡単に変換して、トランザクションライブラリーに渡すことができます。 Span span = Span.current(); // this is a span created by your code earlier RequestSpan wrapped = OpenTelemetryRequestSpan.wrap(span); transactions.run((ctx) -> { // your transaction }, PerTransactionConfigBuilder.create().parentSpan(wrapped).build());
- 投稿日:2021-10-22T14:32:49+09:00
あったらいいなと思っていたスキルマップアプリをVueとGoで作ってみた
作ってみたもの 作ったWebアプリは Graphyee と名付けました。 技術と技術は関連しているものなので、「スキルマップをグラフ構造で表現できたら分かりやすくて面白いかな」と思いついたのが経緯です。 まだβ版としていますが、一旦使える感じになったのでノリと勢いで公開してみました。 こんな感じで サンプル は誰でも使えます。 サンプルは実績を入力しても保存できないのでご注意下さい。 ユーザ認証すると、より細かい スキルマップ が使えます。 ユーザ認証していただければ、入力した実績が保存されます。 上記リンクからだとAuth0の認証画面へのリダイレクトにやや時間がかかりますが、そのうち認証画面が出てきます。 技術構成 せっかく作ったので、どうやって作ったかを簡単にご紹介します。 フロントエンド Vue 2.6.11 Vuetify 2.4.0 Vuex 3.6.2 axios 0.21.1 Cytoscape.js 3.18.1 Auth0 VueでSPAを作り、UI周りのデザインはVuetifyを活用しました。バックエンドのAPI呼び出しはaxiosです。 Cytoscape.jsを使ってグラフ構造を実現しています。jsonでnodeとedgeを定義するとグラフが描画されます。 sample_node [{"id":"1","name":"node1"},{"id":"2","name":"node2"},{"id":"3","name":"node3"}] sample_edge [{"source":"1","target":"2"},{"source":"2","target":"3"}] 認証機能は、アプリから切り離したかったのでAuth0で実現しています。 Silent AuthenticationやRefresh Token Rotationはこのサイトが理解しやすかったです。 バックエンド Go 1.16 Gin 1.7.1 GORM 1.21.11 フロントエンドから呼び出されるAPIは、GoとGinで実装し、GORMでDBと接続しています。 Goはパッケージ管理の考え方がまだ模索中なのかな?って印象ですが、そこさえちゃんと理解できれば早く立ち上がるので気に入ってます。Ginも簡単なWebアプリ作るなら一瞬だったので、いい感じです。世の中にサンプルも多そうなでキャッチアップしやすい印象です。 GORMはいわゆるORMのクセみたいなのはありましたけど、普通に使う分には問題なく使えました。 最終的には、生のSQLを書きたくなるんですけどね。 インフラ インフラはAWSで構築しました。コンテナ化や触ったことないサービスを使ってみようかなとも思いましたが、個人的に一番立ち上がりが早いサービスを選択しています。 ALB+EC2+RDSというとてもシンプルな構成です。 ここらへんを見ながら何使うか一瞬悩みましたが、また今度挑戦しようかなと思います。 なぜ作ろうと思ったのか IT業界で働いていて、「結局、具体的に何をどこまで経験してきて、何が不足しているのか?」を常々考えていたのが根底にあります。さらに、ある技術を網羅的に学んで仕事に活かすことが、自分にとって「塗り絵」をしていくイメージだったのも強く影響しています。 ただ、いちいち思い出して次は何しようかを考えたり、実績を誰かに伝えたりすることは中々面倒くさいのです。そこで、スキルマップを分かりやすく管理できればいいのでは?と思い立ちました。 使えそうな場面 例えばこんなシーンで使えるのかなと想像しています。是非、継続的に使ってみてください。 特にこれから勉強しようとしている人、勉強中の人向けに使えるのかなと想像しています。 学習や経験が網羅的にできているか知りたい。 学習や経験を計画したり、振り返ったりしたい。 今後どんなことを学習・経験すべきか知りたい。 自分や誰かの足りない部分を発見したい。 使ってみた方へお願い 求められるものが何なのか、今後作り続ける価値があるのか知りたいので、よろしければアドバイスいただけると嬉しいです! 今後 やりたいことや検証してみたいことはたくさんあります。 もし良いフィードバックをいただけたら、下記以外にも色々試してみたいと思います。 コンテンツがJavaだけで寂しいので追加したい。 認知度を向上して色々フィードバックをもらいたい。 誰かのスキルマップとの比較機能を付けたらありがたがられるのか。 API Gateway+コンテナ(Lambda/Fargate)にしたら嬉しいのか。 Github ActionsでCI/CDしたら楽になるのか。 ランニングコストぐらい回収できる感じにしたい。 課金して使える機能と課金の仕組みを入れてみたい。 最後に 作っていて記事になりそうなネタがあればまた投稿するので、その時にまたお会いしましょう。最後までお読みいただき、ありがとうございます!
- 投稿日:2021-10-22T14:08:25+09:00
Couchbase Server Java SDK解説:分散トランザクション 〜N1QLクエリ
N1QLクエリ Couchbase Server 7.0以降、N1QLクエリはトランザクションラムダ内で使用できるようになりました。N1QLクエリとKey-Value操作とを同じラムダ内で組み合わせて利用することができます。 トランザクションの開始 Couchbase Serverでは、SDK利用に限らずに見た場合、トランザクションを開始するには、トランザクションライブラリーを使用する方法と、Queryサービスへのリクエストとして、BEGIN TRANSACTIONを使用する方法の2つがあります。後者の方法は、CLI、REST API、またはWebコンソールUIのクエリワークベンチを使用する場合を想定されています。アプリケーションでは、トランザクションライブラリーを使用します。トランザクションライブラリーは、下記の利点を持ちます。 エラーと再試行を自動的に処理します。 これにより、Key-Value操作とN1QLクエリを自由に組み合わせることができます。 自動的にBEGIN TRANSACTION、END TRANSACTION、COMMITおよびROLLBACKを発行します(ラムダ内でこれらのステートメントを使用すべきではありません)。 サポートされているN1QL N1QL DMLステートメントの大部分は、トランザクション内での使用が許可されます。具体的には、INSERT、UPSERT、DELETE、UPDATE、MERGE、およびSELECTがサポートされています。 CREATE INDEXなどのDDLステートメントの利用は許可されていません。 N1QLの使用 Java SDKのN1QLをすでに使用している場合、トランザクションでの使用は非常に似ています。同じQueryResultを返し、オプションとしてもほとんど同じものが利用できます。 異なる点としては、cluster.query()またはscope.query()ではなく、ラムダ内でctx.query()を使います。 以下は、travel-sampleバケットからいくつかの行を選択する例です。 transactions.run((ctx) -> { String st = "SELECT * FROM `travel-sample`.inventory.hotel WHERE country = $1"; QueryResult qr = ctx.query(st, TransactionQueryOptions.queryOptions() .parameters(JsonArray.from("United Kingdom"))); List<JsonObject> rows = qr.rowsAs(JsonObject.class); }); 完全に修飾されたキースペース名(「 travel-sample.inventory.hotel」)を指定するのではなく、コンテキスト情報(Scopeオブジェクトへの参照)を渡すことができます。 Bucket travelSample = cluster.bucket("travel-sample"); Scope inventory = travelSample.scope("inventory"); transactions.run((ctx) -> { QueryResult qr = ctx.query(inventory, "SELECT * FROM hotel WHERE country = $1", TransactionQueryOptions.queryOptions() .parameters(JsonArray.from("United States"))); List<JsonObject> rows = qr.rowsAs(JsonObject.class); }); ScopeをUPDATEにを使用することもできます。 String hotelChain = "http://marriot%"; String country = "United States"; transactions.run((ctx) -> { QueryResult qr = ctx.query(inventory, "UPDATE hotel SET price = $1 WHERE url LIKE $2 AND country = $3", TransactionQueryOptions.queryOptions() .parameters(JsonArray.from(99.99, hotelChain, country))); assert(qr.metaData().metrics().get().mutationCount() == 1); }); さらに、SELECTとUPDATEを組み合わせることもできます。ここに示すように、ラムダから通常のJavaメソッドを呼び出すことができ、複雑なロジックを実行できます。ラムダは複数回呼び出される可能性があるため、内部で使われているメソッドも複数回呼び出される可能性があることを意識する必要があります。 transactions.run((ctx) -> { // Find all hotels of the chain QueryResult qr = ctx.query(inventory, "SELECT reviews FROM hotel WHERE url LIKE $1 AND country = $2", TransactionQueryOptions.queryOptions() .parameters(JsonArray.from(hotelChain, country))); // This function (not provided here) will use a trained machine learning model to provide a // suitable price based on recent customer reviews. double updatedPrice = priceFromRecentReviews(qr); // Set the price of all hotels in the chain ctx.query(inventory, "UPDATE hotel SET price = $1 WHERE url LIKE $2 AND country = $3", TransactionQueryOptions.queryOptions() .parameters(JsonArray.from(updatedPrice, hotelChain, country))); }); Read Your Own Writes(自分自身の書き込みを読む) N1QLクエリはRead Your Own Writes(自分自身の書き込みを読む)をサポートします。 次の例は、ドキュメントを挿入してから選択する方法を示しています。 transactions.run((ctx) -> { ctx.query("INSERT INTO `default` VALUES ('doc', {'hello':'world'})"); // Performing a 'Read Your Own Write' String st = "SELECT `default`.* FROM `default` WHERE META().id = 'doc'"; QueryResult qr = ctx.query(st); assert(qr.metaData().metrics().get().resultCount() == 1); }); 挿入されたドキュメントは、その時点では、コミットされていないトランザクションとして、ステージングされます。トランザクションがまだコミットされていないため。他のトランザクション、および他の非トランザクションアクターは、このステージングされた挿入をまだ見ることができません。 ただし、同じトランザクション内では、ステージングされたミューテーション(変更)を読み取るため、SELECTが可能です。 Key-ValueとN1QLの混合 Key-Value操作とクエリは自由に組み合わせることができ、期待どおりに相互作用します。 この例では、Key-Valueを使用してドキュメントを挿入し、SELECTを使用してそれを読み取ります。 transactions.run((ctx) -> { ctx.insert(collection, "doc", JsonObject.create().put("hello", "world")); // Performing a 'Read Your Own Write' String st = "SELECT `default`.* FROM `default` WHERE META().id = 'doc'"; QueryResult qr = ctx.query(st); assert(qr.metaData().metrics().get().resultCount() == 1); }); 「Read Your Own Writes」の例と同様に、ここでは挿入はステージングされるだけなので、他のトランザクションや非トランザクションアクターには表示されません。一方、トランザクションラムダ内のクエリーは、同じトランザクション内のKey-Value操作によって挿入されたデータを表示します。 クエリオプション トランザクション(ctc.query)では、N1QLのクエリオプションを適用するために、TransactionsQueryOptionsを介して指定します。これは、QueryOptionsのオプションのサブセットです。 transactions.run((ctx) -> { ctx.query("INSERT INTO `default` VALUES ('doc', {'hello':'world'})", TransactionQueryOptions.queryOptions().profile(QueryProfile.TIMINGS)); }); サポートされているオプションは次のとおりです。 parameters scanConsistency flexIndex serializer clientContextId scanWait scanCap pipelineBatch pipelineCap readonly adhoc raw これらの詳細については、QueryOptionsのドキュメントを参照してください。 クエリの同時実行性 Couchbase Server SDKは、リアクティブ(非同期)プログラミングに対応しています。リアクティブAPIを使用すると、操作を複数同時に実行できますが、Queryサービスによって一度に実行されるクエリステートメントは1つだけです。 リアクティブAPIを使用すると、複数の同時クエリステートメントがリクエストされることになりますが、これにより、再試行により内部的にネットワークトラフィックが追加される可能性があり、クエリーの場合は、パフォーマンスが向上する可能性はほとんどありません(Key-Value操作では効果的に働きます)。 クエリのパフォーマンスアドバイス ここでは、トランザクションのパフォーマンスを最大化するための情報を整理します。 トランザクションラムダで、クエリステートメントを実行した場合、後続のKey-Value操作はN1QLに変換され、Key-Valueデータサービスではなくクエリサービスによって実行されます。操作は同じように動作し、次の2つの警告を除いて、この実装の詳細はほとんど無視できます。 クエリサービスは複数のドキュメントを含むステートメント用に最適化されているため、これらの変換されたKey-Value操作はわずかに遅くなる可能性があります。可能な限り最大のパフォーマンスを求めている場合は、可能な時には、ラムダ内の最初のクエリの前にKey-Value操作を配置することで性能を最適化できます。 リアクティブAPIを使用して同時実行を実現する場合は、変換されたKey-Value操作には、上記と同じ並列処理の制限が適用されることに注意してください。たとえば、クエリサービスによって並列に実行されることはありません。可能な時には、同時Key-Value操作はラムダ内の最初のクエリの前に配置するべきです。 シングルクエリトランザクション ここでは、大規模な一括読み込みトランザクションを実行する場合について扱います。 Queryサービスは、必要に応じて、トランザクション内の各ドキュメントのために、メモリを消費します。これは、コミットまたはロールバック時に解放されます。ほとんどのユースケースでは、これは問題になりませんが、非常に多くのドキュメントの一括読み込みなど、ワークロードによっては、サービスに割り当てられたリソースを超える可能性がありえます。これに対する解決策には、ワークロードをより小さなバッチに分割する、クエリサービスに追加のメモリを割り当てる、といったことが考えられます。 あるいは、ここで説明するシングルクエリトランザクションを使用することもできます。 シングルクエリトランザクションには、次の特性があります。 クエリサービス内のメモリ使用量が大幅に削減される。 名前が示すように、1つのクエリで構成され、Key-Value操作は含まれない。 Couchbaseドキュメントの他の場所でへの参照が表示されます。シングルクエリトランザクションは、内部的にtximplicitクエリパラメータをtrueに設定します。このパラメーターがtrueの場合、Queryサービスはトランザクションを開始し、ステートメントを実行します。実行が成功すると、Queryサービスはトランザクションをコミットします。それ以外の場合、トランザクションはロールバックされます。 シングルクエリトランザクションは、transactions.query()を介して、以下のように開始できます。 String bulkLoadStatement = "INSERT INTO `travel-sample`.inventory.landmark (KEY foo, VALUE bar) SELECT META(doc).id AS foo, doc AS bar FROM `beer-sample` AS doc WHERE type = 'brewery';"; // a bulk-loading N1QL statement try { SingleQueryTransactionResult result = transactions.query(bulkLoadStatement); QueryResult queryResult = result.queryResult(); } catch (TransactionCommitAmbiguous e) { System.err.println("Transaction possibly committed"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } catch (TransactionFailed e) { System.err.println("Transaction did not reach commit point"); for (LogDefer err : e.result().log().logs()) { System.err.println(err.toString()); } } Scopeを指定して、実行することもできます。 Bucket travelSample = cluster.bucket("travel-sample"); Scope inventory = travelSample.scope("inventory"); transactions.query(inventory, bulkLoadStatement); 以下のようにSingleQueryTransactionConfigBuilderを使って、設定を行うことができます。 transactions.query(bulkLoadStatement, SingleQueryTransactionConfigBuilder.create() // Single query transactions will often want to increase the default timeout .expirationTime(Duration.ofSeconds(360)) .build()); 参考情報