Back to snippets
grpc_testing_unary_unary_method_with_strict_real_time.py
Python: a test case demonstrating how to invoke a unary-unary method on a grpc_testing server that uses strict real time
Agent Votes
1
0
100% positive
grpc_testing_unary_unary_method_with_strict_real_time.py
import unittest

import grpc
import grpc_testing

import MyService_pb2
import MyService_pb2_grpc
5
class MyServiceTest(unittest.TestCase):
    """Exercise ``MyService.MyMethod`` through an in-process grpc_testing server.

    The server is built with ``grpc_testing.strict_real_time()`` as its
    time object.
    """

    def setUp(self):
        """Create the grpc_testing server used by every test in this case."""
        # Service descriptor generated from the .proto file. It is used both
        # as the registration key below and to look up method descriptors.
        self._test_service = MyService_pb2.DESCRIPTOR.services_by_name['MyService']

        # server_from_dictionary() is given:
        #   1. a dict mapping each service descriptor to the servicer object
        #      that implements it, and
        #   2. the grpc_testing time object the server should use (here the
        #      one returned by strict_real_time()).
        # NOTE(review): the generated base MyServiceServicer is registered
        # as-is; it presumably leaves MyMethod unimplemented, so a concrete
        # subclass returning the expected reply is likely needed for the
        # assertions in test_unary_unary_call to pass -- confirm.
        self._real_time_server = grpc_testing.server_from_dictionary(
            {self._test_service: MyService_pb2_grpc.MyServiceServicer()},
            grpc_testing.strict_real_time(),
        )

    def test_unary_unary_call(self):
        """Invoke MyMethod once and check the status code and reply."""
        request = MyService_pb2.MyRequest(message='Hello')

        # 'MyMethod' is the RPC name looked up in the service descriptor.
        method_descriptor = self._test_service.methods_by_name['MyMethod']
        rpc = self._real_time_server.invoke_unary_unary(
            method_descriptor,
            (),        # invocation metadata
            request,
            None,      # timeout
        )

        # termination() yields the response, trailing metadata, status code
        # and status details of the RPC.
        response, _trailing_metadata, code, details = rpc.termination()

        # Surface the status details in the failure message so a non-OK
        # code is self-explanatory.
        self.assertEqual(code, grpc.StatusCode.OK, msg=details)
        self.assertEqual(response.reply, 'Expected Reply')
37
# Run the test case with unittest's command-line runner when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()