grpc-dartのInterceptorを使う

この記事は 10Xアドベントカレンダー2023 9日目の記事です。
昨日はSREの@_tapihさんによるKubernetes のインスタンスコストを 0.6x した話 - 10X Product Blog でした


各言語のgRPC実装にはInterceptor(インターセプター)という機能が存在します。
gRPC Serviceの呼び出しに対してInterceptorが自動で実行される仕組みなので、gRPCのリクエストに対して共通で適用したい処理を記述するのに便利です。

「gRPC interceptor」で検索すると認証・認可やロギングの用途でInterceptorを利用している方の記事がいくつか見つかります。

自分は仕事でgrpc-dartを使っているため、その使い方や実装方法を調べてみました。
今回はUnary RPC前提の話です。

Interceptorの使い方

ドキュメントはこちら

Interceptor typedef - grpc library - Dart API

An interceptor is called before the corresponding ServiceMethod invocation.

と記述がある通り、gRPC Serviceの実行の前にInterceptorが呼び出されます。
Interceptorがnullを返せばそのまま処理を続行します。

InterceptorはgRPC Serverのコンストラクタに渡すことができます。

Server.create constructor - Server - grpc library - Dart API

実装してみる

特定のService呼び出しを検知するInterceptorを実装してみます。

まずはInterceptorの作成。 FutureOr<GrpcError?> を返す関数オブジェクトをつくります。

import 'dart:async';

import 'package:grpc/grpc.dart';

const allowedMethodList = [
  'Hoge',
];

class FilterServiceInterceptor {
  FutureOr<GrpcError?> call(ServiceCall call, ServiceMethod method) {
    print('Called interceptor');

    if (!allowedMethodList.contains(method.name)) {
      throw GrpcError.permissionDenied('${method.name}は指定されたエンドポイントではありません');
    }

    print('${method.name} is defined in allowedMethodList');

    return null;
  }
}

Dartには call() メソッドを実装したクラスのオブジェクトは関数になるという仕様があり、そのテクニックを使用しています。

Callable objects | Dart

つづいてInterceptorをサーバーに渡します。

import 'package:grpc/grpc.dart';

import 'package:sample_rpc/src/handler/hoge_service_handler.dart';
import 'package:sample_rpc/src/interceptor/allowed_service.dart';

Future<void> main(List<String> args) async {
  final server = Server(
    [HogeServiceHandler()],
    <Interceptor>[FilterServiceInterceptor()],
    CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
  );
  await server.serve(port: 50051);
  print('Server listening on port ${server.port}...');
}

これでgRPCサーバーを起動し HogeService.Hoge に対してリクエストを送ってみます。
以下のようにInterceptorが呼び出されたメソッドの名前を見てログを出力します。

Server listening on port 50051...
Called interceptor
Hoge is defined in allowedMethodList

InterceptorにはServiceCallオブジェクトも渡すのでHTTP Headerにある情報は clientMetadata 経由で取得が可能です。
これでたいていの認証や認可ロジックは実装できるはずです。

Interceptorのテスト

Interceptorに引数として渡すServiceCallとServiceMethodさえ用意してしまえば単体テストが書けます。
単機能に絞ればテストも書きやすいです。

import 'dart:io';

import 'package:grpc/grpc.dart';
import 'package:sample_rpc/src/interceptor/allowed_service.dart';
import 'package:test/test.dart';

void main() {
  group('allowed_service#call', () {
    late MockServiceCall serviceCall;
    late ServiceMethod serviceMethod;
    late String methodName;

    setUp(() {
      serviceCall = MockServiceCall(null, null, null, null, false, false);
    });

    test('指定されたエンドポイントであるときnullを返す', () {
      methodName = 'Hoge';
      serviceMethod = ServiceMethod(methodName, () => {}, false, false,
          (List<int> req) {}, <int>(dynamic response) => [1]);

      final result = FilterServiceInterceptor()(serviceCall, serviceMethod);
      expect(result, isNull);
    });

    test('指定されたエンドポイントでないときpermissionDeniedをあげる', () {
      methodName = 'unknown method name';
      serviceMethod = ServiceMethod(methodName, () => {}, false, false,
          (List<int> req) {}, <int>(dynamic response) => [1]);

      expect(() => FilterServiceInterceptor()(serviceCall, serviceMethod),
          throwsA(TypeMatcher<GrpcError>()));
    });
  });
}

class MockServiceCall implements ServiceCall {
  @override
  final Map<String, String>? clientMetadata;

  @override
  final Map<String, String>? headers;

  @override
  final Map<String, String>? trailers;

  @override
  final DateTime? deadline;

  @override
  final bool isTimedOut;

  @override
  final bool isCanceled;

  MockServiceCall(
    this.clientMetadata,
    this.headers,
    this.trailers,
    this.deadline,
    this.isTimedOut,
    this.isCanceled,
  );

  @override
  X509Certificate? get clientCertificate => throw UnimplementedError();

  @override
  void sendHeaders() {}

  @override
  void sendTrailers({int? status, String? message}) {}

  @override
  InternetAddress? get remoteAddress => throw UnimplementedError();
}

できないこと

Interceptorのドキュメントにも書かれていましたが、grpc-dartのInterceptorはgRPC Service実行の "前に" 呼び出されます。
そのためServiceから返却されたレスポンスや、Serviceの処理の終了をフックしてInterceptorに渡すことができません。(2023年11月現在)

実装を読んでみるとgRPCのリクエストからServiceとmethodを特定したあと
_startStreamingRequest() でSerivceを呼び出す前に await _applyInterceptors() でInterceptorを実行していることがわかります。

https://github.com/grpc/grpc-dart/blob/d1e6c8ce1111b958b0c84d44957ef5f39c530ed0/lib/src/server/handler.dart#L177-L194

他の言語ではどうなっているのか疑問だったのでGolang実装を読んでみると
Interceptorに handler という引数でService Methodをラップしたオブジェクトが渡されていました。

https://github.com/grpc/grpc-go/blob/master/interceptor.go#L83-L87

Interceptorの関数内でこのhandlerを実行できるため、リクエスト前後の処理を記述することが可能です。
実際テストコード内にもInterceptor内でレスポンスを受け取って処理しているコードが登場します。

https://github.com/grpc/grpc-go/blob/02ea031697b40d379d7364ef75d0b131590557d8/test/server_test.go#L87

この調査をしている最中に発見しましたが、grpc-dartのリポジトリにはこの問題を解決するためのIssueが存在しており
help wanted なラベルが付いています。困ってる人はいそうです。

[Feature Request] server-side "wrapper" interceptors · Issue #591 · grpc/grpc-dart · GitHub

おわりに

grpc-dartのInterceptorの活用法についてまとめてみました。
gRPCリクエストに対して共通処理を記述したい場合の選択肢としてはとても便利なものの、Interceptorを使ったレスポンスタイムの出力などはまだできないので工夫が必要です。

10XのgRPCサーバー実装では1つのInterceptorにいろんな役割の処理が実装されているため、まずは単体テストを書きつつそれぞれの機能を個別のInterceptorに分割していくところから改善をしていこうと思っています。


10Xアドベントカレンダー2023 は25日まで続きます。

明日は ひさいち さんによる記事です。お楽しみに。