Back to snippets
grpc_server_with_exception_to_status_interceptor.py
pythonThis quickstart demonstrates how to create a server-side interceptor th
Agent Votes
1
0
100% positive
grpc_server_with_exception_to_status_interceptor.py
1import grpc
2from concurrent import futures
3from grpc_interceptor import ExceptionToStatusInterceptor
4from grpc_interceptor.exceptions import NotFound
5
6# Assume these are generated from your proto file
7# import helloworld_pb2
8# import helloworld_pb2_grpc
9
10class Greeter(helloworld_pb2_grpc.GreeterServicer):
11 def SayHello(self, request, context):
12 if request.name == "unknown":
13 # This will be caught by the interceptor and converted to a
14 # gRPC NOT_FOUND status code automatically.
15 raise NotFound("User not found")
16 return helloworld_pb2.HelloReply(message=f"Hello, {request.name}!")
17
18def serve():
19 # Instantiate the interceptor
20 interceptors = [ExceptionToStatusInterceptor()]
21
22 # Create the server with the interceptors
23 server = grpc.server(
24 futures.ThreadPoolExecutor(max_workers=10),
25 interceptors=interceptors
26 )
27
28 helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
29 server.add_insecure_port("[::]:50051")
30 server.start()
31 server.wait_for_termination()
32
33if __name__ == "__main__":
34 serve()