この記事は 10Xアドベントカレンダー2023 9日目の記事です。
昨日はSREの@_tapihさんによるKubernetes のインスタンスコストを 0.6x した話 - 10X Product Blog でした
各言語のgRPC実装にはInterceptor(インターセプター)という機能が存在します。
gRPC Serviceの呼び出しに対してInterceptorが自動で実行される仕組みなので、gRPCのリクエストに対して共通で適用したい処理を記述するのに便利です。
「gRPC interceptor」で検索すると認証・認可やロギングの用途でInterceptorを利用している方の記事がいくつか見つかります。
自分は仕事でgrpc-dartを使っているため、その使い方や実装方法を調べてみました。
今回はUnary RPC前提の話です。
Interceptorの使い方
ドキュメントはこちら
Interceptor typedef - grpc library - Dart API
An interceptor is called before the corresponding ServiceMethod invocation.
と記述がある通り、gRPC Serviceの実行の前にInterceptorが呼び出されます。
Interceptorがnullを返せばそのまま処理を続行します。
InterceptorはgRPC Serverのコンストラクタに渡すことができます。
Server.create constructor - Server - grpc library - Dart API
実装してみる
特定のService呼び出しを検知するInterceptorを実装してみます。
まずはInterceptorの作成。
FutureOr<GrpcError?>
を返す関数オブジェクトをつくります。
import 'dart:async'; import 'package:grpc/grpc.dart'; const allowedMethodList = [ 'Hoge', ]; class FilterServiceInterceptor { FutureOr<GrpcError?> call(ServiceCall call, ServiceMethod method) { print('Called interceptor'); if (!allowedMethodList.contains(method.name)) { throw GrpcError.permissionDenied('${method.name}は指定されたエンドポイントではありません'); } print('${method.name} is defined in allowedMethodList'); return null; } }
Dartには call()
メソッドを実装したクラスのオブジェクトは関数になるという仕様があり、そのテクニックを使用しています。
つづいてInterceptorをサーバーに渡します。
import 'package:grpc/grpc.dart'; import 'package:sample_rpc/src/handler/hoge_service_handler.dart'; import 'package:sample_rpc/src/interceptor/allowed_service.dart'; Future<void> main(List<String> args) async { final server = Server( [HogeServiceHandler()], <Interceptor>[FilterServiceInterceptor()], CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), ); await server.serve(port: 50051); print('Server listening on port ${server.port}...'); }
これでgRPCサーバーを起動し HogeService.Hoge
に対してリクエストを送ってみます。
以下のようにInterceptorが呼び出されたメソッドの名前を見てログを出力します。
Server listening on port 50051... Called interceptor Hoge is defined in allowedMethodList
InterceptorにはServiceCallオブジェクトも渡すのでHTTP Headerにある情報は clientMetadata
経由で取得が可能です。
これでたいていの認証や認可ロジックは実装できるはずです。
Interceptorのテスト
Interceptorに引数として渡すServiceCallとServiceMethodさえ用意してしまえば単体テストが書けます。
単機能に絞ればテストも書きやすいです。
import 'dart:io'; import 'package:grpc/grpc.dart'; import 'package:sample_rpc/src/interceptor/allowed_service.dart'; import 'package:test/test.dart'; void main() { group('allowed_service#call', () { late MockServiceCall serviceCall; late ServiceMethod serviceMethod; late String methodName; setUp(() { serviceCall = MockServiceCall(null, null, null, null, false, false); }); test('指定されたエンドポイントであるときnullを返す', () { methodName = 'Hoge'; serviceMethod = ServiceMethod(methodName, () => {}, false, false, (List<int> req) {}, <int>(dynamic response) => [1]); final result = FilterServiceInterceptor()(serviceCall, serviceMethod); expect(result, isNull); }); test('指定されたエンドポイントでないときpermissionDeniedをあげる', () { methodName = 'unknown method name'; serviceMethod = ServiceMethod(methodName, () => {}, false, false, (List<int> req) {}, <int>(dynamic response) => [1]); expect(() => FilterServiceInterceptor()(serviceCall, serviceMethod), throwsA(TypeMatcher<GrpcError>())); }); }); } class MockServiceCall implements ServiceCall { @override final Map<String, String>? clientMetadata; @override final Map<String, String>? headers; @override final Map<String, String>? trailers; @override final DateTime? deadline; @override final bool isTimedOut; @override final bool isCanceled; MockServiceCall( this.clientMetadata, this.headers, this.trailers, this.deadline, this.isTimedOut, this.isCanceled, ); @override X509Certificate? get clientCertificate => throw UnimplementedError(); @override void sendHeaders() {} @override void sendTrailers({int? status, String? message}) {} @override InternetAddress? get remoteAddress => throw UnimplementedError(); }
できないこと
Interceptorのドキュメントにも書かれていましたが、grpc-dartのInterceptorはgRPC Service実行の "前に" 呼び出されます。
そのためServiceから返却されたレスポンスや、Serviceの処理の終了をフックしてInterceptorに渡すことができません。(2023年11月現在)
実装を読んでみるとgRPCのリクエストからServiceとmethodを特定したあと
_startStreamingRequest()
でSerivceを呼び出す前に await _applyInterceptors()
でInterceptorを実行していることがわかります。
他の言語ではどうなっているのか疑問だったのでGolang実装を読んでみると
Interceptorに handler
という引数でService Methodをラップしたオブジェクトが渡されていました。
https://github.com/grpc/grpc-go/blob/master/interceptor.go#L83-L87
Interceptorの関数内でこのhandlerを実行できるため、リクエスト前後の処理を記述することが可能です。
実際テストコード内にもInterceptor内でレスポンスを受け取って処理しているコードが登場します。
この調査をしている最中に発見しましたが、grpc-dartのリポジトリにはこの問題を解決するためのIssueが存在しており
help wanted
なラベルが付いています。困ってる人はいそうです。
[Feature Request] server-side "wrapper" interceptors · Issue #591 · grpc/grpc-dart · GitHub
おわりに
grpc-dartのInterceptorの活用法についてまとめてみました。
gRPCリクエストに対して共通処理を記述したい場合の選択肢としてはとても便利なものの、Interceptorを使ったレスポンスタイムの出力などはまだできないので工夫が必要です。
10XのgRPCサーバー実装では1つのInterceptorにいろんな役割の処理が実装されているため、まずは単体テストを書きつつそれぞれの機能を個別のInterceptorに分割していくところから改善をしていこうと思っています。
10Xアドベントカレンダー2023 は25日まで続きます。
明日は ひさいち さんによる記事です。お楽しみに。