Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
475 changes: 475 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
tokio = { version = "1.47.1", features = ["full"] }
tokio-util = { version = "0.7.16" }
rand = "0.9.2"
reqwest = "0.12.23"
2 changes: 2 additions & 0 deletions bin/router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures = { workspace = true }
graphql-parser = { workspace = true }
graphql-tools = { workspace = true }
serde = { workspace = true }
reqwest = { workspace = true }
sonic-rs = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
Expand All @@ -42,3 +43,4 @@ mimalloc = { version = "0.1.47", features = ["override"] }
moka = { version = "0.12.10", features = ["future"] }
ulid = "1.2.1"
ntex = { version = "2", features = ["tokio"] }
arc-swap = "1.7.1"
17 changes: 11 additions & 6 deletions bin/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ mod http_utils;
mod logger;
mod pipeline;
mod shared_state;
mod supergraph;
mod supergraph_mgr;

use std::sync::Arc;

Expand All @@ -10,6 +12,7 @@ use crate::{
logger::configure_logging,
pipeline::graphql_request_handler,
shared_state::RouterSharedState,
supergraph_mgr::SupergraphManager,
};

use hive_router_config::load_config;
Expand All @@ -18,29 +21,31 @@ use ntex::{
web::{self, HttpRequest},
};

use hive_router_query_planner::utils::parsing::parse_schema;

async fn graphql_endpoint_handler(
mut request: HttpRequest,
body_bytes: Bytes,
supergraph_manager: web::types::State<Arc<SupergraphManager>>,
app_state: web::types::State<Arc<RouterSharedState>>,
) -> impl web::Responder {
graphql_request_handler(&mut request, body_bytes, app_state.get_ref()).await
let supergraph = supergraph_manager.current();

graphql_request_handler(&mut request, body_bytes, &supergraph, app_state.get_ref()).await
}

pub async fn router_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
let config_path = std::env::var("ROUTER_CONFIG_FILE_PATH").ok();
let router_config = load_config(config_path)?;
configure_logging(&router_config.log);

let supergraph_sdl = router_config.supergraph.load().await?;
let parsed_schema = parse_schema(&supergraph_sdl);
let addr = router_config.http.address();
let shared_state = RouterSharedState::new(parsed_schema, router_config);

let supergraph_manager = Arc::new(SupergraphManager::new_from_config(&router_config).await?);
let shared_state = Arc::new(RouterSharedState::new(router_config));

web::HttpServer::new(move || {
web::App::new()
.state(shared_state.clone())
.state(supergraph_manager.clone())
.route("/graphql", web::to(graphql_endpoint_handler))
.route("/health", web::to(health_check_handler))
.default_service(web::to(landing_page_handler))
Expand Down
6 changes: 3 additions & 3 deletions bin/router/src/pipeline/coerce_variables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::{error, trace, warn};
use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
use crate::pipeline::execution_request::ExecutionRequest;
use crate::pipeline::normalize::GraphQLNormalizationPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;

#[derive(Clone, Debug)]
pub struct CoerceVariablesPayload {
Expand All @@ -21,7 +21,7 @@ pub struct CoerceVariablesPayload {
#[inline]
pub fn coerce_request_variables(
req: &HttpRequest,
app_state: &Arc<RouterSharedState>,
supergraph: &Arc<SupergraphData>,
execution_params: ExecutionRequest,
normalized_operation: &Arc<GraphQLNormalizationPayload>,
) -> Result<CoerceVariablesPayload, PipelineError> {
Expand All @@ -38,7 +38,7 @@ pub fn coerce_request_variables(
match collect_variables(
&normalized_operation.operation_for_plan,
execution_params.variables,
&app_state.schema_metadata,
&supergraph.metadata,
) {
Ok(values) => {
trace!(
Expand Down
8 changes: 5 additions & 3 deletions bin/router/src/pipeline/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::pipeline::coerce_variables::CoerceVariablesPayload;
use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
use crate::pipeline::normalize::GraphQLNormalizationPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use hive_router_plan_executor::execute_query_plan;
use hive_router_plan_executor::execution::plan::QueryPlanExecutionContext;
use hive_router_plan_executor::introspection::resolve::IntrospectionContext;
Expand All @@ -25,6 +26,7 @@ enum ExposeQueryPlanMode {
#[inline]
pub async fn execute_plan(
req: &mut HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
normalized_payload: &Arc<GraphQLNormalizationPayload>,
query_plan_payload: &Arc<QueryPlan>,
Expand Down Expand Up @@ -57,8 +59,8 @@ pub async fn execute_plan(

let introspection_context = IntrospectionContext {
query: normalized_payload.operation_for_introspection.as_ref(),
schema: &app_state.planner.consumer_schema.document,
metadata: &app_state.schema_metadata,
schema: &supergraph.planner.consumer_schema.document,
metadata: &supergraph.metadata,
};

execute_query_plan(QueryPlanExecutionContext {
Expand All @@ -68,7 +70,7 @@ pub async fn execute_plan(
extensions,
introspection_context: &introspection_context,
operation_type_name: normalized_payload.root_type_name,
executors: &app_state.subgraph_executor_map,
executors: &supergraph.subgraph_executor_map,
})
.await
.map(Bytes::from)
Expand Down
14 changes: 10 additions & 4 deletions bin/router/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
validation::validate_operation_with_cache,
},
shared_state::RouterSharedState,
supergraph_mgr::SupergraphData,
};

pub mod coerce_variables;
Expand All @@ -43,6 +44,7 @@ static GRAPHIQL_HTML: &str = include_str!("../../static/graphiql.html");
pub async fn graphql_request_handler(
req: &mut HttpRequest,
body_bytes: Bytes,
supergraph: &Arc<SupergraphData>,
state: &Arc<RouterSharedState>,
) -> impl web::Responder {
if req.method() == Method::GET && req.accepts_content_type(*TEXT_HTML_CONTENT_TYPE) {
Expand All @@ -51,7 +53,7 @@ pub async fn graphql_request_handler(
.body(GRAPHIQL_HTML);
}

match execute_pipeline(req, body_bytes, state).await {
match execute_pipeline(req, body_bytes, supergraph, state).await {
Ok(response_bytes) => {
let response_content_type: &'static HeaderValue =
if req.accepts_content_type(*APPLICATION_GRAPHQL_RESPONSE_JSON_STR) {
Expand All @@ -72,23 +74,26 @@ pub async fn graphql_request_handler(
pub async fn execute_pipeline(
req: &mut HttpRequest,
body_bytes: Bytes,
supergraph: &Arc<SupergraphData>,
state: &Arc<RouterSharedState>,
) -> Result<Bytes, PipelineError> {
let execution_request = get_execution_request(req, body_bytes).await?;
let parser_payload = parse_operation_with_cache(req, state, &execution_request).await?;
validate_operation_with_cache(req, state, &parser_payload).await?;
validate_operation_with_cache(req, supergraph, state, &parser_payload).await?;

let progressive_override_ctx = request_override_context()?;
let normalize_payload =
normalize_request_with_cache(req, state, &execution_request, &parser_payload).await?;
normalize_request_with_cache(req, supergraph, state, &execution_request, &parser_payload)
.await?;
let variable_payload =
coerce_request_variables(req, state, execution_request, &normalize_payload)?;
coerce_request_variables(req, supergraph, execution_request, &normalize_payload)?;

let query_plan_cancellation_token =
CancellationToken::with_timeout(state.router_config.query_planner.timeout);

let query_plan_payload = plan_operation_with_cache(
req,
supergraph,
state,
&normalize_payload,
&progressive_override_ctx,
Expand All @@ -98,6 +103,7 @@ pub async fn execute_pipeline(

let execution_result = execute_plan(
req,
supergraph,
state,
&normalize_payload,
&query_plan_payload,
Expand Down
6 changes: 4 additions & 2 deletions bin/router/src/pipeline/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, Pipel
use crate::pipeline::execution_request::ExecutionRequest;
use crate::pipeline::parser::GraphQLParserPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use tracing::{error, trace};

#[derive(Debug)]
Expand All @@ -26,6 +27,7 @@ pub struct GraphQLNormalizationPayload {
#[inline]
pub async fn normalize_request_with_cache(
req: &HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
execution_params: &ExecutionRequest,
parser_payload: &GraphQLParserPayload,
Expand All @@ -51,7 +53,7 @@ pub async fn normalize_request_with_cache(
Ok(payload)
}
None => match normalize_operation(
&app_state.planner.supergraph,
&supergraph.planner.supergraph,
&parser_payload.parsed_operation,
execution_params.operation_name.as_deref(),
) {
Expand All @@ -64,7 +66,7 @@ pub async fn normalize_request_with_cache(

let operation = doc.operation;
let (root_type_name, projection_plan) =
FieldProjectionPlan::from_operation(&operation, &app_state.schema_metadata);
FieldProjectionPlan::from_operation(&operation, &supergraph.metadata);
let partitioned_operation = partition_operation(operation);

let payload = GraphQLNormalizationPayload {
Expand Down
6 changes: 4 additions & 2 deletions bin/router/src/pipeline/query_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, Pipel
use crate::pipeline::normalize::GraphQLNormalizationPayload;
use crate::pipeline::progressive_override::{RequestOverrideContext, StableOverrideContext};
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
use hive_router_query_planner::utils::cancellation::CancellationToken;
use ntex::web::HttpRequest;
Expand All @@ -13,13 +14,14 @@ use xxhash_rust::xxh3::Xxh3;
#[inline]
pub async fn plan_operation_with_cache(
req: &HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
normalized_operation: &Arc<GraphQLNormalizationPayload>,
request_override_context: &RequestOverrideContext,
cancellation_token: &CancellationToken,
) -> Result<Arc<QueryPlan>, PipelineError> {
let stable_override_context =
StableOverrideContext::new(&app_state.planner.supergraph, request_override_context);
StableOverrideContext::new(&supergraph.planner.supergraph, request_override_context);

let filtered_operation_for_plan = &normalized_operation.operation_for_plan;
let plan_cache_key =
Expand All @@ -37,7 +39,7 @@ pub async fn plan_operation_with_cache(
}));
}

app_state
supergraph
.planner
.plan_from_normalized_operation(
filtered_operation_for_plan,
Expand Down
4 changes: 3 additions & 1 deletion bin/router/src/pipeline/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ use std::sync::Arc;
use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
use crate::pipeline::parser::GraphQLParserPayload;
use crate::shared_state::RouterSharedState;
use crate::supergraph_mgr::SupergraphData;
use graphql_tools::validation::validate::validate;
use ntex::web::HttpRequest;
use tracing::{error, trace};

#[inline]
pub async fn validate_operation_with_cache(
req: &HttpRequest,
supergraph: &Arc<SupergraphData>,
app_state: &Arc<RouterSharedState>,
parser_payload: &GraphQLParserPayload,
) -> Result<(), PipelineError> {
let consumer_schema_ast = &app_state.planner.consumer_schema.document;
let consumer_schema_ast = &supergraph.planner.consumer_schema.document;

let validation_result = match app_state
.validate_cache
Expand Down
36 changes: 4 additions & 32 deletions bin/router/src/shared_state.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,14 @@
use std::sync::Arc;

use graphql_parser::schema::Document;
use graphql_tools::validation::{utils::ValidationError, validate::ValidationPlan};
use hive_router_config::HiveRouterConfig;
use hive_router_plan_executor::{
introspection::schema::{SchemaMetadata, SchemaWithMetadata},
SubgraphExecutorMap,
};
use hive_router_query_planner::{
planner::{plan_nodes::QueryPlan, Planner},
state::supergraph_state::SupergraphState,
};
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
use moka::future::Cache;

use crate::pipeline::normalize::GraphQLNormalizationPayload;

pub struct RouterSharedState {
pub schema_metadata: SchemaMetadata,
pub planner: Planner,
pub validation_plan: ValidationPlan,
pub subgraph_executor_map: SubgraphExecutorMap,
pub plan_cache: Cache<u64, Arc<QueryPlan>>,
pub validate_cache: Cache<u64, Arc<Vec<ValidationError>>>,
pub parse_cache: Cache<u64, Arc<graphql_parser::query::Document<'static, String>>>,
Expand All @@ -28,31 +17,14 @@ pub struct RouterSharedState {
}

impl RouterSharedState {
pub fn new(
parsed_supergraph_sdl: Document<'static, String>,
router_config: HiveRouterConfig,
) -> Arc<Self> {
let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
let planner =
Planner::new_from_supergraph(&parsed_supergraph_sdl).expect("failed to create planner");
let schema_metadata = planner.consumer_schema.schema_metadata();

let subgraph_executor_map = SubgraphExecutorMap::from_http_endpoint_map(
supergraph_state.subgraph_endpoint_map,
router_config.traffic_shaping.clone(),
)
.expect("Failed to create subgraph executor map");

Arc::new(Self {
schema_metadata,
planner,
pub fn new(router_config: HiveRouterConfig) -> Self {
Self {
validation_plan: graphql_tools::validation::rules::default_rules_validation_plan(),
subgraph_executor_map,
plan_cache: moka::future::Cache::new(1000),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All those caches are supergraph specific so I think RouterSharedState is already the state which lasts with the supergraph.

validate_cache: moka::future::Cache::new(1000),
parse_cache: moka::future::Cache::new(1000),
normalize_cache: moka::future::Cache::new(1000),
router_config,
})
}
}
}
15 changes: 15 additions & 0 deletions bin/router/src/supergraph/base.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use async_trait::async_trait;

#[derive(Debug, thiserror::Error)]
pub enum LoadSupergraphError {
#[error("Failed to read supergraph file: {0}")]
ReadFileError(#[from] std::io::Error),
#[error("Failed to read supergraph from network: {0}")]
NetworkError(#[from] reqwest::Error),
}

#[async_trait]
pub trait SupergraphLoader {
async fn reload(&mut self) -> Result<(), LoadSupergraphError>;
fn current(&self) -> Option<&str>;
}
Loading
Loading