From 12f7012d9d4eb160da2aa95460cc450fe819033a Mon Sep 17 00:00:00 2001 From: quettabit Date: Sun, 10 Aug 2025 18:01:43 -0600 Subject: [PATCH] initial commit --- src/streamstore/_client.py | 10 ++++++---- src/streamstore/_mappers.py | 17 ++++++++--------- src/streamstore/schemas.py | 3 ++- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/streamstore/_client.py b/src/streamstore/_client.py index 62e2fe3..c9c98c9 100644 --- a/src/streamstore/_client.py +++ b/src/streamstore/_client.py @@ -435,7 +435,7 @@ async def issue_access_token( timeout=self._config.rpc.timeout, metadata=self._config.rpc.metadata, ) - return response.token + return response.access_token @fallible async def list_access_tokens( @@ -449,7 +449,9 @@ async def list_access_tokens( start_after: Filter to access tokens whose ID lexicographically starts after this value. limit: Number of results, up to a maximum of 1000. """ - request = ListAccessTokensRequest(prefix, start_after, limit) + request = ListAccessTokensRequest( + prefix=prefix, start_after=start_after, limit=limit + ) response = await self._retrier( self._stub.ListAccessTokens, request, @@ -457,7 +459,7 @@ async def list_access_tokens( metadata=self._config.rpc.metadata, ) return schemas.Page( - items=[access_token_info_schema(info) for info in response.tokens], + items=[access_token_info_schema(info) for info in response.access_tokens], has_more=response.has_more, ) @@ -469,7 +471,7 @@ async def revoke_access_token(self, id: str) -> schemas.AccessTokenInfo: Args: id: Access token ID. """ - request = RevokeAccessTokenRequest(id) + request = RevokeAccessTokenRequest(id=id) response = await self._retrier( self._stub.RevokeAccessToken, request, diff --git a/src/streamstore/_mappers.py b/src/streamstore/_mappers.py index d6ed45c..46336b8 100644 --- a/src/streamstore/_mappers.py +++ b/src/streamstore/_mappers.py @@ -132,18 +132,15 @@ def stream_config_message( delete_on_empty_min_age = config.delete_on_empty_min_age if storage_class is not None: paths.append(f"{mask_path_prefix}storage_class") - stream_config.storage_class = msgs.StorageClass(storage_class.value) + stream_config.storage_class = storage_class.value if retention_age is not None: paths.append(f"{mask_path_prefix}retention_policy") stream_config.age = retention_age if timestamping is not None: paths.append(f"{mask_path_prefix}timestamping") - stream_config.timestamping = msgs.StreamConfig.Timestamping() if timestamping.mode is not None: paths.append(f"{mask_path_prefix}timestamping.mode") - stream_config.timestamping.mode = msgs.TimestampingMode( - timestamping.mode.value - ) + stream_config.timestamping.mode = timestamping.mode.value if timestamping.uncapped is not None: paths.append(f"{mask_path_prefix}timestamping.uncapped") stream_config.timestamping.uncapped = timestamping.uncapped @@ -176,7 +173,7 @@ def basin_config_message( default_stream_config = cast( msgs.StreamConfig, stream_config_message(config.default_stream_config) ) - basin_config.default_stream_config = default_stream_config + basin_config.default_stream_config.CopyFrom(default_stream_config) if config.create_stream_on_append is not None: basin_config.create_stream_on_append = config.create_stream_on_append paths.append("create_stream_on_append") @@ -266,7 +263,7 @@ def permissions(perm: Permission) -> msgs.ReadWritePermissions: case Permission.READ_WRITE: read = True write = True - return msgs.ReadWritePermissions(read, write) + return msgs.ReadWritePermissions(read=read, write=write) def permitted_op_groups( op_group_perms: OperationGroupPermissions | None, @@ -288,13 +285,15 @@ def permitted_op_groups( streams=resource_set(scope.streams), access_tokens=resource_set(scope.access_tokens), op_groups=permitted_op_groups(scope.op_group_perms), - ops=(msgs.Operation(op.value) for op in scope.ops) if scope.ops else None, + ops=(op.value for op in scope.ops), ), ) def access_token_info_schema(info: msgs.AccessTokenInfo) -> AccessTokenInfo: - def resource_match_rule(resource_set: msgs.ResourceSet) -> ResourceMatchRule: + def resource_match_rule(resource_set: msgs.ResourceSet) -> ResourceMatchRule | None: + if not resource_set.HasField("matching"): + return None match resource_set.WhichOneof("matching"): case "exact": return ResourceMatchRule(ResourceMatchOp.EXACT, resource_set.exact) diff --git a/src/streamstore/schemas.py b/src/streamstore/schemas.py index 4e0dbb3..e3f4b9a 100644 --- a/src/streamstore/schemas.py +++ b/src/streamstore/schemas.py @@ -258,6 +258,7 @@ class TimestampingMode(DocEnum): The arrival time is always in milliseconds since Unix epoch. """ + UNSPECIFIED = 0, "Defaults to ``CLIENT_PREFER``." CLIENT_PREFER = ( 1, "Prefer client-specified timestamp if present, otherwise use arrival time.", @@ -414,7 +415,7 @@ class AccessTokenScope: #: #: Note: #: A union of allowed operations and groups is used as the effective set of allowed operations. - ops: list[Operation] | None = None + ops: list[Operation] = field(default_factory=list) @dataclass(slots=True)