|
1 | 1 | use anyhow::{Context, Result}; |
2 | 2 | use axum::response::{IntoResponse, Response}; |
3 | | -use gas::prelude::*; |
4 | 3 | use rivet_api_builder::{ |
5 | 4 | ApiError, |
6 | 5 | extract::{Extension, Json, Query}, |
7 | 6 | }; |
8 | | -use rivet_api_types::pagination::Pagination; |
| 7 | +use rivet_api_types::{actors::list::*, pagination::Pagination}; |
9 | 8 | use rivet_api_util::fanout_to_datacenters; |
10 | | -use serde::{Deserialize, Serialize}; |
11 | | -use utoipa::{IntoParams, ToSchema}; |
12 | 9 |
|
13 | 10 | use crate::{actors::utils::fetch_actors_by_ids, ctx::ApiCtx, errors}; |
14 | 11 |
|
15 | | -#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] |
16 | | -#[serde(deny_unknown_fields)] |
17 | | -#[into_params(parameter_in = Query)] |
18 | | -pub struct ListQuery { |
19 | | - pub namespace: String, |
20 | | - pub name: Option<String>, |
21 | | - pub key: Option<String>, |
22 | | - /// Deprecated. |
23 | | - #[serde(default)] |
24 | | - pub actor_ids: Option<String>, |
25 | | - #[serde(default)] |
26 | | - pub actor_id: Vec<Id>, |
27 | | - pub include_destroyed: Option<bool>, |
28 | | - pub limit: Option<usize>, |
29 | | - pub cursor: Option<String>, |
30 | | -} |
31 | | - |
32 | | -#[derive(Serialize, Deserialize, ToSchema)] |
33 | | -#[serde(deny_unknown_fields)] |
34 | | -#[schema(as = ActorsListResponse)] |
35 | | -pub struct ListResponse { |
36 | | - pub actors: Vec<rivet_types::actors::Actor>, |
37 | | - pub pagination: Pagination, |
38 | | -} |
39 | | - |
40 | 12 | /// ## Datacenter Round Trips |
41 | 13 | /// |
42 | 14 | /// **If key is some & `include_destroyed` is false** |
@@ -80,7 +52,7 @@ async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> { |
80 | 52 |
|
81 | 53 | // Parse query |
82 | 54 | let actor_ids = [ |
83 | | - query.actor_id, |
| 55 | + query.actor_id.clone(), |
84 | 56 | query |
85 | 57 | .actor_ids |
86 | 58 | .as_ref() |
@@ -128,15 +100,25 @@ async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> { |
128 | 100 | .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; |
129 | 101 |
|
130 | 102 | // Fetch actors |
131 | | - let actors = fetch_actors_by_ids( |
| 103 | + let mut actors = fetch_actors_by_ids( |
132 | 104 | &ctx, |
133 | 105 | actor_ids, |
134 | 106 | query.namespace.clone(), |
135 | 107 | query.include_destroyed, |
136 | | - query.limit, |
| 108 | + None, // Don't apply limit in fetch, we'll apply it after cursor filtering |
137 | 109 | ) |
138 | 110 | .await?; |
139 | 111 |
|
| 112 | + // Apply cursor filtering if provided |
| 113 | + if let Some(cursor_str) = &query.cursor { |
| 114 | + let cursor_ts: i64 = cursor_str.parse().context("invalid cursor format")?; |
| 115 | + actors.retain(|actor| actor.create_ts < cursor_ts); |
| 116 | + } |
| 117 | + |
| 118 | + // Apply limit after cursor filtering |
| 119 | + let limit = query.limit.unwrap_or(100); |
| 120 | + actors.truncate(limit); |
| 121 | + |
140 | 122 | let cursor = actors.last().map(|x| x.create_ts.to_string()); |
141 | 123 |
|
142 | 124 | Ok(ListResponse { |
@@ -201,41 +183,25 @@ async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> { |
201 | 183 | .build()); |
202 | 184 | } |
203 | 185 |
|
204 | | - // Prepare peer query for local handler |
205 | | - let peer_query = rivet_api_types::actors::list::ListQuery { |
206 | | - namespace: query.namespace.clone(), |
207 | | - name: Some(query.name.as_ref().unwrap().clone()), |
208 | | - key: query.key.clone(), |
209 | | - actor_ids: None, |
210 | | - actor_id: Vec::new(), |
211 | | - include_destroyed: query.include_destroyed, |
212 | | - limit: query.limit, |
213 | | - cursor: query.cursor.clone(), |
214 | | - }; |
| 186 | + let limit = query.limit.unwrap_or(100); |
215 | 187 |
|
216 | 188 | // Fanout to all datacenters |
217 | | - let mut actors = fanout_to_datacenters::< |
218 | | - rivet_api_types::actors::list::ListResponse, |
219 | | - _, |
220 | | - _, |
221 | | - _, |
222 | | - _, |
223 | | - Vec<rivet_types::actors::Actor>, |
224 | | - >( |
225 | | - ctx.into(), |
226 | | - "/actors", |
227 | | - peer_query, |
228 | | - |ctx, query| async move { rivet_api_peer::actors::list::list(ctx, (), query).await }, |
229 | | - |_, res, agg| agg.extend(res.actors), |
230 | | - ) |
231 | | - .await?; |
| 189 | + let mut actors = |
| 190 | + fanout_to_datacenters::<ListResponse, _, _, _, _, Vec<rivet_types::actors::Actor>>( |
| 191 | + ctx.into(), |
| 192 | + "/actors", |
| 193 | + query, |
| 194 | + |ctx, query| async move { rivet_api_peer::actors::list::list(ctx, (), query).await }, |
| 195 | + |_, res, agg| agg.extend(res.actors), |
| 196 | + ) |
| 197 | + .await?; |
232 | 198 |
|
233 | 199 | // Sort by create ts desc |
234 | 200 | actors.sort_by_cached_key(|x| std::cmp::Reverse(x.create_ts)); |
235 | 201 |
|
236 | 202 | // Shorten array since returning all actors from all regions could end up returning `regions * |
237 | 203 | // limit` results, which is a lot. |
238 | | - actors.truncate(query.limit.unwrap_or(100)); |
| 204 | + actors.truncate(limit); |
239 | 205 |
|
240 | 206 | let cursor = actors.last().map(|x| x.create_ts.to_string()); |
241 | 207 |
|
|
0 commit comments