Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/dev-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte

fn main() {
let tree_layer = tracing_tree::HierarchicalLayer::new(2)
.with_writer(std::io::stdout)
.with_bracketed_fields(true)
.with_deferred_spans(false)
.with_wraparound(25)
Expand Down
1 change: 1 addition & 0 deletions bin/router/src/pipeline/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub async fn execute_plan(
query_plan: query_plan_payload,
projection_plan: &normalized_payload.projection_plan,
variable_values: &variable_payload.variables_map,
upstream_headers: req.headers(),
extensions,
introspection_context: &introspection_context,
operation_type_name: normalized_payload.root_type_name,
Expand Down
1 change: 1 addition & 0 deletions lib/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-trait = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
http-body-util = { workspace = true }
ntex-http = "0.1.15"
hyper = { workspace = true, features = ["client"] }
serde = { workspace = true }
sonic-rs = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions lib/executor/src/execution/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use hive_router_query_planner::planner::plan_nodes::{
ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
QueryPlan, SequenceNode,
};
use ntex_http::HeaderMap;
use serde::Deserialize;
use sonic_rs::ValueRef;

Expand Down Expand Up @@ -36,6 +37,7 @@ pub struct QueryPlanExecutionContext<'exec> {
pub query_plan: &'exec QueryPlan,
pub projection_plan: &'exec Vec<FieldProjectionPlan>,
pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
pub upstream_headers: &'exec HeaderMap,
pub extensions: Option<HashMap<String, sonic_rs::Value>>,
pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
pub operation_type_name: &'exec str,
Expand All @@ -60,6 +62,7 @@ pub async fn execute_query_plan<'exec>(
ctx.introspection_context.metadata,
// Deduplicate subgraph requests only if the operation type is a query
ctx.operation_type_name == "Query",
ctx.upstream_headers,
);
executor
.execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
Expand All @@ -83,6 +86,7 @@ pub struct Executor<'exec> {
schema_metadata: &'exec SchemaMetadata,
executors: &'exec SubgraphExecutorMap,
dedupe_subgraph_requests: bool,
upstream_headers: &'exec HeaderMap,
}

struct ConcurrencyScope<'exec, T> {
Expand Down Expand Up @@ -150,12 +154,14 @@ impl<'exec> Executor<'exec> {
executors: &'exec SubgraphExecutorMap,
schema_metadata: &'exec SchemaMetadata,
dedupe_subgraph_requests: bool,
upstream_headers: &'exec HeaderMap,
) -> Self {
Executor {
variable_values,
executors,
schema_metadata,
dedupe_subgraph_requests,
upstream_headers,
}
}

Expand Down Expand Up @@ -533,6 +539,7 @@ impl<'exec> Executor<'exec> {
operation_name: node.operation_name.as_deref(),
variables: None,
representations,
upstream_headers: self.upstream_headers,
},
)
.await,
Expand Down
2 changes: 2 additions & 0 deletions lib/executor/src/executors/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
use ntex_http::HeaderMap;

#[async_trait]
pub trait SubgraphExecutor {
Expand All @@ -21,6 +22,7 @@ pub type SubgraphExecutorBoxedArc = Arc<Box<SubgraphExecutorType>>;
pub struct HttpExecutionRequest<'a> {
pub query: &'a str,
pub dedupe: bool,
pub upstream_headers: &'a HeaderMap,
pub operation_name: Option<&'a str>,
// TODO: variables could be stringified before even executing the request
pub variables: Option<HashMap<&'a str, &'a sonic_rs::Value>>,
Expand Down
91 changes: 39 additions & 52 deletions lib/executor/src/executors/dedupe.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use ahash::AHasher;
use bytes::Bytes;
use http::{HeaderMap, Method, StatusCode, Uri};
use ntex_http::HeaderMap as NtexHeaderMap;
use std::collections::BTreeMap;
use std::hash::{BuildHasherDefault, Hash, Hasher};
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};

#[derive(Debug, Clone)]
pub struct SharedResponse {
Expand All @@ -11,66 +12,52 @@ pub struct SharedResponse {
pub body: Bytes,
}

#[derive(Debug, Clone, Eq)]
pub struct RequestFingerprint {
method: Method,
url: Uri,
/// BTreeMap to ensure case-insensitivity and consistent order for hashing
headers: BTreeMap<String, String>,
body: Vec<u8>,
}
pub fn request_fingerprint(
method: &Method,
url: &Uri,
req_headers: &HeaderMap,
upstream_headers: &NtexHeaderMap,
body_bytes: &[u8],
fingerprint_headers: &[String],
) -> u64 {
let build_hasher = ABuildHasher::default();
let mut hasher = build_hasher.build_hasher();

// BTreeMap to ensure case-insensitivity and consistent order for hashing
let mut headers = BTreeMap::new();
if fingerprint_headers.is_empty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pondering what the default should be when the list is set to an empty value. Should it be all headers, or no headers at all?

// fingerprint all headers

impl RequestFingerprint {
pub fn new(
method: &Method,
url: &Uri,
req_headers: &HeaderMap,
body_bytes: &[u8],
fingerprint_headers: &[String],
) -> Self {
let mut headers = BTreeMap::new();
if fingerprint_headers.is_empty() {
// fingerprint all headers
for (key, value) in req_headers.iter() {
for (key, value) in req_headers.iter() {
if let Ok(value_str) = value.to_str() {
headers.insert(key.as_str(), value_str);
}
}
for (key, value) in upstream_headers.iter() {
if let Ok(value_str) = value.to_str() {
headers.insert(key.as_str(), value_str);
Copy link
Member

@ardatan ardatan Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we respect the gateway request headers in the fingerprint?
If a subgraph request doesn't use anything from the gateway request, then it can be deduplicated safely. So I think we should only respect the subgraph request's headers. A subgraph request that doesn't use anything from the GW request can be shared with another GW request whose subgraph request doesn't use the GW request headers either, no?

GW Request from A client -> Subgraph request w/ nothing propagated from A but the body etc are the same with the one below
GW Request from B client -> Subgraph request w/ nothing propagated from B but the body etc are the same with one above

And those subgraph requests can be deduplicated. But if we add GW request headers to the fingerprint then it won't be possible to dedupe these two.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the gateway receives a request with Authorization: Bearer xyz, but in the request to the subgraph, you chose to send this information in some other form, like through the extensions or something else?

Copy link
Member

@ardatan ardatan Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is in the extensions then it changes the request body which is also included in the fingerprint, no?
So the fingerprint should only respect what is sent to the subgraph. If it has anything specific for that GW request, then it will already change the subgraph request headers, body or even the endpoint, so not sure if having GW request headers or anything GW request specific in the fingerprint is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but what about when you want to deduplicate based on a custom header that is not sent to the subgraph? Like a feature flag or whatever. The subgraph calls will be the same, but for some reason you want to execute them anyway.

If I follow your example, then why do we even need to selectively pick headers, and not just hash the whole thing?

Deduplicating purely on the subgraph call, and relying only on that, means users have no control over what is deduplicated and what is not based on the upstream call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can have a configuration option to modify the way of creating the fingerprint instead of adding all of the headers from the GW request to the fingerprint. By default, it can use subgraph request details then we can make the fingerprint elements configurable by the user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An option to specifically pass the gw headers? Come on :) Passing them by default has 0 cost.
How can you make the elements configurable? For extensions for example. We would introduce some weird syntax for no reason.

Copy link
Member

@ardatan ardatan Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the fingerprint is the hash of the request body + headers, then why do we need a weird syntax for extensions? Extensions are part of the body. If the subgraph request has anything from the GW request headers, then it will change the body or the headers, so the fingerprint will be different. If the user needs to send that query without deduplication, then it can be disabled by them, or the fingerprint components can be configured, but I'm not sure that's needed for 90% of cases. A subgraph request with a "query" operation will be safely deduplicated if the fingerprint covers the request body (query, variables, extensions, i.e. the JSON body sent) + headers, so we are good in most cases.
I'm not talking about the cost of adding GW request headers to the fingerprint, but about the efficiency of the deduplication.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain to me the efficiency of the fingerprint

Copy link
Member

@ardatan ardatan Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The right word is the efficiency of the deduplication actually.
Let's say I have two requests sent to the GW with different auth headers, and they both have a subgraph request that is identical in body (query, variables and extensions) and headers (content-type etc., but no auth header).
Then adding the GW request headers to the fingerprint will prevent these identical in-flight subgraph requests from being deduplicated, because the GW request headers are part of the fingerprint. But if we only respect what's sent (the subgraph request), then these will be deduped, which is more efficient, or I am fully missing the point.

}
}
} else {
for header_name in fingerprint_headers.iter() {
if let Some(value) = req_headers.get(header_name) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood this one correctly, then it means that it will filter the downstream (to-subgraph) as well, and we talked about now allowing the user to change these? 👀

if let Ok(value_str) = value.to_str() {
headers.insert(key.as_str().to_lowercase(), value_str.to_string());
headers.insert(header_name, value_str);
}
}
} else {
for header_name in fingerprint_headers.iter() {
if let Some(value) = req_headers.get(header_name) {
if let Ok(value_str) = value.to_str() {
headers.insert(header_name.to_lowercase(), value_str.to_string());
}
} else if let Some(value) = upstream_headers.get(header_name) {
if let Ok(value_str) = value.to_str() {
headers.insert(header_name, value_str);
}
}
}

Self {
method: method.clone(),
url: url.clone(),
headers,
body: body_bytes.to_vec(),
}
}
}

impl Hash for RequestFingerprint {
fn hash<H: Hasher>(&self, state: &mut H) {
self.method.hash(state);
self.url.hash(state);
self.headers.hash(state);
self.body.hash(state);
}
}
method.hash(&mut hasher);
url.hash(&mut hasher);
headers.hash(&mut hasher);
body_bytes.hash(&mut hasher);

impl PartialEq for RequestFingerprint {
fn eq(&self, other: &Self) -> bool {
self.method == other.method
&& self.url == other.url
&& self.headers == other.headers
&& self.body == other.body
}
hasher.finish()
}

pub type ABuildHasher = BuildHasherDefault<AHasher>;
15 changes: 7 additions & 8 deletions lib/executor/src/executors/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use crate::executors::dedupe::{ABuildHasher, RequestFingerprint, SharedResponse};
use crate::executors::dedupe::request_fingerprint;
use crate::executors::dedupe::{ABuildHasher, SharedResponse};
use dashmap::DashMap;
use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig;
use tokio::sync::OnceCell;
Expand Down Expand Up @@ -33,8 +34,7 @@ pub struct HTTPSubgraphExecutor {
pub header_map: HeaderMap,
pub semaphore: Arc<Semaphore>,
pub config: Arc<TrafficShapingExecutorConfig>,
pub in_flight_requests:
Arc<DashMap<RequestFingerprint, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
pub in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
}

const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{";
Expand All @@ -46,9 +46,7 @@ impl HTTPSubgraphExecutor {
http_client: Arc<Client<HttpsConnector<HttpConnector>, Full<Bytes>>>,
semaphore: Arc<Semaphore>,
config: Arc<TrafficShapingExecutorConfig>,
in_flight_requests: Arc<
DashMap<RequestFingerprint, Arc<OnceCell<SharedResponse>>, ABuildHasher>,
>,
in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
) -> Self {
let mut header_map = HeaderMap::new();
header_map.insert(
Expand Down Expand Up @@ -184,10 +182,11 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
};
}

let fingerprint = RequestFingerprint::new(
let fingerprint = request_fingerprint(
&http::Method::POST,
&self.endpoint,
&self.header_map,
execution_request.upstream_headers,
&body,
&self.config.dedupe_fingerprint_headers,
);
Expand All @@ -196,7 +195,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
// Prevents any deadlocks.
let cell = self
.in_flight_requests
.entry(fingerprint.clone())
.entry(fingerprint)
.or_default()
.value()
.clone();
Expand Down
7 changes: 3 additions & 4 deletions lib/executor/src/executors/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::{OnceCell, Semaphore};
use crate::{
executors::{
common::{HttpExecutionRequest, SubgraphExecutor, SubgraphExecutorBoxedArc},
dedupe::{ABuildHasher, RequestFingerprint, SharedResponse},
dedupe::{ABuildHasher, SharedResponse},
error::SubgraphExecutorError,
http::HTTPSubgraphExecutor,
},
Expand Down Expand Up @@ -81,9 +81,8 @@ impl SubgraphExecutorMap {
let semaphores_by_origin: DashMap<String, Arc<Semaphore>> = DashMap::new();
let max_connections_per_host = config.max_connections_per_host;
let config_arc = Arc::new(config);
let in_flight_requests: Arc<
DashMap<RequestFingerprint, Arc<OnceCell<SharedResponse>>, ABuildHasher>,
> = Arc::new(DashMap::with_hasher(ABuildHasher::default()));
let in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>> =
Arc::new(DashMap::with_hasher(ABuildHasher::default()));

let executor_map = subgraph_endpoint_map
.into_iter()
Expand Down
19 changes: 18 additions & 1 deletion lib/router-config/src/traffic_shaping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ pub struct TrafficShapingExecutorConfig {
/// A list of headers that should be used to fingerprint requests for deduplication.
///
/// If not provided, the default is to use the "authorization" header only.
#[serde(default = "default_dedupe_fingerprint_headers")]
#[serde(
default = "default_dedupe_fingerprint_headers",
deserialize_with = "deserialize_and_normalize_dedupe_fingerprint_headers"
)]
pub dedupe_fingerprint_headers: Vec<String>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could already be HeaderName instead of String, and then you can deserialize it (and validate that the string is an actual valid header name) while parsing

}

Expand Down Expand Up @@ -51,3 +54,17 @@ fn default_dedupe_enabled() -> bool {
fn default_dedupe_fingerprint_headers() -> Vec<String> {
vec!["authorization".to_string()]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also include Cookie? It's fairly common (and more secure) to use it instead of Authorization header

}

fn deserialize_and_normalize_dedupe_fingerprint_headers<'de, D>(
deserializer: D,
) -> Result<Vec<String>, D::Error>
where
D: serde::Deserializer<'de>,
{
let headers: Vec<String> = Deserialize::deserialize(deserializer)?;
Ok(normalize_dedupe_fingerprint_headers(headers))
}

/// Returns the given header names converted to lowercase, in their original order.
///
/// The input vector is consumed; an empty input yields an empty output.
fn normalize_dedupe_fingerprint_headers(headers: Vec<String>) -> Vec<String> {
    let mut normalized = Vec::with_capacity(headers.len());
    for name in headers {
        normalized.push(name.to_lowercase());
    }
    normalized
}
Loading