From 36909853b876053ff853db818af4ed8b875e50b0 Mon Sep 17 00:00:00 2001 From: "Dwight J. Browne" Date: Wed, 19 Nov 2025 11:41:39 -0800 Subject: [PATCH] bugfix for missing security loader. Needs decoulping. --- .../commands/load/missing_symbol_logger.rs | 2 +- .../src/commands/load/missing_symbols.rs | 296 +++++++++++------- 2 files changed, 183 insertions(+), 115 deletions(-) diff --git a/crates/av-cli/src/commands/load/missing_symbol_logger.rs b/crates/av-cli/src/commands/load/missing_symbol_logger.rs index b15cc9f..7a2d627 100644 --- a/crates/av-cli/src/commands/load/missing_symbol_logger.rs +++ b/crates/av-cli/src/commands/load/missing_symbol_logger.rs @@ -45,7 +45,7 @@ use tracing::{debug, warn}; /// * `Ok(())` - Symbol was logged successfully /// * `Err(e)` - Database error occurred pub fn log_missing_symbol( - conn: &mut PgConnection, + conn: &mut PgConnection, //todo: decouple from postgres symbol: &str, source: &str, ) -> Result<(), diesel::result::Error> { diff --git a/crates/av-cli/src/commands/load/missing_symbols.rs b/crates/av-cli/src/commands/load/missing_symbols.rs index bfcd8e9..b39250a 100644 --- a/crates/av-cli/src/commands/load/missing_symbols.rs +++ b/crates/av-cli/src/commands/load/missing_symbols.rs @@ -35,13 +35,14 @@ use std::time::Duration; use tokio::time::sleep; use tracing::{debug, error, info, warn}; +use super::securities::normalize_alpha_region; use super::sid_generator::SidGenerator; use crate::config::Config; use av_client::AlphaVantageClient; use av_core::types::market::SecurityType; use av_database_postgres::models::{MissingSymbol, NewSymbolOwned}; use av_database_postgres::schema::{missing_symbols, symbols}; - +//todo: Need to refactor const NO_PRIORITY: i32 = 9_999_999; #[derive(Args, Debug)] pub struct MissingSymbolsArgs { @@ -126,7 +127,7 @@ pub async fn execute(args: MissingSymbolsArgs, config: Config) -> Result<()> { let mut sid_generator = tokio::task::spawn_blocking({ let database_url = config.database_url.clone(); move || -> Result { - let mut conn = diesel::PgConnection::establish(&database_url)?; + let mut conn = diesel::PgConnection::establish(&database_url)?; //todo: decouple from Postgres SidGenerator::new(&mut conn) } }) @@ -206,25 +207,25 @@ enum ResolutionResult { } /// Normalize symbol for querying AlphaVantage API -/// Returns (query_symbol, skip_reason) -fn normalize_symbol_for_query(symbol: &str) -> (String, Option) { +/// Returns (query_symbol, Security Type +fn normalize_symbol_for_query(symbol: &str) -> (String, SecurityType) { // Handle prefixed symbols from news feeds if let Some(stripped) = symbol.strip_prefix("CRYPTO:") { - return (stripped.to_string(), None); + return (stripped.to_string(), SecurityType::Cryptocurrency); } // Skip FOREX, INDEX, COMMODITY - these aren't equities/ETFs/mutual funds if symbol.starts_with("FOREX:") { - return (symbol.to_string(), Some("Skipped: FOREX symbols not supported".to_string())); + return (symbol.to_string(), SecurityType::Currency); } if symbol.starts_with("INDEX:") { - return (symbol.to_string(), Some("Skipped: INDEX symbols not supported".to_string())); + return (symbol.to_string(), SecurityType::Index); } if symbol.starts_with("COMMODITY:") { - return (symbol.to_string(), Some("Skipped: COMMODITY symbols not supported".to_string())); + return (symbol.to_string(), SecurityType::Commodity); } - (symbol.to_string(), None) + (symbol.to_string(), SecurityType::Equity) } async fn fetch_and_store_overview( @@ -237,10 +238,10 @@ async fn fetch_and_store_overview( let symbol = &missing_symbol.symbol; // Normalize the symbol for API query - let (query_symbol, skip_reason) = normalize_symbol_for_query(symbol); + let (query_symbol, sec_type) = normalize_symbol_for_query(symbol); - if let Some(reason) = skip_reason { - // Skip this symbol type + if sec_type != SecurityType::Equity { + let reason = format!("unsupported: {}", sec_type); tokio::task::spawn_blocking({ let database_url = database_url.to_string(); let id = missing_symbol.id; @@ -257,20 +258,19 @@ async fn fetch_and_store_overview( return Ok(ResolutionResult::Skipped(reason)); } - // Fetch overview from AlphaVantage - debug!("Fetching overview for {} (query: {})", symbol, query_symbol); + // Step 1: Search for the symbol using SYMBOL_SEARCH endpoint + debug!("Searching for symbol: {} (query: {})", symbol, query_symbol); - let overview_result = client.fundamentals().company_overview(&query_symbol).await; + let search_result = client.time_series().symbol_search(&query_symbol).await; - match overview_result { - Ok(overview) => { - // Check if overview has meaningful data - if overview.symbol.is_empty() { - // Symbol not found in AlphaVantage + let found_symbol = match search_result { + Ok(search_response) => { + if search_response.best_matches.is_empty() { + // No matches found in symbol search tokio::task::spawn_blocking({ let database_url = database_url.to_string(); let id = missing_symbol.id; - let details = Some("Not found in AlphaVantage API".to_string()); + let details = Some("No matches found in symbol search".to_string()); move || -> Result<()> { let mut conn = diesel::PgConnection::establish(&database_url)?; @@ -280,133 +280,201 @@ async fn fetch_and_store_overview( }) .await??; - return Ok(ResolutionResult::NotFound("Symbol not found in AlphaVantage".to_string())); + return Ok(ResolutionResult::NotFound("No matches in symbol search".to_string())); } - // Use av-core's SecurityType mapping - let security_type = SecurityType::from_alpha_vantage(&overview.asset_type); + // Step 2: Select the best match (first one with highest match score) + let best_match = search_response + .best_matches + .iter() + .max_by(|a, b| { + let score_a = a.match_score.parse::().unwrap_or(0.0); + let score_b = b.match_score.parse::().unwrap_or(0.0); + score_a.partial_cmp(&score_b).unwrap_or(std::cmp::Ordering::Equal) + }) + .unwrap(); // Safe because we checked is_empty above + + info!( + "Found match: {} ({}) - Type: {}, Score: {}", + best_match.symbol, best_match.name, best_match.stock_type, best_match.match_score + ); + + best_match.clone() + } + Err(e) => { + // Handle search API errors + let error_msg = e.to_string(); + + if error_msg.contains("rate limit") || error_msg.contains("429") { + return Err(anyhow::anyhow!("Rate limit exceeded")); + } + + if error_msg.contains("Invalid API") { + return Err(anyhow::anyhow!("API key invalid or missing")); + } - // Skip "Other" types as we don't know how to handle them - if security_type == SecurityType::Other { + // Unknown error + if auto_skip { tokio::task::spawn_blocking({ let database_url = database_url.to_string(); let id = missing_symbol.id; - let asset_type = overview.asset_type.clone(); - let details = Some(format!("Skipped: Unknown asset_type '{}'", asset_type)); + let reason = Some(format!("Search API error: {}", error_msg)); move || -> Result<()> { let mut conn = diesel::PgConnection::establish(&database_url)?; - MissingSymbol::mark_skipped(&mut conn, id, details)?; + MissingSymbol::mark_skipped(&mut conn, id, reason)?; Ok(()) } }) .await??; - return Ok(ResolutionResult::Skipped(format!( - "Unknown asset_type: '{}'", - overview.asset_type - ))); + return Ok(ResolutionResult::Skipped(format!("Search API error: {}", error_msg))); + } else { + return Err(e.into()); } + } + }; - // Symbol found - insert into symbols table - let sid = tokio::task::spawn_blocking({ - let database_url = database_url.to_string(); - let symbol_str = overview.symbol.clone(); - let name = - if overview.name.is_empty() { symbol_str.clone() } else { overview.name.clone() }; - let asset_type = overview.asset_type.clone(); - let new_sid = sid_generator.next_sid(security_type); - - move || -> Result { - let mut conn = diesel::PgConnection::establish(&database_url)?; - - // Check if symbol already exists - let existing_sid: Option = symbols::table - .filter(symbols::symbol.eq(&symbol_str)) - .select(symbols::sid) - .first(&mut conn) - .optional()?; - - if let Some(sid) = existing_sid { - info!("Symbol {} already exists with SID: {}", symbol_str, sid); - return Ok(sid); - } - - // Insert new symbol using the pattern from securities.rs - let new_symbol = NewSymbolOwned { - sid: new_sid, - symbol: symbol_str.clone(), - priority: NO_PRIORITY, - name: name.clone(), - sec_type: format!("{:?}", security_type), // Matches securities.rs pattern - region: "US".to_string(), - currency: "USD".to_string(), - overview: true, - intraday: false, - summary: false, - c_time: chrono::Utc::now().naive_utc(), - m_time: chrono::Utc::now().naive_utc(), - }; - - diesel::insert_into(symbols::table).values(&new_symbol).execute(&mut conn)?; - - info!( - "Inserted new symbol: {} with SID: {} (type: {}, asset_type: {})", - symbol_str, new_sid, security_type, asset_type - ); - Ok(new_sid) - } - }) - .await??; - - // Mark as found in missing_symbols table - tokio::task::spawn_blocking({ - let database_url = database_url.to_string(); - let id = missing_symbol.id; - let asset_type = overview.asset_type.clone(); - let details = Some(format!("Loaded from AlphaVantage as {}", asset_type)); - - move || -> Result<()> { - let mut conn = diesel::PgConnection::establish(&database_url)?; - MissingSymbol::mark_found(&mut conn, id, sid, details)?; - Ok(()) - } - }) - .await??; + // Step 3: Determine security type from search result + let security_type = SecurityType::from_alpha_vantage(&found_symbol.stock_type); - Ok(ResolutionResult::Found(sid)) - } - Err(e) => { - // Check if it's a rate limit or API error - let error_msg = e.to_string(); + // Skip "Other" types as we don't know how to handle them + if security_type == SecurityType::Other { + tokio::task::spawn_blocking({ + let database_url = database_url.to_string(); + let id = missing_symbol.id; + let stock_type = found_symbol.stock_type.clone(); + let details = Some(format!("Skipped: Unknown stock_type '{}'", stock_type)); - if error_msg.contains("rate limit") || error_msg.contains("429") { - return Err(anyhow::anyhow!("Rate limit exceeded")); + move || -> Result<()> { + let mut conn = diesel::PgConnection::establish(&database_url)?; + MissingSymbol::mark_skipped(&mut conn, id, details)?; + Ok(()) } + }) + .await??; - if error_msg.contains("Invalid API") { - return Err(anyhow::anyhow!("API key invalid or missing")); + return Ok(ResolutionResult::Skipped(format!( + "Unknown stock_type: '{}'", + found_symbol.stock_type + ))); + } + + // Step 4: Check if symbol already exists (by symbol AND sec_type) + let sid = tokio::task::spawn_blocking({ + let database_url = database_url.to_string(); + let symbol_str = found_symbol.symbol.clone(); + let name = found_symbol.name.clone(); + let region_raw = found_symbol.region.clone(); + let currency_raw = found_symbol.currency.clone(); + let sec_type_str = format!("{:?}", security_type); + let new_sid = sid_generator.next_sid(security_type); + + move || -> Result { + let mut conn = diesel::PgConnection::establish(&database_url)?; + + // Check if symbol already exists with this sec_type (composite uniqueness) + let existing_sid: Option = symbols::table + .filter(symbols::symbol.eq(&symbol_str)) + .filter(symbols::sec_type.eq(&sec_type_str)) + .select(symbols::sid) + .first(&mut conn) + .optional()?; + + if let Some(sid) = existing_sid { + info!( + "Symbol {} already exists with sec_type {} and SID: {}", + symbol_str, sec_type_str, sid + ); + return Ok(sid); } - // Unknown error - if auto_skip { + // Normalize region and currency to fit database constraints (varchar(10)) + let region = normalize_alpha_region(®ion_raw); + let currency = + if currency_raw.len() > 10 { currency_raw[..10].to_string() } else { currency_raw }; + + // Insert new symbol + let new_symbol = NewSymbolOwned { + sid: new_sid, + symbol: symbol_str.clone(), + priority: NO_PRIORITY, + name: name.clone(), + sec_type: sec_type_str.clone(), + region: region.clone(), + currency: currency.clone(), + overview: false, // Will be set to true after fetching overview + intraday: false, + summary: false, + c_time: chrono::Utc::now().naive_utc(), + m_time: chrono::Utc::now().naive_utc(), + }; + + diesel::insert_into(symbols::table).values(&new_symbol).execute(&mut conn)?; + + info!( + "Inserted new symbol: {} with SID: {} (sec_type: {}, region: {})", + symbol_str, new_sid, sec_type_str, region + ); + Ok(new_sid) + } + }) + .await??; + + // Mark as found in missing_symbols table (found via symbol search) + tokio::task::spawn_blocking({ + let database_url = database_url.to_string(); + let id = missing_symbol.id; + let symbol_info = format!("{} ({})", found_symbol.name, found_symbol.stock_type); + let details = Some(format!("Resolved via symbol search as {}", symbol_info)); + + move || -> Result<()> { + let mut conn = diesel::PgConnection::establish(&database_url)?; + MissingSymbol::mark_found(&mut conn, id, sid, details)?; + Ok(()) + } + }) + .await??; + + // Step 5: Try to fetch overview (best effort - optional additional data) + debug!("Fetching overview for found symbol: {}", found_symbol.symbol); + + match client.fundamentals().company_overview(&found_symbol.symbol).await { + Ok(overview) => { + if !overview.symbol.is_empty() { + // Update the symbol record to mark overview=true tokio::task::spawn_blocking({ let database_url = database_url.to_string(); - let id = missing_symbol.id; - let reason = Some(format!("API error: {}", error_msg)); + let symbol_sid = sid; move || -> Result<()> { let mut conn = diesel::PgConnection::establish(&database_url)?; - MissingSymbol::mark_skipped(&mut conn, id, reason)?; + + diesel::update(symbols::table.filter(symbols::sid.eq(symbol_sid))) + .set(symbols::overview.eq(true)) + .execute(&mut conn)?; + Ok(()) } }) .await??; - Ok(ResolutionResult::Skipped(format!("API error: {}", error_msg))) + info!( + "Successfully fetched and stored overview for {} (SID: {})", + found_symbol.symbol, sid + ); } else { - Err(e.into()) + debug!("Overview returned empty for {} - skipping", found_symbol.symbol); } } + Err(e) => { + // Overview fetch failed, but symbol is already found and in the table + debug!( + "Failed to fetch overview for {} (SID: {}): {} - continuing", + found_symbol.symbol, sid, e + ); + } } + + Ok(ResolutionResult::Found(sid)) }