From 52c6e8c9804df74e313e478a0c0fca173848d314 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 11 Nov 2025 00:47:49 +0000 Subject: [PATCH 1/4] Initial plan From 1de4ceef48ed813448c4c027518ad62350832c0c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 11 Nov 2025 01:02:21 +0000 Subject: [PATCH 2/4] Remove automatic creation of /apis directory in consumption_dir() Co-authored-by: callicles <4429209+callicles@users.noreply.github.com> --- apps/framework-cli/src/project.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/framework-cli/src/project.rs b/apps/framework-cli/src/project.rs index 5579373c4..73fb3190f 100644 --- a/apps/framework-cli/src/project.rs +++ b/apps/framework-cli/src/project.rs @@ -543,10 +543,6 @@ impl Project { pub fn consumption_dir(&self) -> PathBuf { let apis_dir = self.app_dir().join(CONSUMPTION_DIR); - if !apis_dir.exists() { - std::fs::create_dir_all(&apis_dir).expect("Failed to create consumption directory"); - } - debug!("Consumptions dir: {:?}", apis_dir); apis_dir } From 1b14532fc4a748e8cbd2fc04951ec4c98004f9b2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 11 Nov 2025 15:47:34 +0000 Subject: [PATCH 3/4] Remove DMV1 consumption loading infrastructure - part 1 Co-authored-by: callicles <4429209+callicles@users.noreply.github.com> --- .../src/cli/display/infrastructure.rs | 9 - apps/framework-cli/src/framework.rs | 1 - .../src/framework/consumption/loader.rs | 88 ------- .../src/framework/consumption/model.rs | 54 ---- .../src/framework/core/infrastructure_map.rs | 67 ----- .../core/partial_infrastructure_map.rs | 4 - .../src/framework/core/primitive_map.rs | 18 +- apps/framework-cli/src/framework/python.rs | 1 - .../src/framework/python/consumption.rs | 159 ----------- .../framework-cli/src/framework/typescript.rs | 1 - .../src/framework/typescript/consumption.rs | 246 ------------------ .../src/infrastructure/processes.rs | 17 -- .../processes/consumption_registry.rs | 111 -------- .../processes/process_registry.rs | 20 -- apps/framework-cli/src/project.rs | 12 +- 15 files changed, 2 insertions(+), 806 deletions(-) delete mode 100644 apps/framework-cli/src/framework/consumption/loader.rs delete mode 100644 apps/framework-cli/src/framework/consumption/model.rs delete mode 100644 apps/framework-cli/src/framework/python/consumption.rs delete mode 100644 apps/framework-cli/src/framework/typescript/consumption.rs delete mode 100644 apps/framework-cli/src/infrastructure/processes/consumption_registry.rs diff --git a/apps/framework-cli/src/cli/display/infrastructure.rs b/apps/framework-cli/src/cli/display/infrastructure.rs index 14859fc92..7c26012d7 100644 --- a/apps/framework-cli/src/cli/display/infrastructure.rs +++ b/apps/framework-cli/src/cli/display/infrastructure.rs @@ -670,15 +670,6 @@ pub fn show_process_changes(process_changes: &[ProcessChange]) { ProcessChange::OlapProcess(olap_change) => { handle_standard_change!(olap_change); } - ProcessChange::ConsumptionApiWebServer(Change::Added(_)) => { - infra_added("Starting Consumption WebServer..."); - } - ProcessChange::ConsumptionApiWebServer(Change::Removed(_)) => { - infra_removed("Stopping Consumption WebServer..."); - } - ProcessChange::ConsumptionApiWebServer(Change::Updated { .. }) => { - infra_updated("Reloading Consumption WebServer..."); - } ProcessChange::OrchestrationWorker(Change::Added(_)) => { infra_added("Starting Orchestration worker..."); } diff --git a/apps/framework-cli/src/framework.rs b/apps/framework-cli/src/framework.rs index d4b559e80..ab92582c2 100644 --- a/apps/framework-cli/src/framework.rs +++ b/apps/framework-cli/src/framework.rs @@ -2,7 +2,6 @@ use clap::Subcommand; pub mod blocks; pub mod bulk_import; -pub mod consumption; pub mod core; pub mod data_model; pub mod languages; diff --git a/apps/framework-cli/src/framework/consumption/loader.rs b/apps/framework-cli/src/framework/consumption/loader.rs deleted file mode 100644 index ac4fc6adf..000000000 --- a/apps/framework-cli/src/framework/consumption/loader.rs +++ /dev/null @@ -1,88 +0,0 @@ -use super::model::{Consumption, ConsumptionQueryParam, EndpointFile}; -use crate::framework::languages::SupportedLanguages; -use crate::framework::python::consumption::load_python_query_param; -use crate::framework::typescript; -use crate::framework::typescript::export_collectors::ExportCollectorError; -use crate::project::Project; -use crate::utilities::PathExt; -use serde::Deserialize; -use serde_json::Value; -use sha2::{Digest, Sha256}; -use std::{fs, path::Path}; - -#[derive(Debug, thiserror::Error)] -#[non_exhaustive] -pub enum AnalyticsApiLoaderError { - #[error("Failed to open file: {0}")] - FailedToOpenFile(std::io::Error), - #[error("Failed to load query params: {0}")] - FailedToLoadPythonParams(std::io::Error), - #[error("Failed to load query params: {0}")] - FailedToLoadTypescriptParams(ExportCollectorError), -} - -#[derive(Debug, Deserialize)] -pub struct QueryParamOutput { - pub params: Vec, -} - -pub async fn load_consumption(project: &Project) -> Result { - let mut endpoint_files = Vec::new(); - for f in walkdir::WalkDir::new(project.consumption_dir()) - .into_iter() - // drop Err cases - .flatten() - { - if f.file_type().is_file() && f.path().ext_is_supported_lang() { - let result = build_endpoint_file(project, f.path()).await; - log::debug!("build_endpoint_file result: {:?}", result); - if let Some(file) = result.ok().flatten() { - endpoint_files.push(file); - } - } - } - - Ok(Consumption { endpoint_files }) -} - -async fn build_endpoint_file( - project: &Project, - file_path: &Path, -) -> Result, AnalyticsApiLoaderError> { - if let Ok(path) = file_path.strip_prefix(project.consumption_dir()) { - let mut file = - fs::File::open(file_path).map_err(AnalyticsApiLoaderError::FailedToOpenFile)?; - let mut hasher = Sha256::new(); - std::io::copy(&mut file, &mut hasher).map_err(AnalyticsApiLoaderError::FailedToOpenFile)?; - let hash = hasher.finalize(); - - let mut path = path.to_path_buf(); - path.set_extension(""); - - let (query_params, output_schema, version) = match project.language { - SupportedLanguages::Typescript => typescript::export_collectors::get_func_types( - &path, - project, - &project.project_location, - ) - .await - .map_err(AnalyticsApiLoaderError::FailedToLoadTypescriptParams)?, - SupportedLanguages::Python => { - let params = load_python_query_param(project, &project.project_location, &path) - .await - .map_err(AnalyticsApiLoaderError::FailedToLoadPythonParams)?; - (params, Value::Null, None) - } - }; - - Ok(Some(EndpointFile { - path, - hash, - query_params, - output_schema, - version, - })) - } else { - Ok(None) - } -} diff --git a/apps/framework-cli/src/framework/consumption/model.rs b/apps/framework-cli/src/framework/consumption/model.rs deleted file mode 100644 index d3fbd1f2f..000000000 --- a/apps/framework-cli/src/framework/consumption/model.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::framework::core::infrastructure::table::ColumnType; -use crate::proto::infrastructure_map::ConsumptionQueryParam as ProtoConsumptionQueryParam; -use hex::encode; -use protobuf::MessageField; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use sha2::{digest::Output, Sha256}; -use std::path::PathBuf; - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct ConsumptionQueryParam { - pub name: String, - pub data_type: ColumnType, - pub required: bool, -} - -impl ConsumptionQueryParam { - pub fn to_proto(&self) -> ProtoConsumptionQueryParam { - ProtoConsumptionQueryParam { - name: self.name.clone(), - data_type: MessageField::some(self.data_type.to_proto()), - required: self.required, - special_fields: Default::default(), - } - } - - pub fn from_proto(proto: ProtoConsumptionQueryParam) -> Self { - ConsumptionQueryParam { - name: proto.name, - data_type: ColumnType::from_proto(proto.data_type.unwrap()), - required: proto.required, - } - } -} - -#[derive(Debug, Clone, Default)] -pub struct EndpointFile { - pub path: PathBuf, - pub hash: Output, - pub query_params: Vec, - pub output_schema: Value, - pub version: Option, -} - -impl EndpointFile { - pub fn id(&self) -> String { - format!("{}-{}", self.path.to_string_lossy(), encode(self.hash)) - } -} - -#[derive(Debug, Clone, Default)] -pub struct Consumption { - pub endpoint_files: Vec, -} diff --git a/apps/framework-cli/src/framework/core/infrastructure_map.rs b/apps/framework-cli/src/framework/core/infrastructure_map.rs index 8e5232112..0925816d0 100644 --- a/apps/framework-cli/src/framework/core/infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/infrastructure_map.rs @@ -34,7 +34,6 @@ //! This module is essential for maintaining consistency between the defined infrastructure //! and the actual deployed components. use super::infrastructure::api_endpoint::{APIType, ApiEndpoint}; -use super::infrastructure::consumption_webserver::ConsumptionApiWebServer; use super::infrastructure::function_process::FunctionProcess; use super::infrastructure::olap_process::OlapProcess; use super::infrastructure::orchestration_worker::OrchestrationWorker; @@ -443,8 +442,6 @@ pub enum ProcessChange { FunctionProcess(Change), /// Change to an OLAP process OlapProcess(Change), - /// Change to a consumption API web server - ConsumptionApiWebServer(Change), /// Change to an orchestration worker OrchestrationWorker(Change), } @@ -527,11 +524,6 @@ pub struct InfrastructureMap { // TODO change to a hashmap of processes when we have several pub block_db_processes: OlapProcess, - /// Web server handling consumption API endpoints - // Not sure if we will want to change that or not in the future to be able to tell - // the new consumption endpoints that were added or removed. - pub consumption_api_web_server: ConsumptionApiWebServer, - /// Collection of orchestration workers indexed by worker ID #[serde(default = "HashMap::new")] pub orchestration_workers: HashMap, @@ -578,7 +570,6 @@ impl InfrastructureMap { topic_to_topic_sync_processes: Default::default(), function_processes: Default::default(), block_db_processes: OlapProcess {}, - consumption_api_web_server: ConsumptionApiWebServer {}, orchestration_workers: Default::default(), sql_resources: Default::default(), workflows: Default::default(), @@ -720,27 +711,6 @@ impl InfrastructureMap { // TODO update here when we have several blocks processes let block_db_processes = OlapProcess::from_blocks(&primitive_map.blocks); - // consumption api endpoints - let consumption_api_web_server = ConsumptionApiWebServer {}; - if !project.features.apis && !primitive_map.consumption.endpoint_files.is_empty() { - log::error!("Analytics APIs disabled. API endpoints will not be available."); - show_message_wrapper( - MessageType::Error, - Message { - action: "Disabled".to_string(), - details: format!( - "Analytics APIs feature is disabled but {} API endpoint(s) found. Enable 'apis = true' in moose.config.toml.", - primitive_map.consumption.endpoint_files.len() - ), - }, - ); - } else { - for api_endpoint in primitive_map.consumption.endpoint_files { - let api_endpoint_infra = ApiEndpoint::from(api_endpoint); - api_endpoints.insert(api_endpoint_infra.id(), api_endpoint_infra); - } - } - // Orchestration workers let mut orchestration_workers = HashMap::new(); let orchestration_worker = OrchestrationWorker::new(project.language); @@ -756,7 +726,6 @@ impl InfrastructureMap { views, function_processes, block_db_processes, - consumption_api_web_server, orchestration_workers, sql_resources: Default::default(), workflows: Default::default(), @@ -904,15 +873,6 @@ impl InfrastructureMap { ))); } - // Only add Analytics API server if apis feature is enabled - if project.features.apis { - process_changes.push(ProcessChange::ConsumptionApiWebServer(Change::< - ConsumptionApiWebServer, - >::Added( - Box::new(ConsumptionApiWebServer {}), - ))); - } - process_changes.push(ProcessChange::OrchestrationWorker(Change::< OrchestrationWorker, >::Added( @@ -1324,12 +1284,6 @@ impl InfrastructureMap { process_changes, ); - Self::diff_consumption_api_processes( - &self.consumption_api_web_server, - &target_map.consumption_api_web_server, - process_changes, - ); - Self::diff_orchestration_workers( &self.orchestration_workers, &target_map.orchestration_workers, @@ -1518,25 +1472,6 @@ impl InfrastructureMap { })); } - /// Compare Consumption API process changes between two infrastructure maps - fn diff_consumption_api_processes( - self_process: &ConsumptionApiWebServer, - target_process: &ConsumptionApiWebServer, - process_changes: &mut Vec, - ) { - log::info!("Analyzing changes in Analytics API processes..."); - - // We are currently not tracking individual consumption endpoints, so we will just restart - // the consumption web server when something changed - log::debug!("Analytics API Web Server updated (assumed for now)"); - process_changes.push(ProcessChange::ConsumptionApiWebServer(Change::< - ConsumptionApiWebServer, - >::Updated { - before: Box::new(self_process.clone()), - after: Box::new(target_process.clone()), - })); - } - /// Compare OrchestrationWorker changes between two infrastructure maps fn diff_orchestration_workers( self_workers: &HashMap, @@ -2526,7 +2461,6 @@ impl InfrastructureMap { .into_iter() .map(|(k, v)| (k, OrchestrationWorker::from_proto(v))) .collect(), - consumption_api_web_server: ConsumptionApiWebServer {}, block_db_processes: OlapProcess {}, sql_resources: proto .sql_resources @@ -2946,7 +2880,6 @@ impl Default for InfrastructureMap { topic_to_topic_sync_processes: HashMap::new(), function_processes: HashMap::new(), block_db_processes: OlapProcess {}, - consumption_api_web_server: ConsumptionApiWebServer {}, orchestration_workers: HashMap::new(), sql_resources: HashMap::new(), workflows: HashMap::new(), diff --git a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs index 2ec987f84..090c8ed19 100644 --- a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs @@ -448,7 +448,6 @@ pub struct PartialInfrastructureMap { #[serde(default)] function_processes: HashMap, block_db_processes: Option, - consumption_api_web_server: Option, #[serde(default)] workflows: HashMap, #[serde(default)] @@ -582,9 +581,6 @@ impl PartialInfrastructureMap { topic_to_topic_sync_processes: self.topic_to_topic_sync_processes, function_processes, block_db_processes: self.block_db_processes.unwrap_or(OlapProcess {}), - consumption_api_web_server: self - .consumption_api_web_server - .unwrap_or(ConsumptionApiWebServer {}), orchestration_workers, workflows, web_apps, diff --git a/apps/framework-cli/src/framework/core/primitive_map.rs b/apps/framework-cli/src/framework/core/primitive_map.rs index 4675988ae..c6c3379ce 100644 --- a/apps/framework-cli/src/framework/core/primitive_map.rs +++ b/apps/framework-cli/src/framework/core/primitive_map.rs @@ -5,18 +5,14 @@ use std::{ }; use walkdir::WalkDir; +use crate::framework::core::infrastructure::table::ColumnType; use crate::framework::data_model::config::DataModelConfig; use crate::framework::data_model::DuplicateModelError; use crate::framework::languages::SupportedLanguages; -use crate::framework::{ - consumption::loader::{load_consumption, AnalyticsApiLoaderError}, - core::infrastructure::table::ColumnType, -}; use crate::utilities::PathExt; use crate::{ framework::{ blocks::model::Blocks, - consumption::model::Consumption, data_model::{ self, config::ModelConfigurationError, @@ -40,9 +36,6 @@ pub enum PrimitiveMapLoadingError { #[error("Failed to load functions")] FunctionsLoading(#[from] crate::framework::streaming::model::FunctionError), - - #[error("Failed to load analytics apis")] - AnalyticsApis(#[from] AnalyticsApiLoaderError), } #[derive(Debug, thiserror::Error)] @@ -68,13 +61,6 @@ pub struct PrimitiveMap { // to start/stop them individually. Right now we are starting all of them at once through the language specific // blocks runner. We are loading blocks as 1 unique blocks as a default. pub blocks: Blocks, - - // We are currently not loading individual consumption endpoints in the CLI and we probably will not need to - // Since this is a local webserver without side effects, keeping track of what is up and running is not necessary - // it just needs to be restarted when something in its dependency tree changes. - // We might want to try and load the full map of consumption endpoints in the future to be able to display thgat - // to the user. - pub consumption: Consumption, } fn check_no_empty_nested( @@ -148,8 +134,6 @@ impl PrimitiveMap { .cloned() .collect(); - primitive_map.consumption = load_consumption(project).await?; - log::debug!("Loaded Versioned primitive map: {:?}", primitive_map); primitive_map.validate()?; diff --git a/apps/framework-cli/src/framework/python.rs b/apps/framework-cli/src/framework/python.rs index a46005c44..ec612578c 100644 --- a/apps/framework-cli/src/framework/python.rs +++ b/apps/framework-cli/src/framework/python.rs @@ -2,7 +2,6 @@ use crate::framework::versions::Version; pub mod blocks; pub mod checker; -pub mod consumption; pub mod datamodel_config; pub mod executor; pub mod generate; diff --git a/apps/framework-cli/src/framework/python/consumption.rs b/apps/framework-cli/src/framework/python/consumption.rs deleted file mode 100644 index 5f84b1615..000000000 --- a/apps/framework-cli/src/framework/python/consumption.rs +++ /dev/null @@ -1,159 +0,0 @@ -use crate::cli::display::message::{Message, MessageType}; -use crate::framework::consumption::model::ConsumptionQueryParam; -use crate::framework::python::executor::{run_python_program, PythonProgram}; -use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig; -use crate::infrastructure::processes::consumption_registry::ConsumptionError; -use crate::project::{JwtConfig, Project}; -use crate::utilities::constants::{CONSUMPTION_WRAPPER_PACKAGE_NAME, UTILS_WRAPPER_PACKAGE_NAME}; -use log::{error, info}; -use std::fs; -use std::path::Path; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::Child; - -use super::executor; - -pub fn run( - project: &Project, - clickhouse_config: &ClickHouseConfig, - jwt_config: &Option, - consumption_path: &Path, - proxy_port: Option, -) -> Result { - // Create the wrapper lib files inside the .moose directory - let internal_dir = project.internal_dir()?; - let consumption_runner_dir = internal_dir.join(CONSUMPTION_WRAPPER_PACKAGE_NAME); - let utils_lib_dir = consumption_runner_dir.join(UTILS_WRAPPER_PACKAGE_NAME); - - // Create the directory if it doesn't exist - if !consumption_runner_dir.exists() { - fs::create_dir(&consumption_runner_dir)?; - } - if !utils_lib_dir.exists() { - fs::create_dir(&utils_lib_dir)?; - } - - // Overwrite the wrapper files - fs::write( - utils_lib_dir.join("__init__.py"), - include_str!("./utils/__init__.py"), - )?; - fs::write( - utils_lib_dir.join("temporal.py"), - include_str!("./utils/temporal.py"), - )?; - - let jwt_secret = jwt_config - .as_ref() - .map(|jwt| jwt.secret.clone()) - .unwrap_or("".to_string()); - - let jwt_issuer = jwt_config - .as_ref() - .map(|jwt| jwt.issuer.clone()) - .unwrap_or("".to_string()); - - let jwt_audience = jwt_config - .as_ref() - .map(|jwt| jwt.audience.clone()) - .unwrap_or("".to_string()); - - let enforce_on_all_consumptions_apis = jwt_config - .as_ref() - .map(|jwt| jwt.enforce_on_all_consumptions_apis.to_string()) - .unwrap_or("false".to_string()); - - let args = vec![ - consumption_path.to_str().unwrap().to_string(), - clickhouse_config.db_name.clone(), - clickhouse_config.host.clone(), - clickhouse_config.host_port.to_string(), - clickhouse_config.user.clone(), - clickhouse_config.password.clone(), - clickhouse_config.use_ssl.to_string(), - jwt_secret, - jwt_issuer, - jwt_audience, - enforce_on_all_consumptions_apis, - project.temporal_config.temporal_url(), - project.temporal_config.get_temporal_namespace(), - project.temporal_config.client_cert.clone(), - project.temporal_config.client_key.clone(), - project.temporal_config.api_key.clone(), - project.features.data_model_v2.to_string(), - proxy_port.unwrap_or(4001).to_string(), - ]; - - let mut consumption_process = executor::run_python_program( - project, - &project.project_location, - executor::PythonProgram::ConsumptionRunner { args }, - )?; - - let stdout = consumption_process - .stdout - .take() - .expect("Analytics api process did not have a handle to stdout"); - - let stderr = consumption_process - .stderr - .take() - .expect("Analytics api process did not have a handle to stderr"); - - let mut stdout_reader = BufReader::new(stdout).lines(); - let mut stderr_reader = BufReader::new(stderr).lines(); - - tokio::spawn(async move { - while let Ok(Some(line)) = stdout_reader.next_line().await { - let stripped = line - .strip_prefix("[QueryClient] | ") - .or_else(|| line.strip_prefix("[API] | ")); - if let Some(stripped) = stripped { - show_message!( - MessageType::Info, - Message { - action: "API".to_string(), - details: stripped.to_string(), - } - ); - } else { - info!("{}", line); - } - } - }); - - tokio::spawn(async move { - while let Ok(Some(line)) = stderr_reader.next_line().await { - error!("{}", line); - } - }); - - Ok(consumption_process) -} - -pub async fn load_python_query_param( - project: &Project, - project_location: &Path, - path: &Path, -) -> Result, std::io::Error> { - let args = vec![path.file_name().unwrap().to_str().unwrap().to_string()]; - let process = run_python_program( - project, - project_location, - PythonProgram::LoadApiParam { args }, - )?; - let output = process.wait_with_output().await?; - - if !output.status.success() { - return Err(std::io::Error::other( - String::from_utf8_lossy(&output.stderr).to_string(), - )); - } - let raw_string_stdout = String::from_utf8_lossy(&output.stdout); - - let config = serde_json::from_str::( - &raw_string_stdout, - ) - .map_err(std::io::Error::other)?; - Ok(config.params) -} diff --git a/apps/framework-cli/src/framework/typescript.rs b/apps/framework-cli/src/framework/typescript.rs index ac5624a45..ca957ab9c 100644 --- a/apps/framework-cli/src/framework/typescript.rs +++ b/apps/framework-cli/src/framework/typescript.rs @@ -1,7 +1,6 @@ pub mod bin; pub mod blocks; pub mod checker; -pub mod consumption; pub mod export_collectors; pub mod generate; pub mod generator; diff --git a/apps/framework-cli/src/framework/typescript/consumption.rs b/apps/framework-cli/src/framework/typescript/consumption.rs deleted file mode 100644 index bb64f23df..000000000 --- a/apps/framework-cli/src/framework/typescript/consumption.rs +++ /dev/null @@ -1,246 +0,0 @@ -use crate::cli::display::message::{Message, MessageType}; -use crate::framework::consumption::model::ConsumptionQueryParam; -use crate::framework::core::infrastructure::table::{ColumnType, FloatType, IntType}; -use crate::framework::typescript::export_collectors::ExportCollectorError; -use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig; -use crate::infrastructure::processes::consumption_registry::ConsumptionError; -use crate::project::{JwtConfig, Project}; -use log::{debug, error, info}; -use serde_json::{Map, Value}; -use std::path::Path; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::Child; - -use super::bin; - -const CONSUMPTION_RUNNER_BIN: &str = "consumption-apis"; - -// TODO: Abstract away ClickhouseConfig to support other databases -// TODO: Bubble up compilation errors to the user -pub fn run( - project: &Project, - clickhouse_config: &ClickHouseConfig, - jwt_config: &Option, - consumption_path: &Path, - project_path: &Path, - proxy_port: Option, -) -> Result { - let host_port = clickhouse_config.host_port.to_string(); - let temporal_url = project.temporal_config.temporal_url(); - let temporal_namespace = project.temporal_config.get_temporal_namespace(); - let client_cert = project.temporal_config.client_cert.clone(); - let client_key = project.temporal_config.client_key.clone(); - let api_key = project.temporal_config.api_key.clone(); - - let mut string_args = vec![ - consumption_path.to_str().unwrap().to_string(), - clickhouse_config.db_name.clone(), - clickhouse_config.host.clone(), - host_port, - clickhouse_config.user.clone(), - clickhouse_config.password.clone(), - ]; - - if clickhouse_config.use_ssl { - string_args.push("--clickhouse-use-ssl".to_string()); - } - - if let Some(jwt_config) = jwt_config { - if jwt_config.enforce_on_all_consumptions_apis { - string_args.push("--enforce-auth".to_string()); - } - - string_args.push("--jwt-secret".to_string()); - string_args.push(jwt_config.secret.clone()); - string_args.push("--jwt-issuer".to_string()); - string_args.push(jwt_config.issuer.clone()); - string_args.push("--jwt-audience".to_string()); - string_args.push(jwt_config.audience.clone()); - } - - if project.features.workflows { - string_args.push("--temporal-url".to_string()); - string_args.push(temporal_url); - - string_args.push("--temporal-namespace".to_string()); - string_args.push(temporal_namespace); - - string_args.push("--client-cert".to_string()); - string_args.push(client_cert); - - string_args.push("--client-key".to_string()); - string_args.push(client_key); - - string_args.push("--api-key".to_string()); - string_args.push(api_key); - } - - if project.features.data_model_v2 { - string_args.push("--is-dmv2".to_string()); - } - - if let Some(port) = proxy_port { - string_args.push("--proxy-port".to_string()); - string_args.push(port.to_string()); - } - - if let Some(worker_count) = project.http_server_config.api_workers { - string_args.push("--worker-count".to_string()); - string_args.push(worker_count.to_string()); - } - - let args: Vec<&str> = string_args.iter().map(|s| s.as_str()).collect(); - let mut consumption_process = bin::run(CONSUMPTION_RUNNER_BIN, project_path, &args, project)?; - - let stdout = consumption_process - .stdout - .take() - .expect("Analytics api process did not have a handle to stdout"); - - let stderr = consumption_process - .stderr - .take() - .expect("Analytics api process did not have a handle to stderr"); - - let mut stdout_reader = BufReader::new(stdout).lines(); - let mut stderr_reader = BufReader::new(stderr).lines(); - - tokio::spawn(async move { - while let Ok(Some(line)) = stdout_reader.next_line().await { - let stripped = line - .strip_prefix("[QueryClient] | ") - .or_else(|| line.strip_prefix("[API] | ")); - if let Some(stripped) = stripped { - show_message!( - MessageType::Info, - Message { - action: "API".to_string(), - details: stripped.to_string(), - } - ); - } else { - info!("{}", line); - } - } - }); - - tokio::spawn(async move { - while let Ok(Some(line)) = stderr_reader.next_line().await { - error!("{}", line); - } - }); - - Ok(consumption_process) -} - -fn schema_to_params_list( - schema: &Value, -) -> Result, ExportCollectorError> { - let required_keys = schema - .as_object() - .and_then(|m| m.get("required")) - .and_then(|v| v.as_array()); - - let converted = schema - .as_object() - .and_then(|m| m.get("properties")) - .and_then(|o| o.as_object()) - .ok_or_else(|| ExportCollectorError::Other { - message: "Missing properties in schema.".to_string(), - })? - .iter() - .map(|(k, v)| { - let type_object = v.as_object(); - let data_type = match type_object - .and_then(|m| m.get("type")) - .and_then(|v| v.as_str()) - { - Some("string") => ColumnType::String, - Some("number") => ColumnType::Float(FloatType::Float64), - Some("integer") => ColumnType::Int(IntType::Int64), - Some("boolean") => ColumnType::Boolean, - // no recursion here, query param does not support nested arrays - Some("array") => { - let inner_type = match type_object - .unwrap() - .get("items") - .and_then(|v| v.as_object()) - .and_then(|m| m.get("type")) - .and_then(|v| v.as_str()) - { - Some("number") => ColumnType::Float(FloatType::Float64), - Some("integer") => ColumnType::Int(IntType::Int64), - Some("boolean") => ColumnType::Boolean, - _ => ColumnType::String, - }; - ColumnType::Array { - element_type: Box::new(inner_type), - element_nullable: false, - } - } - - unexpected => { - debug!("unexpected type {:?} for field {k}", unexpected); - ColumnType::String - } - }; - - ConsumptionQueryParam { - name: k.to_string(), - data_type, - required: required_keys - .is_some_and(|arr| arr.iter().any(|v| v.as_str() == Some(k))), - } - }) - .collect(); - Ok(converted) -} - -pub fn extract_schema(json_schema: &Map) -> Result<&Value, ExportCollectorError> { - let schemas = json_schema - .get("schemas") - .and_then(|o| o.as_array()) - .ok_or_else(|| ExportCollectorError::Other { - message: "Unexpected schema shape.".to_string(), - })?; - - let schema = if schemas.len() == 1 { - schemas.iter().next().unwrap() - } else { - return Err(ExportCollectorError::Other { - message: format!("Unexpected number of schemas: {}", schemas.len()), - }); - }; - - let schema_deref = if let Some(Value::String(s)) = schema.get("$ref") { - let components_schemas = json_schema - .get("components") - .and_then(|o| o.as_object()) - .and_then(|m| m.get("schemas")) - .and_then(|o| o.as_object()) - .ok_or_else(|| ExportCollectorError::Other { - message: "Unexpected schema shape.".to_string(), - })?; - components_schemas - .get(s.strip_prefix("#/components/schemas/").unwrap_or(s)) - .ok_or_else(|| ExportCollectorError::Other { - message: format!("Schema {s} not found."), - })? - } else { - schema - }; - Ok(schema_deref) -} - -pub fn extract_intput_param( - map: &Map, -) -> Result, ExportCollectorError> { - let input_schema = map - .get("inputSchema") - .and_then(|o| o.as_object()) - .ok_or_else(|| ExportCollectorError::Other { - message: "inputSchema field should be an object.".to_string(), - })?; - - schema_to_params_list(extract_schema(input_schema)?) -} diff --git a/apps/framework-cli/src/infrastructure/processes.rs b/apps/framework-cli/src/infrastructure/processes.rs index a7ddd5b3f..1e2a0f810 100644 --- a/apps/framework-cli/src/infrastructure/processes.rs +++ b/apps/framework-cli/src/infrastructure/processes.rs @@ -17,7 +17,6 @@ use crate::{ }; pub mod blocks_registry; -pub mod consumption_registry; pub mod functions_registry; pub mod kafka_clickhouse_sync; pub mod orchestration_workers_registry; @@ -176,22 +175,6 @@ pub async fn execute_changes( before: _, after: _, }) => {} - ProcessChange::ConsumptionApiWebServer(Change::Added(_)) => { - log::info!("Starting analytics api webserver process"); - process_registry.consumption.start()?; - } - ProcessChange::ConsumptionApiWebServer(Change::Removed(_)) => { - log::info!("Stopping analytics api webserver process"); - process_registry.consumption.stop().await?; - } - ProcessChange::ConsumptionApiWebServer(Change::Updated { - before: _, - after: _, - }) => { - log::info!("Re-Starting analytics api webserver process"); - process_registry.consumption.stop().await?; - process_registry.consumption.start()?; - } ProcessChange::OrchestrationWorker(Change::Added(new_orchestration_worker)) => { log::info!("Starting Orchestration worker process"); process_registry diff --git a/apps/framework-cli/src/infrastructure/processes/consumption_registry.rs b/apps/framework-cli/src/infrastructure/processes/consumption_registry.rs deleted file mode 100644 index df6623797..000000000 --- a/apps/framework-cli/src/infrastructure/processes/consumption_registry.rs +++ /dev/null @@ -1,111 +0,0 @@ -use std::path::PathBuf; - -use log::info; - -use crate::utilities::system::{RestartingProcess, StartChildFn}; -use crate::{ - framework::{languages::SupportedLanguages, python, typescript}, - infrastructure::olap::clickhouse::config::ClickHouseConfig, - project::{JwtConfig, Project, ProjectFileError}, - utilities::system::KillProcessError, -}; - -#[derive(Debug, thiserror::Error)] -#[non_exhaustive] -pub enum ConsumptionError { - #[error("Failed to start/stop the analytics api process")] - IoError(#[from] std::io::Error), - - #[error("Kill process Error")] - KillProcessError(#[from] KillProcessError), - - #[error("Failed to create library files")] - ProjectFileError(#[from] ProjectFileError), -} - -pub struct ConsumptionProcessRegistry { - api_process: Option, - clickhouse_config: ClickHouseConfig, - dir: PathBuf, - language: SupportedLanguages, - project_path: PathBuf, - jwt_config: Option, - project: Project, - proxy_port: Option, -} - -impl ConsumptionProcessRegistry { - pub fn new( - language: SupportedLanguages, - clickhouse_config: ClickHouseConfig, - jwt_config: Option, - dir: PathBuf, - project_path: PathBuf, - project: Project, - proxy_port: Option, - ) -> Self { - let proxy_port = proxy_port.or(Some(project.http_server_config.proxy_port)); - Self { - api_process: Option::None, - language, - dir, - clickhouse_config, - project_path, - jwt_config, - project, - proxy_port, - } - } - - pub fn start(&mut self) -> Result<(), ConsumptionError> { - info!("Starting analytics api..."); - - let project = self.project.clone(); - let clickhouse_config = self.clickhouse_config.clone(); - let jwt_config = self.jwt_config.clone(); - let proxy_port = self.proxy_port; - let dir = self.dir.clone(); - - let start_child: StartChildFn = match self.language { - SupportedLanguages::Python => Box::new(move || { - python::consumption::run( - &project, - &clickhouse_config, - &jwt_config, - &dir, - proxy_port, - ) - }), - SupportedLanguages::Typescript => { - let project_path = self.project_path.clone(); - Box::new(move || { - typescript::consumption::run( - &project, - &clickhouse_config, - &jwt_config, - &dir, - &project_path, - proxy_port, - ) - }) - } - }; - - self.api_process = Some(RestartingProcess::create( - "consumption-api".to_string(), - start_child, - )?); - - Ok(()) - } - - pub async fn stop(&mut self) -> Result<(), ConsumptionError> { - info!("Stopping analytics apis..."); - - if let Some(child) = self.api_process.take() { - child.stop().await - }; - - Ok(()) - } -} diff --git a/apps/framework-cli/src/infrastructure/processes/process_registry.rs b/apps/framework-cli/src/infrastructure/processes/process_registry.rs index 16bfaae7b..4f26c50c9 100644 --- a/apps/framework-cli/src/infrastructure/processes/process_registry.rs +++ b/apps/framework-cli/src/infrastructure/processes/process_registry.rs @@ -8,7 +8,6 @@ use crate::cli::settings::Settings; use crate::project::Project; use super::blocks_registry::BlocksProcessRegistry; -use super::consumption_registry::{ConsumptionError, ConsumptionProcessRegistry}; use super::functions_registry::{FunctionProcessRegistry, FunctionRegistryError}; use super::kafka_clickhouse_sync::SyncingProcessesRegistry; use super::orchestration_workers_registry::{ @@ -26,9 +25,6 @@ pub struct ProcessRegistries { /// Registry for block processes that handle data processing blocks pub blocks: Option, - /// Registry for consumption processes that provide API access to data - pub consumption: ConsumptionProcessRegistry, - /// Registry for orchestration worker processes that handle workflow execution pub orchestration_workers: OrchestrationWorkersRegistry, @@ -46,10 +42,6 @@ pub enum ProcessRegistryError { /// Error that occurs when stopping orchestration worker processes fails #[error("Failed to stop the orchestration workers")] OrchestrationWorkersProcessError(#[from] OrchestrationWorkersRegistryError), - - /// Error that occurs when stopping a consumption process fails - #[error("Failed to stop the analytics api process")] - ConsumptionProcessError(#[from] ConsumptionError), } impl ProcessRegistries { @@ -80,22 +72,11 @@ impl ProcessRegistries { )) }; - let consumption = ConsumptionProcessRegistry::new( - project.language, - project.clickhouse_config.clone(), - project.jwt.clone(), - project.consumption_dir(), - project.project_location.clone(), - project.clone(), - None, // proxy_port: will use project.http_server_config.proxy_port - ); - let orchestration_workers = OrchestrationWorkersRegistry::new(project, settings); Self { functions, blocks, - consumption, orchestration_workers, syncing, } @@ -110,7 +91,6 @@ impl ProcessRegistries { /// * `Result<(), ProcessRegistryError>` - Ok if all processes stopped successfully, Error otherwise pub async fn stop(&mut self) -> Result<(), ProcessRegistryError> { self.functions.stop_all().await; - self.consumption.stop().await?; self.orchestration_workers.stop_all().await?; self.syncing.stop_all().await; Ok(()) diff --git a/apps/framework-cli/src/project.rs b/apps/framework-cli/src/project.rs index 73fb3190f..4d385b8a4 100644 --- a/apps/framework-cli/src/project.rs +++ b/apps/framework-cli/src/project.rs @@ -73,9 +73,7 @@ use crate::utilities::constants::CLI_INTERNAL_VERSIONS_DIR; use crate::utilities::constants::ENVIRONMENT_VARIABLE_PREFIX; use crate::utilities::constants::PROJECT_CONFIG_FILE; use crate::utilities::constants::{APP_DIR, CLI_PROJECT_INTERNAL_DIR, SCHEMAS_DIR}; -use crate::utilities::constants::{ - CONSUMPTION_DIR, FUNCTIONS_DIR, OLD_PROJECT_CONFIG_FILE, TS_FLOW_FILE, -}; +use crate::utilities::constants::{FUNCTIONS_DIR, OLD_PROJECT_CONFIG_FILE, TS_FLOW_FILE}; use crate::utilities::git::GitConfig; use crate::utilities::PathExt; use crate::utilities::_true; @@ -539,14 +537,6 @@ impl Project { blocks_dir } - /// Returns the path to the consumption directory - pub fn consumption_dir(&self) -> PathBuf { - let apis_dir = self.app_dir().join(CONSUMPTION_DIR); - - debug!("Consumptions dir: {:?}", apis_dir); - apis_dir - } - /// Returns the path to the internal directory pub fn internal_dir(&self) -> Result { let mut internal_dir = self.project_location.clone(); From 0c99406d76680b3355b1536ac470a470c4feb526 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 16:55:43 +0000 Subject: [PATCH 4/4] Fix compilation errors - moved consumption types to api_endpoint module Co-authored-by: callicles <4429209+callicles@users.noreply.github.com> --- .../framework-cli/src/cli/routines/openapi.rs | 5 +- .../framework/core/infra_reality_checker.rs | 5 -- .../core/infrastructure/api_endpoint.rs | 47 ++++++++++++++++- .../core/partial_infrastructure_map.rs | 3 +- .../framework/typescript/export_collectors.rs | 52 ++++--------------- .../src/infrastructure/processes.rs | 4 -- 6 files changed, 61 insertions(+), 55 deletions(-) diff --git a/apps/framework-cli/src/cli/routines/openapi.rs b/apps/framework-cli/src/cli/routines/openapi.rs index d4d66e8cf..e3f5fea2f 100644 --- a/apps/framework-cli/src/cli/routines/openapi.rs +++ b/apps/framework-cli/src/cli/routines/openapi.rs @@ -1,5 +1,6 @@ -use crate::framework::consumption::model::ConsumptionQueryParam; -use crate::framework::core::infrastructure::api_endpoint::{APIType, ApiEndpoint}; +use crate::framework::core::infrastructure::api_endpoint::{ + APIType, ApiEndpoint, ConsumptionQueryParam, +}; use crate::framework::core::infrastructure::table::ColumnType; use crate::framework::core::infrastructure_map::InfrastructureMap; use crate::project::Project; diff --git a/apps/framework-cli/src/framework/core/infra_reality_checker.rs b/apps/framework-cli/src/framework/core/infra_reality_checker.rs index 3c8c5626d..398125f07 100644 --- a/apps/framework-cli/src/framework/core/infra_reality_checker.rs +++ b/apps/framework-cli/src/framework/core/infra_reality_checker.rs @@ -313,7 +313,6 @@ impl InfraRealityChecker { mod tests { use super::*; use crate::cli::local_webserver::LocalWebserverConfig; - use crate::framework::core::infrastructure::consumption_webserver::ConsumptionApiWebServer; use crate::framework::core::infrastructure::olap_process::OlapProcess; use crate::framework::core::infrastructure::table::{ Column, ColumnType, IntType, OrderBy, Table, @@ -443,7 +442,6 @@ mod tests { topic_to_topic_sync_processes: HashMap::new(), function_processes: HashMap::new(), block_db_processes: OlapProcess {}, - consumption_api_web_server: ConsumptionApiWebServer {}, orchestration_workers: HashMap::new(), sql_resources: HashMap::new(), workflows: HashMap::new(), @@ -511,7 +509,6 @@ mod tests { topic_to_topic_sync_processes: HashMap::new(), function_processes: HashMap::new(), block_db_processes: OlapProcess {}, - consumption_api_web_server: ConsumptionApiWebServer {}, orchestration_workers: HashMap::new(), sql_resources: HashMap::new(), workflows: HashMap::new(), @@ -585,7 +582,6 @@ mod tests { topic_to_topic_sync_processes: HashMap::new(), function_processes: HashMap::new(), block_db_processes: OlapProcess {}, - consumption_api_web_server: ConsumptionApiWebServer {}, orchestration_workers: HashMap::new(), sql_resources: HashMap::new(), workflows: HashMap::new(), @@ -652,7 +648,6 @@ mod tests { topic_to_topic_sync_processes: HashMap::new(), function_processes: HashMap::new(), block_db_processes: OlapProcess {}, - consumption_api_web_server: ConsumptionApiWebServer {}, orchestration_workers: HashMap::new(), sql_resources: HashMap::new(), workflows: HashMap::new(), diff --git a/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs b/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs index 689034d8f..f6c621465 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs @@ -1,8 +1,8 @@ +use super::table::ColumnType; use super::table::Metadata; use super::{topic::Topic, DataLineage, InfrastructureSignature}; use crate::framework::versions::Version; use crate::framework::{ - consumption::model::{ConsumptionQueryParam, EndpointFile}, core::infrastructure_map::{PrimitiveSignature, PrimitiveTypes}, data_model::model::DataModel, }; @@ -12,12 +12,57 @@ use crate::proto::infrastructure_map::Method as ProtoMethod; use crate::proto::infrastructure_map::{ ApiEndpoint as ProtoApiEndpoint, EgressDetails, IngressDetails, }; +use hex::encode; use protobuf::{EnumOrUnknown, MessageField}; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; +use sha2::{digest::Output, Sha256}; use std::path::PathBuf; +/// Query parameter definition for consumption APIs +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ConsumptionQueryParam { + pub name: String, + pub data_type: ColumnType, + pub required: bool, +} + +impl ConsumptionQueryParam { + pub fn to_proto(&self) -> crate::proto::infrastructure_map::ConsumptionQueryParam { + crate::proto::infrastructure_map::ConsumptionQueryParam { + name: self.name.clone(), + data_type: MessageField::some(self.data_type.to_proto()), + required: self.required, + special_fields: Default::default(), + } + } + + pub fn from_proto(proto: crate::proto::infrastructure_map::ConsumptionQueryParam) -> Self { + ConsumptionQueryParam { + name: proto.name, + data_type: ColumnType::from_proto(proto.data_type.unwrap()), + required: proto.required, + } + } +} + +/// Endpoint file definition (used by DMV1 only, kept for compatibility) +#[derive(Debug, Clone, Default)] +pub struct EndpointFile { + pub path: PathBuf, + pub hash: Output, + pub query_params: Vec, + pub output_schema: Value, + pub version: Option, +} + +impl EndpointFile { + pub fn id(&self) -> String { + format!("{}-{}", self.path.to_string_lossy(), encode(self.hash)) + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum APIType { INGRESS { diff --git a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs index 090c8ed19..6dd1b4c2a 100644 --- a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs @@ -46,7 +46,6 @@ use tokio::process::Child; use super::{ infrastructure::{ api_endpoint::{APIType, ApiEndpoint, Method}, - consumption_webserver::ConsumptionApiWebServer, function_process::FunctionProcess, olap_process::OlapProcess, orchestration_worker::OrchestrationWorker, @@ -61,7 +60,7 @@ use super::{ use crate::framework::core::infrastructure::table::OrderBy; use crate::{ framework::{ - consumption::model::ConsumptionQueryParam, languages::SupportedLanguages, + core::infrastructure::api_endpoint::ConsumptionQueryParam, languages::SupportedLanguages, scripts::Workflow, versions::Version, }, infrastructure::olap::clickhouse::queries::ClickhouseEngine, diff --git a/apps/framework-cli/src/framework/typescript/export_collectors.rs b/apps/framework-cli/src/framework/typescript/export_collectors.rs index 12a7e8bcd..e85e91243 100644 --- a/apps/framework-cli/src/framework/typescript/export_collectors.rs +++ b/apps/framework-cli/src/framework/typescript/export_collectors.rs @@ -1,9 +1,6 @@ use super::bin; -use crate::framework::consumption::model::ConsumptionQueryParam; use crate::framework::data_model::config::{ConfigIdentifier, DataModelConfig}; -use crate::framework::typescript::consumption::{extract_intput_param, extract_schema}; use crate::project::Project; -use log::debug; use serde_json::Value; use std::collections::{HashMap, HashSet}; use std::path::Path; @@ -11,10 +8,11 @@ use tokio::io::AsyncReadExt; use tokio::process::Child; const EXPORT_SERIALIZER_BIN: &str = "export-serializer"; -const EXPORT_FUNC_TYPE_BIN: &str = "consumption-type-serializer"; +// DMV1 consumption type serializer - no longer used +// const EXPORT_FUNC_TYPE_BIN: &str = "consumption-type-serializer"; const EXPORT_CONFIG_PROCESS: &str = "Data model config"; -const EXPORT_FUNC_TYPE_PROCESS: &str = "API schema"; +// const EXPORT_FUNC_TYPE_PROCESS: &str = "API schema"; #[derive(Debug, thiserror::Error)] #[error("Failed to run code")] @@ -112,39 +110,11 @@ pub async fn get_data_model_configs( } } -pub async fn get_func_types( - file: &Path, - project: &Project, - project_path: &Path, -) -> Result<(Vec, Value, Option), ExportCollectorError> { - let exports = collect_exports( - EXPORT_FUNC_TYPE_BIN, - EXPORT_FUNC_TYPE_PROCESS, - file, - project, - project_path, - ) - .await?; - - debug!("Schema for path {:?} {}", file, exports); - - let (input_params, output_schema, version) = match exports { - Value::Object(mut map) => ( - extract_intput_param(&map)?, - match map.remove("outputSchema") { - None => Value::Null, - Some(Value::Object(schema)) => extract_schema(&schema)?.clone(), - Some(_) => Err(ExportCollectorError::Other { - message: "output schema must be an object".to_string(), - })?, - }, - map.get("version") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()), - ), - _ => Err(ExportCollectorError::Other { - message: "Expected an object as the root of the exports".to_string(), - })?, - }; - Ok((input_params, output_schema, version)) -} +// DMV1 consumption API type extraction - no longer used since DMV2 loads from index.ts +// pub async fn get_func_types( +// file: &Path, +// project: &Project, +// project_path: &Path, +// ) -> Result<(Vec, Value, Option), ExportCollectorError> { +// ...removed for DMV2... +// } diff --git a/apps/framework-cli/src/infrastructure/processes.rs b/apps/framework-cli/src/infrastructure/processes.rs index 1e2a0f810..604e9ab64 100644 --- a/apps/framework-cli/src/infrastructure/processes.rs +++ b/apps/framework-cli/src/infrastructure/processes.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use consumption_registry::ConsumptionError; use orchestration_workers_registry::OrchestrationWorkersRegistryError; use process_registry::ProcessRegistries; @@ -33,9 +32,6 @@ pub enum SyncProcessChangesError { #[error("Failed in the blocks registry")] OlapProcess(#[from] BlocksError), - #[error("Failed in the analytics api registry")] - ConsumptionProcess(#[from] ConsumptionError), - #[error("Failed in the orchestration workers registry")] OrchestrationWorkersRegistry(#[from] OrchestrationWorkersRegistryError),