Skip to content

Commit 7085d38

Browse files
committed
Abstract Channel Initialization behind Client
Our server being yarpc means that there are mandatory headers needed on every request. To include these we need to add mandatory interceptors. For additional things like retries and error mapping we'll also want additional interceptors. GRPC's sync implementation allows for adding interceptors to an existing channel, while the async implementation does not. As a result, our client needs to be responsible for Channel creation. Add GRPC channel options to ClientOptions and create the Channel within Client. This approach largely matches how the Java client approaches it, although it does allow for overriding the Channel still.
1 parent 0039ea2 commit 7085d38

File tree

3 files changed

+65
-24
lines changed

3 files changed

+65
-24
lines changed

cadence/_internal/rpc/metadata.py renamed to cadence/_internal/rpc/yarpc.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,18 @@ class _ClientCallDetails(
1313
):
1414
pass
1515

16-
class MetadataInterceptor(UnaryUnaryClientInterceptor):
17-
def __init__(self, metadata: Metadata):
18-
self._metadata = metadata
16+
SERVICE_KEY = "rpc-service"
17+
CALLER_KEY = "rpc-caller"
18+
ENCODING_KEY = "rpc-encoding"
19+
ENCODING_PROTO = "proto"
20+
21+
class YarpcMetadataInterceptor(UnaryUnaryClientInterceptor):
22+
def __init__(self, service: str, caller: str):
23+
self._metadata = Metadata(
24+
(SERVICE_KEY, service),
25+
(CALLER_KEY, caller),
26+
(ENCODING_KEY, ENCODING_PROTO),
27+
)
1928

2029
async def intercept_unary_unary(
2130
self,

cadence/client.py

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,40 @@
11
import os
22
import socket
3-
from typing import TypedDict
3+
from typing import TypedDict, Unpack, Any, cast
44

5+
from grpc import ChannelCredentials, Compression
6+
7+
from cadence._internal.rpc.yarpc import YarpcMetadataInterceptor
58
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub
6-
from grpc.aio import Channel
9+
from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel
710

811

912
class ClientOptions(TypedDict, total=False):
1013
domain: str
14+
target: str
1115
identity: str
16+
service_name: str
17+
caller_name: str
18+
channel_arguments: dict[str, Any]
19+
credentials: ChannelCredentials | None
20+
compression: Compression
21+
interceptors: list[ClientInterceptor]
22+
23+
_DEFAULT_OPTIONS: ClientOptions = {
24+
"identity": f"{os.getpid()}@{socket.gethostname()}",
25+
"service_name": "cadence-frontend",
26+
"caller_name": "cadence-client",
27+
"channel_arguments": {},
28+
"credentials": None,
29+
"compression": Compression.NoCompression,
30+
"interceptors": [],
31+
}
1232

1333
class Client:
14-
def __init__(self, channel: Channel, options: ClientOptions) -> None:
15-
self._channel = channel
16-
self._worker_stub = WorkerAPIStub(channel)
17-
self._options = options
18-
self._identity = options["identity"] if "identity" in options else f"{os.getpid()}@{socket.gethostname()}"
34+
def __init__(self, **kwargs: Unpack[ClientOptions]) -> None:
35+
self._options = _validate_and_copy_defaults(ClientOptions(**kwargs))
36+
self._channel = _create_channel(self._options)
37+
self._worker_stub = WorkerAPIStub(self._channel)
1938

2039

2140
@property
@@ -24,14 +43,35 @@ def domain(self) -> str:
2443

2544
@property
2645
def identity(self) -> str:
27-
return self._identity
46+
return self._options["identity"]
2847

2948
@property
3049
def worker_stub(self) -> WorkerAPIStub:
3150
return self._worker_stub
3251

33-
3452
async def close(self) -> None:
3553
await self._channel.close()
3654

55+
def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
56+
if "target" not in options:
57+
raise ValueError("target must be specified")
58+
59+
if "domain" not in options:
60+
raise ValueError("domain must be specified")
61+
62+
# Set default values for missing options
63+
for key, value in _DEFAULT_OPTIONS.items():
64+
if key not in options:
65+
cast(dict, options)[key] = value
66+
67+
return options
68+
69+
70+
def _create_channel(options: ClientOptions) -> Channel:
71+
interceptors = list(options["interceptors"])
72+
interceptors.append(YarpcMetadataInterceptor(options["service_name"], options["caller_name"]))
3773

74+
if options["credentials"]:
75+
return secure_channel(options["target"], options["credentials"], options["channel_arguments"], options["compression"], interceptors)
76+
else:
77+
return insecure_channel(options["target"], options["channel_arguments"], options["compression"], interceptors)

cadence/sample/client_example.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,14 @@
11
import asyncio
22

3-
from grpc.aio import insecure_channel, Metadata
43

5-
from cadence.client import Client, ClientOptions
6-
from cadence._internal.rpc.metadata import MetadataInterceptor
4+
from cadence.client import Client
75
from cadence.worker import Worker
86

97

108
async def main():
11-
# TODO - Hide all this
12-
metadata = Metadata()
13-
metadata["rpc-service"] = "cadence-frontend"
14-
metadata["rpc-encoding"] = "proto"
15-
metadata["rpc-caller"] = "nate"
16-
async with insecure_channel("localhost:7833", interceptors=[MetadataInterceptor(metadata)]) as channel:
17-
client = Client(channel, ClientOptions(domain="foo"))
18-
worker = Worker(client, "task_list")
19-
await worker.run()
9+
client = Client(target="localhost:7833", domain="foo")
10+
worker = Worker(client, "task_list")
11+
await worker.run()
2012

2113
if __name__ == '__main__':
2214
asyncio.run(main())

0 commit comments

Comments
 (0)