Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ authors = ["Dwight Browne <dwight@dwightjbrowne.com"]
edition = "2021"
rust-version = "1.86.0"
readme = "README.md"
repository = "https://github.com/dbrowne/PLACEHOLDER!!!!"
homepage = "https://github.com/dbrowne/PLACHOLDER!!!!"
repository = "https://github.com/dbrowne/alphavantage"
homepage = "https://github.com/dbrowne/alphavantage"
keywords = ["alphavantage", "finance", "api", "trading", "market-data"]
categories = ["api-bindings", "finance"]
license = "MIT"
Expand Down
8 changes: 8 additions & 0 deletions DOT_env_EXAMPLE
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
DATABASE_URL=postgres://ts_user:dev_pw@localhost:6433/sec_master
OTHER_LISTED=/YOUR_PATH/alphavantage/data/nyse-listed.csv
ALPHA_VANTAGE_API_KEY="YOUR_API_KEY"
NASDAQ_LISTED=/YOUR_PATH/alphavantage/data/nasdaq-listed_csv.csv
SOSOVALUE_API_KEY="YOUR_KEY"
CMC_API_KEY="YOUR_KEY"
GITHUB_TOKEN="YOUR_KEY"
COINGECKO_API_KEY="YOUR_KEY"
6 changes: 3 additions & 3 deletions crates/av-cli/src/commands/load/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ pub async fn execute(args: CryptoArgs, config: Config) -> Result<()> {
// Create database context and crypto repository
let db_context = av_database_postgres::repository::DatabaseContext::new(&config.database_url)
.map_err(|e| anyhow::anyhow!("Failed to create database context: {}", e))?;
let crypto_repo: Arc<dyn av_database_postgres::repository::CryptoRepository> =
Arc::new(db_context.crypto_repository());
let _crypto_repo: Arc<dyn av_database_postgres::repository::CryptoRepository> =
Arc::new(db_context.crypto_repository()); // todo: Fix this!!!

// Create cache repository for API response caching
let cache_repo: Arc<dyn av_database_postgres::repository::CacheRepository> =
Expand Down Expand Up @@ -664,7 +664,7 @@ async fn save_crypto_symbols_to_db(
let new_symbol = NewSymbolOwned {
sid: new_sid,
symbol: crypto_symbol.symbol.clone(),
priority: crypto_symbol.priority.clone(),
priority: crypto_symbol.priority,
name: crypto_symbol.name.clone(),
sec_type: "Cryptocurrency".to_string(),
region: "Global".to_string(), // ADD this line
Expand Down
6 changes: 3 additions & 3 deletions crates/av-cli/src/commands/load/crypto_intraday.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn get_latest_timestamps(
async fn save_crypto_intraday_prices_optimized(
config: &Config,
prices: Vec<CryptoIntradayPriceData>,
update_existing: bool,
_update_existing: bool, //todo: fix this!!
update_symbols: bool,
check_each_record: bool,
latest_timestamps: HashMap<i64, DateTime<Utc>>,
Expand Down Expand Up @@ -187,10 +187,10 @@ async fn save_crypto_intraday_prices_optimized(
// Group prices by symbol for efficient processing
let mut prices_by_symbol: HashMap<i64, Vec<CryptoIntradayPriceData>> = HashMap::new();
for price in prices {
prices_by_symbol.entry(price.sid).or_insert_with(Vec::new).push(price);
prices_by_symbol.entry(price.sid).or_default().push(price);
}

for (sid, mut symbol_prices) in prices_by_symbol {
for (sid, symbol_prices) in prices_by_symbol {
let original_count = symbol_prices.len();
let latest_existing = latest_timestamps.get(&sid);

Expand Down
2 changes: 1 addition & 1 deletion crates/av-cli/src/commands/load/crypto_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ async fn save_metadata_to_db(
.first::<i64>(&mut conn)
.optional()?;

if let Some(_) = exists {
if exists.is_some() {
if update_existing {
// Update existing record
diesel::update(crypto_metadata::table.find(meta.sid))
Expand Down
6 changes: 3 additions & 3 deletions crates/av-cli/src/commands/load/crypto_overview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,7 @@ async fn fetch_github_data(
let caps = re.captures(link)?;
caps.get(1)?.as_str().parse::<i32>().ok()
})
.or_else(|| Some(1))
.or(Some(1))
} else {
None
};
Expand Down Expand Up @@ -1332,7 +1332,7 @@ async fn fetch_github_data(
let caps = re.captures(link)?;
caps.get(1)?.as_str().parse::<i32>().ok()
})
.or_else(|| Some(1))
.or(Some(1))
} else {
None
};
Expand All @@ -1359,7 +1359,7 @@ async fn fetch_github_data(
let caps = re.captures(link)?;
caps.get(1)?.as_str().parse::<i32>().ok()
})
.or_else(|| Some(1))
.or(Some(1))
} else {
None
};
Expand Down
10 changes: 5 additions & 5 deletions crates/av-cli/src/commands/load/crypto_prices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async fn fetch_from_coingecko(
})?;

let timestamp = if let Some(ts) = coin_data.get("last_updated_at").and_then(|v| v.as_i64()) {
DateTime::from_timestamp(ts, 0).unwrap_or_else(|| Utc::now())
DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now)
} else {
Utc::now()
};
Expand Down Expand Up @@ -530,7 +530,7 @@ async fn fetch_price_parallel(
client: &reqwest::Client,
symbol_with_mappings: &SymbolWithMappings,
source_priority: &[String],
parallel_fetch: bool,
_parallel_fetch: bool, //todo: fix this!!!
coingecko_api_key: Option<&str>,
coinmarketcap_api_key: Option<&str>,
alphavantage_api_key: Option<&str>,
Expand Down Expand Up @@ -827,7 +827,7 @@ pub async fn execute(args: CryptoPricesArgs, config: Config) -> Result<()> {
// Group mappings by sid
let mut mappings_by_sid: HashMap<i64, Vec<SymbolMapping>> = HashMap::new();
for mapping in all_mappings {
mappings_by_sid.entry(mapping.sid).or_insert_with(Vec::new).push(mapping);
mappings_by_sid.entry(mapping.sid).or_default().push(mapping);
}

// Combine symbols with their mappings
Expand All @@ -836,7 +836,7 @@ pub async fn execute(args: CryptoPricesArgs, config: Config) -> Result<()> {
.map(|(sid, symbol)| SymbolWithMappings {
sid,
symbol,
mappings: mappings_by_sid.remove(&sid).unwrap_or_else(Vec::new),
mappings: mappings_by_sid.remove(&sid).unwrap_or_default(),
})
.collect();

Expand Down Expand Up @@ -938,7 +938,7 @@ pub async fn execute(args: CryptoPricesArgs, config: Config) -> Result<()> {
}

// Try cache first
let cache_key = generate_cache_key(sid, &symbol);
let cache_key = generate_cache_key(sid, symbol);
let price = if let Some(cached) = get_cached_price(&cache_config, &cache_key).await {
cache_hits += 1;
Some(cached)
Expand Down
12 changes: 7 additions & 5 deletions crates/av-cli/src/commands/load/intraday.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* SOFTWARE.
*/

use anyhow::Result;
use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use clap::Parser;
use diesel::prelude::*;
Expand Down Expand Up @@ -204,7 +204,7 @@ async fn get_latest_timestamps(
async fn save_intraday_prices_optimized(
config: &Config,
prices: Vec<IntradayPriceData>,
update_existing: bool,
_update_existing: bool, //todo: FIX THIS!!
update_symbols: bool,
check_each_record: bool,
latest_timestamps: HashMap<i64, DateTime<Utc>>,
Expand Down Expand Up @@ -232,10 +232,10 @@ async fn save_intraday_prices_optimized(
// Group prices by symbol for efficient processing
let mut prices_by_symbol: HashMap<i64, Vec<IntradayPriceData>> = HashMap::new();
for price in prices {
prices_by_symbol.entry(price.sid).or_insert_with(Vec::new).push(price);
prices_by_symbol.entry(price.sid).or_default().push(price);
}

for (sid, mut symbol_prices) in prices_by_symbol {
for (sid, symbol_prices) in prices_by_symbol {
unique_sids.insert(sid);
let original_count = symbol_prices.len();
let latest_existing = latest_timestamps.get(&sid);
Expand Down Expand Up @@ -424,7 +424,9 @@ pub async fn execute(args: IntradayArgs, config: Config) -> Result<()> {

// Configure the loader
let loader_cfg = IntradayPriceConfig {
interval: IntradayInterval::from_str(args.interval.as_str()).unwrap(),
interval: IntradayInterval::from_str(args.interval.as_str()).ok_or_else(|| {
anyhow!("Invalid interval: {}. Must be 1min, 5min, 15min, 30min, or 60min", args.interval)
})?,
extended_hours: args.extended_hours,
adjusted: args.adjusted,
month: args.month.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/av-cli/src/commands/load/news.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub async fn execute(args: NewsArgs, config: Config) -> Result<()> {
return Err(anyhow!("Limit must be at least 1"));
}

let continue_on_error = if args.stop_on_error { false } else { args.continue_on_error };
let _continue_on_error = if args.stop_on_error { false } else { args.continue_on_error }; //todo:: fix this

// Create API client
let client = Arc::new(AlphaVantageClient::new(config.api_config.clone()));
Expand Down
10 changes: 7 additions & 3 deletions crates/av-cli/src/commands/update/crypto_metadata_etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ pub fn execute_metadata_etl(database_url: &str) -> Result<()> {
}

info!(
"ETL Complete: processed={}, basic={}, social={}, technical={}",
stats.total_processed, stats.basic_updated, stats.social_updated, stats.technical_updated
"ETL Complete: processed={}, basic={}, social={}, technical={}, errors={}",
stats.total_processed,
stats.basic_updated,
stats.social_updated,
stats.technical_updated,
stats.errors
);

Ok(())
Expand Down Expand Up @@ -272,7 +276,7 @@ fn process_social(conn: &mut PgConnection, sid: i64, data: &Value) -> Result<()>
let discord_url = links
.and_then(|l| l.get("chat_url"))
.and_then(|urls| urls.as_array())
.and_then(|arr| arr.iter().find(|url| url.as_str().map_or(false, |s| s.contains("discord"))))
.and_then(|arr| arr.iter().find(|url| url.as_str().is_some_and(|s| s.contains("discord"))))
.and_then(|v| v.as_str())
.map(|s| s.to_string());

Expand Down
11 changes: 4 additions & 7 deletions crates/av-cli/src/commands/update/crypto_update_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,16 @@ pub async fn update_crypto_command(args: UpdateCryptoArgs, _config: Config) -> R
}

if args.basic_only {
println!("Updating basic crypto data...");
unimplemented!("basic crypto data version date: TBD");
// TODO: Implement basic crypto data update
} else if args.social_only {
println!("Updating social crypto data...");
unimplemented!("social crypto data version date: TBD");
// TODO: Implement social crypto data update
} else if args.technical_only {
println!("Updating technical crypto data...");
unimplemented!("Technical crypto data version date: TBD");
// TODO: Implement technical crypto data update
} else {
println!("Updating all crypto data...");
unimplemented!("Updating all crypto data version date: TBD");
// TODO: Implement comprehensive crypto data update
}

println!("Crypto update completed");
Ok(())
}
2 changes: 1 addition & 1 deletion crates/av-core/src/types/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ impl SecurityType {
SecurityType::TreasuryBill => (TYPE_T_BILL as i64) << SHIFT_6BIT | id as i64,
SecurityType::Other => (TYPE_OTHER as i64) << SHIFT_6BIT | id as i64,
};
unsigned_result as i64
unsigned_result
}

/// Decode SecurityType from an encoded SID
Expand Down
8 changes: 7 additions & 1 deletion crates/av-loaders/src/batch_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ pub struct BatchResult<T> {
pub total_processed: usize,
}

impl<T> Default for BatchResult<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> BatchResult<T> {
pub fn new() -> Self {
Self { success: Vec::new(), failures: Vec::new(), total_processed: 0 }
Expand Down Expand Up @@ -130,7 +136,7 @@ impl BatchProcessor {
debug!("Processing {} items in batches of {}", total_items, self.config.batch_size);

let mut batch_idx = 0;
let total_batches = (total_items + self.config.batch_size - 1) / self.config.batch_size;
let total_batches = total_items.div_ceil(self.config.batch_size);

// Process items in chunks by draining from the vector
while !items.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion crates/av-loaders/src/crypto/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl CryptoDbLoader {
let mut symbol_groups: HashMap<String, Vec<&mut CryptoSymbolForDb>> = HashMap::new();

for token in tokens.iter_mut() {
symbol_groups.entry(token.symbol.clone()).or_insert_with(Vec::new).push(token);
symbol_groups.entry(token.symbol.clone()).or_default().push(token);
}

// Process each symbol group
Expand Down
4 changes: 2 additions & 2 deletions crates/av-loaders/src/crypto/intraday_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ impl DataLoader for CryptoIntradayLoader {
let mut all_prices = Vec::new();
let mut symbols_loaded = 0;
let mut symbols_failed = 0;
let mut symbols_skipped = 0;
let symbols_skipped = 0;
let mut failed_symbols = Vec::new();

// Filter symbols if primary_only is set
Expand Down Expand Up @@ -523,7 +523,7 @@ impl DataLoader for CryptoIntradayLoader {
progress.set_message(format!("Loading {} ({})", symbol, market));

match loader
.fetch_crypto_intraday_csv(&context, &symbol, &market, &interval_str, sid)
.fetch_crypto_intraday_csv(context, &symbol, &market, &interval_str, sid)
.await
{
Ok(prices) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/av-loaders/src/crypto/markets_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ impl CryptoMarketsLoader {
// Log first 200 chars for debugging
if response_text.len() > 200 {
info!("📄 Response preview for {}: {}...", symbol.symbol, &response_text[..200]);
} else if response_text.len() > 0 {
} else if !response_text.is_empty() {
info!("📄 Full response for {}: {}", symbol.symbol, response_text);
}

Expand Down
3 changes: 1 addition & 2 deletions crates/av-loaders/src/crypto/metadata_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ impl CryptoMetadataLoader {
match self.load_alphavantage_metadata_fresh(symbol).await {
Ok((metadata, response, url, _status)) => {
// Cache the successful response
let response_json =
serde_json::to_value(&response).unwrap_or_else(|_| serde_json::Value::Null);
let response_json = serde_json::to_value(&response).unwrap_or(serde_json::Value::Null);

self.cache_response(cache_repo, &cache_key, "alphavantage", &url, &response_json).await;

Expand Down
6 changes: 6 additions & 0 deletions crates/av-loaders/src/csv_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ use std::path::Path;

pub struct CsvProcessor;

impl Default for CsvProcessor {
fn default() -> Self {
Self::new()
}
}

impl CsvProcessor {
pub fn new() -> Self {
Self
Expand Down
14 changes: 5 additions & 9 deletions crates/av-loaders/src/intraday_price_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ use tokio::time::sleep;
use tracing::{debug, error, info, warn};

/// Supported intervals for intraday data
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IntradayInterval {
#[default]
Min1,
Min5,
Min15,
Expand Down Expand Up @@ -93,12 +94,6 @@ impl IntradayInterval {
}
}

impl Default for IntradayInterval {
fn default() -> Self {
IntradayInterval::Min1 // Default to 1-minute intervals
}
}

/// Configuration for intraday price loading
#[derive(Debug, Clone)]
pub struct IntradayPriceConfig {
Expand Down Expand Up @@ -332,6 +327,7 @@ impl IntradayPriceLoader {
let expires_at = Utc::now() + chrono::Duration::hours(self.config.cache_ttl_hours as i64);

// Try to insert, if it fails due to duplicate, update instead

let insert_result = sql_query(
"INSERT INTO api_response_cache
(cache_key, api_source, endpoint_url, response_data, status_code, expires_at, cached_at)
Expand Down Expand Up @@ -448,7 +444,7 @@ impl IntradayPriceLoader {
/// Fetch intraday data in CSV format from API or cache
async fn fetch_intraday_csv(
&self,
context: &LoaderContext,
_context: &LoaderContext,
symbol: &str,
interval: &str,
month: Option<&str>,
Expand Down Expand Up @@ -595,7 +591,7 @@ impl DataLoader for IntradayPriceLoader {
progress.set_message(format!("Loading {}", symbol));

match loader
.fetch_intraday_csv(&context, &symbol, &interval_str, month.as_deref(), sid)
.fetch_intraday_csv(context, &symbol, &interval_str, month.as_deref(), sid)
.await
{
Ok(prices) => {
Expand Down
Loading