このマニュアルは進行中の作業であり、現在不完全です。
改善に協力していただける場合は、READMEをご覧ください。

15 RxJava

優れた RxJava は、非同期操作をエレガントに合成するために Ratpack アプリケーションで使用できます。

ratpack-rx2 モジュールは、Ratpack プロミスを RxJava の Observable に適応するための静的メソッドを提供する RxRatpack クラスを提供します。

2.0.0-rc-1 以降の ratpack-rx2 モジュールは、RxJava 2.2.21 に基づいて構築されています(および依存しています)。

1.15 初期化

RxRatpack.initialize() を呼び出して完全に統合を有効にする必要があります。このメソッドは JVM のライフタイムに対して一度だけ呼び出す必要があります。

2.15 Ratpack の監視

この統合は、RxRatpack.single()RxRatpack.observe() の静的メソッドに基づいています。これらのメソッドは Ratpack のプロミス型をオブザーバブルに変換し、その後、RxJava が提供するすべてのオブザーバブル演算子で使用できます。

たとえば、ブロッキング操作は簡単に観測できます。

import ratpack.exec.Promise;
import ratpack.exec.Blocking;
import ratpack.test.handling.HandlingResult;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static ratpack.rx2.RxRatpack.single;
import static ratpack.test.handling.RequestFixture.requestFixture;

public class Example {
  public static void main(String... args) throws Exception {
    HandlingResult result = requestFixture().handle(context -> {
      Promise<String> promise = Blocking.get(() -> "hello world");
      single(promise).map(String::toUpperCase).subscribe(context::render);
    });

    assertEquals("HELLO WORLD", result.rendered(String.class));
  }
}

3.15 暗黙的なエラー処理

RxJava 統合の主な機能の 1 つは、暗黙的なエラー処理です。すべてのオブザーバブルシーケンスには、例外を実行コンテキストエラーハンドラに転送するという暗黙的な既定のエラー処理戦略があります。これは実際には、エラーハンドラはオブザーバブルシーケンスに対してめったに定義する必要がないことを意味します。

import ratpack.core.error.ServerErrorHandler;
import ratpack.rx2.RxRatpack;
import ratpack.test.handling.RequestFixture;
import ratpack.test.handling.HandlingResult;
import io.reactivex.Observable;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    RxRatpack.initialize(); // must be called once per JVM

    HandlingResult result = RequestFixture.requestFixture().handleChain(chain -> {
      chain.register(registry ->
          registry.add(ServerErrorHandler.class, (context, throwable) ->
              context.render("caught by error handler: " + throwable.getMessage())
          )
      );

      chain.get(ctx -> Observable.<String>error(new Exception("!")).subscribe((s) -> {}));
    });

    assertEquals("caught by error handler: !", result.rendered(String.class));
  }
}

この場合、ブロッキング操作中にスローされたスロー可能なものは、現在の ServerErrorHandler に転送されます。これはおそらく応答にエラーページを表示します。サブスクライバーがエラー処理戦略を実装する場合、それは暗黙的なエラーハンドラの代わりに使用されます。

暗黙的なエラー処理は、Ratpack で管理されるスレッドで作成された すべての オブザーバブルに適用されます。Ratpack プロミスによってサポートされるオブザーバブルに限定されません