diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3e54262 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,16 @@ +.git +.venv +venv +__pycache__ +*.pyc +*.pyo +*.pyd +*.db +*.sqlite +*.log +.env +.DS_Store +.pytest_cache +.mypy_cache +.streamlit/cache +.streamlit/state diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..27736a2 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,29 @@ +name: CI + +on: + push: + pull_request: + +jobs: + tests: + runs-on: ubuntu-latest + + steps: + - name: Check out repository + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Compile sources + run: python -m py_compile $(git ls-files '*.py') + + - name: Run tests + run: pytest -q diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0c9b120 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-slim + +WORKDIR /app +ENV PIP_NO_CACHE_DIR=1 + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +EXPOSE 8501 + +CMD ["streamlit", "run", "streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"] diff --git a/README.md b/README.md index ded6b9c..17ac1a1 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # QuantBoard — Análisis técnico y Backtesting +[![CI](https://github.com/felipeimpieri/quantboard/actions/workflows/ci.yml/badge.svg)](https://github.com/felipeimpieri/quantboard/actions/workflows/ci.yml) Dashboard interactivo hecho con **Streamlit + yfinance + Plotly** para analizar precios, aplicar indicadores y correr backtests simples. > **v0.2 – Novedades** @@ -50,3 +51,12 @@ source .venv/bin/activate python -m pip install --upgrade pip pip install -r requirements.txt python -m streamlit run streamlit_app.py + +### Ejecutar con Docker +```bash +# Construir la imagen +docker build -t quantboard:latest . + +# Ejecutar el contenedor (Streamlit en http://localhost:8501) +docker run --rm -p 8501:8501 quantboard:latest +``` diff --git a/pages/01_Watchlist.py b/pages/01_Watchlist.py index 38850f9..999752e 100644 --- a/pages/01_Watchlist.py +++ b/pages/01_Watchlist.py @@ -3,14 +3,16 @@ import pandas as pd import streamlit as st -from quantboard.data import get_prices +from quantboard.data import get_prices_cached from quantboard.features.watchlist import load_watchlist, save_watchlist +from quantboard.ui.state import set_param, shareable_link_button from quantboard.ui.theme import apply_global_theme st.set_page_config(page_title="Watchlist", page_icon="👀", layout="wide") apply_global_theme() st.title("Watchlist") +shareable_link_button() watchlist = load_watchlist() @@ -37,7 +39,7 @@ def load_data(tickers: list[str]) -> pd.DataFrame: start = (datetime.today() - timedelta(days=30)).date() end = datetime.today().date() for tick in tickers: - df = get_prices(tick, start=start, end=end, interval="1d") + df = get_prices_cached(tick, start=start, end=end, interval="1d") if df.empty or "close" not in df.columns: continue close = pd.to_numeric(df["close"], errors="coerce").dropna() @@ -60,7 +62,7 @@ def load_data(tickers: list[str]) -> pd.DataFrame: c2.write(f"{row['Last price']:.2f}") c3.write(f"{row['30d %']:.2f}%") if c4.button("Open in Home", key=f"open_{row['Ticker']}"): - st.experimental_set_query_params(ticker=row["Ticker"]) + set_param("ticker", row["Ticker"]) try: st.switch_page("streamlit_app.py") except Exception: diff --git a/pages/02_SMA_Heatmap.py b/pages/02_SMA_Heatmap.py index 3c978b1..e808b46 100644 --- a/pages/02_SMA_Heatmap.py +++ b/pages/02_SMA_Heatmap.py @@ -1,11 +1,15 @@ +"""SMA heatmap optimisation page.""" +from __future__ import annotations + from datetime import date, timedelta import pandas as pd import streamlit as st -from quantboard.data import get_prices +from quantboard.data import get_prices_cached from quantboard.optimize import grid_search_sma from quantboard.plots import heatmap_metric +from quantboard.ui.state import get_param, set_param, shareable_link_button from quantboard.ui.theme import apply_global_theme st.set_page_config(page_title="SMA Heatmap", page_icon="🔥", layout="wide") @@ -13,38 +17,73 @@ def _validate_prices(df: pd.DataFrame) -> pd.Series | None: + """Return a cleaned *close* series or ``None`` when empty.""" if df.empty or "close" not in df.columns: st.error("No data for the selected range/interval.") return None + close = pd.to_numeric(df["close"], errors="coerce").dropna() if close.empty: st.error("No data for the selected range/interval.") return None + return close +@st.cache_data(ttl=60) +def _load_prices(ticker: str, start: date, end: date) -> pd.DataFrame: + return get_prices_cached(ticker, start=start, end=end, interval="1d") + + def main() -> None: st.title("🔥 SMA Heatmap") + shareable_link_button() + + today = date.today() + default_start = today - timedelta(days=365 * 2) + + ticker_default = str(get_param("ticker", "AAPL")).strip().upper() or "AAPL" + end_default = get_param("heat_end", today) + start_default = get_param("heat_start", default_start) + fast_min_default = int(get_param("heat_fast_min", 10)) + fast_max_default = int(get_param("heat_fast_max", 25)) + slow_min_default = int(get_param("heat_slow_min", 50)) + slow_max_default = int(get_param("heat_slow_max", 120)) + + fast_min_default = max(5, min(60, fast_min_default)) + fast_max_default = max(fast_min_default, min(60, fast_max_default)) + slow_min_default = max(20, min(240, slow_min_default)) + slow_max_default = max(slow_min_default, min(240, slow_max_default)) + with st.sidebar: st.header("Parameters") - ticker = st.text_input("Ticker", value="AAPL").strip().upper() - end = st.date_input("To", value=date.today()) - start = st.date_input("From", value=date.today() - timedelta(days=365 * 2)) - fast_min, fast_max = st.slider("Fast SMA range", 5, 60, (10, 25)) - slow_min, slow_max = st.slider("Slow SMA range", 20, 240, (50, 120)) - run_btn = st.button("Run search", type="primary") + with st.form("heatmap_form"): + ticker = st.text_input("Ticker", value=ticker_default).strip().upper() + end = st.date_input("To", value=end_default) + start = st.date_input("From", value=start_default) + fast_min, fast_max = st.slider("Fast SMA range", 5, 60, (fast_min_default, fast_max_default)) + slow_min, slow_max = st.slider("Slow SMA range", 20, 240, (slow_min_default, slow_max_default)) + submitted = st.form_submit_button("Run search", type="primary") + + set_param("ticker", ticker or None) + set_param("heat_end", end) + set_param("heat_start", start) + set_param("heat_fast_min", int(fast_min)) + set_param("heat_fast_max", int(fast_max)) + set_param("heat_slow_min", int(slow_min)) + set_param("heat_slow_max", int(slow_max)) + + if not submitted: + st.info("Choose parameters and click **Run search**.") + return if fast_min >= slow_min: st.error("Fast SMA range must stay below the Slow SMA range.") return - if not run_btn: - st.info("Choose parameters and click **Run search**.") - return - with st.spinner("Fetching data..."): - df = get_prices(ticker, start=start, end=end, interval="1d") + df = _load_prices(ticker, start=start, end=end) close = _validate_prices(df) if close is None: @@ -57,28 +96,61 @@ def main() -> None: slow_range=range(int(slow_min), int(slow_max) + 1), metric="Sharpe", ) - # Invalidate combinations where fast >= slow - for f in z.index: - for s in z.columns: - if int(f) >= int(s): - z.loc[f, s] = float("nan") + + # Invalidate combinations where fast >= slow + for fast_window in z.index: + for slow_window in z.columns: + if int(fast_window) >= int(slow_window): + z.loc[fast_window, slow_window] = float("nan") st.subheader("Heatmap (Sharpe)") st.plotly_chart(heatmap_metric(z, title="SMA grid — Sharpe"), use_container_width=True) - # Best combination ignoring NaNs - best = z.stack().dropna().astype(float).idxmax() if z.stack().dropna().size else None - if best: - f_best, s_best = map(int, best) - st.success(f"Best combo: **Fast SMA {f_best} / Slow SMA {s_best}**") - if st.button("Use in Home"): - st.experimental_set_query_params(ticker=ticker) - try: - st.switch_page("streamlit_app.py") - except Exception: - st.info("Open Home from the menu; the ticker was set.") - else: + stacked = z.stack().dropna().astype(float) + if stacked.empty: st.warning("No valid combination found in the selected range.") + return + + f_best, s_best = map(int, stacked.idxmax()) + st.success(f"Best combo: **Fast SMA {f_best} / Slow SMA {s_best}**") + + top_df = ( + stacked.sort_values(ascending=False) + .head(10) + .rename_axis(("fast", "slow")) + .reset_index(name="Sharpe") + ) + + st.subheader(f"Top {len(top_df)} combinations") + header_cols = st.columns([1, 1, 1, 1.5]) + header_cols[0].markdown("**Fast**") + header_cols[1].markdown("**Slow**") + header_cols[2].markdown("**Sharpe**") + header_cols[3].markdown("**Action**") + + for idx, row in top_df.iterrows(): + fast_val = int(row["fast"]) + slow_val = int(row["slow"]) + sharpe_val = float(row["Sharpe"]) + cols = st.columns([1, 1, 1, 1.5]) + cols[0].write(fast_val) + cols[1].write(slow_val) + cols[2].write(f"{sharpe_val:.2f}") + if cols[3].button("Run Backtest", key=f"run_backtest_{idx}"): + set_param("ticker", ticker or None) + set_param("fast", int(fast_val)) + set_param("slow", int(slow_val)) + try: + st.switch_page("pages/03_Backtest.py") + except Exception: # pragma: no cover - depends on Streamlit runtime + st.info("Open Backtest from the menu; the parameters were set.") + + if st.button("Use in Home"): + set_param("ticker", ticker or None) + try: + st.switch_page("streamlit_app.py") + except Exception: # pragma: no cover - depends on Streamlit runtime + st.info("Open Home from the menu; the ticker was set.") main() diff --git a/pages/03_Backtest.py b/pages/03_Backtest.py index 916fda9..31a4b68 100644 --- a/pages/03_Backtest.py +++ b/pages/03_Backtest.py @@ -8,14 +8,15 @@ import streamlit as st from quantboard.backtest import run_backtest -from quantboard.data import get_prices +from quantboard.data import get_prices_cached from quantboard.indicators import sma from quantboard.plots import apply_plotly_theme +from quantboard.ui.state import get_param, set_param, shareable_link_button from quantboard.ui.theme import apply_global_theme try: from quantboard.strategies import signals_sma_crossover -except Exception: # pragma: no cover - fallback when optional module is missing +except Exception: # pragma: no cover - optional dependency guard signals_sma_crossover = None # type: ignore[assignment] st.set_page_config(page_title="Backtest", page_icon="🧪", layout="wide") @@ -29,84 +30,129 @@ def _validate_prices(df: pd.DataFrame) -> pd.DataFrame | None: return df +def _clean_prices(df: pd.DataFrame) -> pd.DataFrame | None: + price_cols = ["open", "high", "low", "close", "volume"] + numeric = {col: pd.to_numeric(df.get(col), errors="coerce") for col in price_cols if col in df} + clean = df.assign(**numeric).dropna(subset=["close"]) + if clean.empty: + st.error("No price data available after cleaning.") + return None + return clean + + +def _run_signals(close: pd.Series, fast: int, slow: int) -> pd.Series: + if signals_sma_crossover is not None: + sig, _ = signals_sma_crossover(close, fast=fast, slow=slow) + return sig + + sma_fast = sma(close, fast) + sma_slow = sma(close, slow) + cross_up = (sma_fast > sma_slow) & (sma_fast.shift(1) <= sma_slow.shift(1)) + cross_dn = (sma_fast < sma_slow) & (sma_fast.shift(1) >= sma_slow.shift(1)) + sig = pd.Series(0.0, index=close.index) + sig = sig.where(~cross_up, 1.0) + sig = sig.where(~cross_dn, -1.0) + return sig.replace(0, pd.NA).ffill().fillna(0.0) + + def main() -> None: st.title("Backtest — SMA crossover") + shareable_link_button() + + today = date.today() + default_start = today - timedelta(days=365 * 2) + + ticker_default = str(get_param("ticker", "AAPL")).strip().upper() or "AAPL" + end_default = get_param("bt_end", today) + start_default = get_param("bt_start", default_start) + interval_options = ["1d", "1h", "1wk", "1m"] + interval_default = str(get_param("interval", "1d")) + if interval_default not in interval_options: + interval_default = "1d" + fast_default = int(get_param("fast", 20)) + slow_default = int(get_param("slow", 50)) + fee_default = int(get_param("fee_bps", 0)) + slip_default = int(get_param("slippage_bps", 0)) + + fast_default = max(5, min(200, fast_default)) + slow_default = max(10, min(400, slow_default)) + fee_default = max(0, min(50, fee_default)) + slip_default = max(0, min(50, slip_default)) + with st.sidebar: st.header("Parameters") - ticker = st.text_input("Ticker", value="AAPL").strip().upper() - end = st.date_input("To", value=date.today()) - start = st.date_input("From", value=date.today() - timedelta(days=365 * 2)) - interval = st.selectbox("Interval", ["1d", "1h", "1wk", "1m"], index=0) - fast = st.number_input("Fast SMA", 5, 200, 20, step=1) - slow = st.number_input("Slow SMA", 10, 400, 50, step=1) - fee_bps = st.number_input("Fees (bps)", 0, 50, 0, step=1) - slip_bps = st.number_input("Slippage (bps)", 0, 50, 0, step=1) - run_btn = st.button("Run backtest", type="primary") + with st.form("backtest_form"): + ticker = st.text_input("Ticker", value=ticker_default).strip().upper() + end = st.date_input("To", value=end_default) + start = st.date_input("From", value=start_default) + interval = st.selectbox("Interval", interval_options, index=interval_options.index(interval_default)) + fast = st.number_input("Fast SMA", 5, 200, int(fast_default), step=1) + slow = st.number_input("Slow SMA", 10, 400, int(slow_default), step=1) + fee_bps = st.number_input("Fees (bps)", 0, 50, int(fee_default), step=1) + slip_bps = st.number_input("Slippage (bps)", 0, 50, int(slip_default), step=1) + submitted = st.form_submit_button("Run backtest", type="primary") + + set_param("ticker", ticker or None) + set_param("bt_end", end) + set_param("bt_start", start) + set_param("interval", interval) + set_param("fast", int(fast)) + set_param("slow", int(slow)) + set_param("fee_bps", int(fee_bps)) + set_param("slippage_bps", int(slip_bps)) st.info("Configure the sidebar parameters and run the backtest.") - if not run_btn: + if not submitted: + return + + if fast >= slow: + st.error("Fast SMA must be smaller than Slow SMA.") return with st.spinner("Fetching data..."): - df = get_prices(ticker, start=start, end=end, interval=interval) + df = get_prices_cached(ticker, start=start, end=end, interval=interval) df = _validate_prices(df) if df is None: return - # Series limpias - close = pd.to_numeric(df["close"], errors="coerce") - df = df.assign(close=close).dropna(subset=["close"]) + df = _clean_prices(df) + if df is None: + return + + signals = _run_signals(df["close"], fast=int(fast), slow=int(slow)) - # Signals - if signals_sma_crossover is not None: - sig, _ = signals_sma_crossover(df["close"], fast=int(fast), slow=int(slow)) - else: - # Local fallback if the shared strategies module is unavailable - sma_fast = sma(df["close"], int(fast)) - sma_slow = sma(df["close"], int(slow)) - cross_up = (sma_fast > sma_slow) & (sma_fast.shift(1) <= sma_slow.shift(1)) - cross_dn = (sma_fast < sma_slow) & (sma_fast.shift(1) >= sma_slow.shift(1)) - sig = pd.Series(0, index=df.index, dtype=float) - sig = sig.where(~cross_up, 1.0) - sig = sig.where(~cross_dn, -1.0) - sig = sig.replace(0, pd.NA).ffill().fillna(0.0) - - # Backtest bt, metrics = run_backtest( df, - signals=sig, + signals=signals, fee_bps=int(fee_bps), slippage_bps=int(slip_bps), interval=interval, ) - # --------- Charts --------- st.subheader("Price and SMAs") - sma_fast = sma(df["close"], int(fast)) sma_slow = sma(df["close"], int(slow)) price_fig = go.Figure() price_fig.add_candlestick( x=df.index, - open=pd.to_numeric(df.get("open", df["close"]), errors="coerce"), - high=pd.to_numeric(df.get("high", df["close"]), errors="coerce"), - low=pd.to_numeric(df.get("low", df["close"]), errors="coerce"), + open=df.get("open", df["close"]), + high=df.get("high", df["close"]), + low=df.get("low", df["close"]), close=df["close"], - name="OHLC", + name="ohlc", ) price_fig.add_trace( - go.Scatter(x=sma_fast.index, y=sma_fast, mode="lines", name=f"SMA {fast}") + go.Scatter(x=sma_fast.index, y=sma_fast, mode="lines", name=f"SMA {int(fast)}"), ) price_fig.add_trace( - go.Scatter(x=sma_slow.index, y=sma_slow, mode="lines", name=f"SMA {slow}") + go.Scatter(x=sma_slow.index, y=sma_slow, mode="lines", name=f"SMA {int(slow)}"), ) - # Buy/Sell markers - changes = sig.diff().fillna(0) + changes = signals.diff().fillna(0) buys = df.index[changes > 0] sells = df.index[changes < 0] price_fig.add_trace( @@ -117,7 +163,7 @@ def main() -> None: marker_symbol="triangle-up", marker_size=10, name="Buy", - ) + ), ) price_fig.add_trace( go.Scatter( @@ -127,7 +173,7 @@ def main() -> None: marker_symbol="triangle-down", marker_size=10, name="Sell", - ) + ), ) price_fig.update_layout(margin=dict(l=30, r=20, t=30, b=30), height=520) @@ -140,10 +186,41 @@ def main() -> None: apply_plotly_theme(eq_fig) st.plotly_chart(eq_fig, use_container_width=True) - # Metrics st.subheader("Metrics") - mdf = pd.DataFrame([metrics]).T.rename(columns={0: "Value"}) - st.dataframe(mdf.style.format({"Value": "{:.4f}"}), use_container_width=True) + metrics_order = [ + "CAGR", + "Sharpe", + "Sortino", + "Max Drawdown", + "Win rate", + "Avg trade return", + "Exposure (%)", + "Trades count", + ] + percent_metrics = { + "CAGR", + "Max Drawdown", + "Win rate", + "Avg trade return", + "Exposure (%)", + } + ratio_metrics = {"Sharpe", "Sortino"} + + metrics_rows = [] + for key in metrics_order: + value = metrics.get(key, 0.0) + if key == "Trades count": + display = f"{int(value)}" + elif key in percent_metrics: + display = f"{value:.2%}" + elif key in ratio_metrics: + display = f"{value:.2f}" + else: + display = f"{value:.4f}" + metrics_rows.append({"Metric": key, "Value": display}) + + metrics_df = pd.DataFrame(metrics_rows).set_index("Metric") + st.dataframe(metrics_df, use_container_width=True) main() diff --git a/pages/06_RSI_Strategy.py b/pages/06_RSI_Strategy.py new file mode 100644 index 0000000..e66c422 --- /dev/null +++ b/pages/06_RSI_Strategy.py @@ -0,0 +1,231 @@ +"""RSI strategy backtest page.""" +from __future__ import annotations + +from datetime import date, timedelta + +import pandas as pd +import plotly.graph_objects as go +import streamlit as st + +from quantboard.backtest import run_backtest +from quantboard.data import get_prices_cached +from quantboard.indicators import rsi +from quantboard.plots import apply_plotly_theme +from quantboard.ui.state import get_param, set_param, shareable_link_button +from quantboard.ui.theme import apply_global_theme + +st.set_page_config(page_title="RSI Strategy", page_icon="📈", layout="wide") +apply_global_theme() + + +def _validate_prices(df: pd.DataFrame) -> pd.DataFrame | None: + if df.empty or "close" not in df.columns: + st.error("No data for the selected range/interval.") + return None + return df + + +def _clean_prices(df: pd.DataFrame) -> pd.DataFrame | None: + price_cols = ["open", "high", "low", "close", "volume"] + numeric = {col: pd.to_numeric(df.get(col), errors="coerce") for col in price_cols if col in df} + clean = df.assign(**numeric).dropna(subset=["close"]) + if clean.empty: + st.error("No price data available after cleaning.") + return None + return clean + + +def _price_with_signals( + df: pd.DataFrame, + signals: pd.Series, + overlays: dict[str, pd.Series | pd.DataFrame] | None = None, +) -> go.Figure: + overlays = overlays or {} + fig = go.Figure() + fig.add_candlestick( + x=df.index, + open=df.get("open", df["close"]), + high=df.get("high", df["close"]), + low=df.get("low", df["close"]), + close=df["close"], + name="ohlc", + ) + + for name, series in overlays.items(): + if isinstance(series, pd.DataFrame): + for sub_name, ser in series.items(): + fig.add_trace( + go.Scatter(x=ser.index, y=ser.values, mode="lines", name=f"{name} ({sub_name})"), + ) + elif series is not None: + fig.add_trace(go.Scatter(x=series.index, y=series.values, mode="lines", name=name)) + + changes = signals.diff().fillna(signals.iloc[0] if len(signals) else 0.0) + buys = df.index[changes > 0] + sells = df.index[changes < 0] + + if len(buys): + fig.add_trace( + go.Scatter( + x=buys, + y=df.loc[buys, "close"], + mode="markers", + marker_symbol="triangle-up", + marker_size=10, + name="Buy", + ), + ) + if len(sells): + fig.add_trace( + go.Scatter( + x=sells, + y=df.loc[sells, "close"], + mode="markers", + marker_symbol="triangle-down", + marker_size=10, + name="Sell/Short", + ), + ) + + fig.update_layout(margin=dict(l=30, r=20, t=30, b=30), height=520) + return apply_plotly_theme(fig) + + +def _build_signals( + close: pd.Series, + *, + period: int, + lower: float, + upper: float, + mode: str, +) -> tuple[pd.Series, dict[str, pd.Series]]: + rsi_series = rsi(close, window=period) + go_long = (rsi_series.shift(1) < lower) & (rsi_series >= lower) + go_flat = (rsi_series.shift(1) > upper) & (rsi_series <= upper) + + signals = pd.Series(index=close.index, dtype=float) + signals.loc[go_long] = 1.0 + + if mode == "Long only": + signals.loc[go_flat] = 0.0 + else: + signals.loc[go_flat] = -1.0 + + signals = signals.ffill().fillna(0.0) + + if mode == "Long only": + signals = signals.clip(lower=0.0, upper=1.0) + else: + signals = signals.clip(-1.0, 1.0) + + return signals, {"RSI": rsi_series} + + +def main() -> None: + st.title("Backtest — RSI strategy") + + shareable_link_button() + + today = date.today() + default_start = today - timedelta(days=365 * 2) + + ticker_default = str(get_param("ticker", "AAPL")).strip().upper() or "AAPL" + end_default = get_param("rsi_end", today) + start_default = get_param("rsi_start", default_start) + interval_options = ["1d", "1h", "1wk"] + interval_default = str(get_param("interval", "1d")) + if interval_default not in interval_options: + interval_default = "1d" + period_default = int(get_param("rsi_period", 14)) + lower_default = int(get_param("rsi_lower", 30)) + upper_default = int(get_param("rsi_upper", 70)) + mode_default = str(get_param("rsi_mode", "Long only")) + fee_default = int(get_param("rsi_fee_bps", 0)) + slip_default = int(get_param("rsi_slippage_bps", 0)) + + period_default = max(2, min(100, period_default)) + lower_default = max(0, min(100, lower_default)) + upper_default = max(lower_default + 1, min(100, upper_default)) + if mode_default not in ["Long only", "Flip long/short"]: + mode_default = "Long only" + fee_default = max(0, min(50, fee_default)) + slip_default = max(0, min(50, slip_default)) + + with st.sidebar: + st.header("Parameters") + with st.form("rsi_form"): + ticker = st.text_input("Ticker", value=ticker_default).strip().upper() + end = st.date_input("To", value=end_default) + start = st.date_input("From", value=start_default) + interval = st.selectbox("Interval", interval_options, index=interval_options.index(interval_default)) + period = st.number_input("RSI period", 2, 100, int(period_default), step=1) + lower = st.number_input("Lower threshold", 0, 100, int(lower_default), step=1) + upper = st.number_input("Upper threshold", 0, 100, int(upper_default), step=1) + mode = st.selectbox("Signal mode", ["Long only", "Flip long/short"], index=["Long only", "Flip long/short"].index(mode_default)) + fee_bps = st.number_input("Fees (bps)", 0, 50, int(fee_default), step=1) + slip_bps = st.number_input("Slippage (bps)", 0, 50, int(slip_default), step=1) + submitted = st.form_submit_button("Run backtest", type="primary") + + st.info("Configure the sidebar parameters and run the backtest.") + + if not submitted: + return + + set_param("ticker", ticker or None) + set_param("rsi_end", end) + set_param("rsi_start", start) + set_param("interval", interval) + set_param("rsi_period", int(period)) + set_param("rsi_lower", int(lower)) + set_param("rsi_upper", int(upper)) + set_param("rsi_mode", mode) + set_param("rsi_fee_bps", int(fee_bps)) + set_param("rsi_slippage_bps", int(slip_bps)) + + if lower >= upper: + st.error("Lower threshold must be less than Upper threshold.") + return + + with st.spinner("Fetching data..."): + df = get_prices_cached(ticker, start=start, end=end, interval=interval) + + df = _validate_prices(df) + if df is None: + return + + df = _clean_prices(df) + if df is None: + return + + signals, overlays = _build_signals( + df["close"], + period=int(period), + lower=float(lower), + upper=float(upper), + mode=mode, + ) + + bt, metrics = run_backtest( + df, + signals=signals, + fee_bps=int(fee_bps), + slippage_bps=int(slip_bps), + interval=interval, + ) + + st.subheader("Price and signals") + price_fig = _price_with_signals(df, signals, overlays) + st.plotly_chart(price_fig, use_container_width=True) + + st.subheader("Equity curve") + eq_fig = go.Figure(go.Scatter(x=bt.index, y=bt["equity"], mode="lines", name="Equity")) + eq_fig.update_layout(margin=dict(l=30, r=20, t=30, b=30), height=320) + apply_plotly_theme(eq_fig) + st.plotly_chart(eq_fig, use_container_width=True) + + st.subheader("Metrics") + metrics_df = pd.DataFrame([metrics]).T.rename(columns={0: "Value"}) + st.dataframe(metrics_df.style.format({"Value": "{:.4f}"}), use_container_width=True) + + +main() diff --git a/pages/07_Bollinger_MR.py b/pages/07_Bollinger_MR.py new file mode 100644 index 0000000..008bfdd --- /dev/null +++ b/pages/07_Bollinger_MR.py @@ -0,0 +1,186 @@ +"""Bollinger Bands mean-reversion strategy page.""" +from __future__ import annotations + +from datetime import date, timedelta + +import pandas as pd +import plotly.graph_objects as go +import streamlit as st + +from quantboard.backtest import run_backtest +from quantboard.data import get_prices_cached +from quantboard.plots import apply_plotly_theme +from quantboard.strategies import signals_bollinger_mean_reversion +from quantboard.ui.state import get_param, set_param, shareable_link_button +from quantboard.ui.theme import apply_global_theme + +st.set_page_config(page_title="Bollinger Mean Reversion", page_icon="📊", layout="wide") +apply_global_theme() + + +def _validate_prices(df: pd.DataFrame) -> pd.DataFrame | None: + if df.empty or "close" not in df.columns: + st.error("No data for the selected range/interval.") + return None + return df + + +def _clean_prices(df: pd.DataFrame) -> pd.DataFrame | None: + price_cols = ["open", "high", "low", "close", "volume"] + numeric = {col: pd.to_numeric(df.get(col), errors="coerce") for col in price_cols if col in df} + clean = df.assign(**numeric).dropna(subset=["close"]) + if clean.empty: + st.error("No price data available after cleaning.") + return None + return clean + + +def _price_with_signals( + df: pd.DataFrame, + signals: pd.Series, + overlays: dict[str, pd.Series | pd.DataFrame] | None = None, +) -> go.Figure: + overlays = overlays or {} + fig = go.Figure() + fig.add_candlestick( + x=df.index, + open=df.get("open", df["close"]), + high=df.get("high", df["close"]), + low=df.get("low", df["close"]), + close=df["close"], + name="ohlc", + ) + + for name, series in overlays.items(): + if isinstance(series, pd.DataFrame): + for sub_name, ser in series.items(): + fig.add_trace( + go.Scatter(x=ser.index, y=ser.values, mode="lines", name=f"{name} ({sub_name})"), + ) + elif series is not None: + fig.add_trace(go.Scatter(x=series.index, y=series.values, mode="lines", name=name)) + + changes = signals.diff().fillna(signals.iloc[0] if len(signals) else 0.0) + buys = df.index[changes > 0] + sells = df.index[changes < 0] + + if len(buys): + fig.add_trace( + go.Scatter( + x=buys, + y=df.loc[buys, "close"], + mode="markers", + marker_symbol="triangle-up", + marker_size=10, + name="Buy", + ), + ) + if len(sells): + fig.add_trace( + go.Scatter( + x=sells, + y=df.loc[sells, "close"], + mode="markers", + marker_symbol="triangle-down", + marker_size=10, + name="Sell", + ), + ) + + fig.update_layout(margin=dict(l=30, r=20, t=30, b=30), height=520) + return apply_plotly_theme(fig) + + +def main() -> None: + st.title("Backtest — Bollinger mean reversion") + + shareable_link_button() + + today = date.today() + default_start = today - timedelta(days=365 * 2) + + ticker_default = str(get_param("ticker", "AAPL")).strip().upper() or "AAPL" + end_default = get_param("bb_end", today) + start_default = get_param("bb_start", default_start) + interval_options = ["1d", "1h", "1wk"] + interval_default = str(get_param("interval", "1d")) + if interval_default not in interval_options: + interval_default = "1d" + window_default = int(get_param("bb_window", 20)) + n_std_default = float(get_param("bb_n_std", 2.0)) + fee_default = int(get_param("bb_fee_bps", 0)) + slip_default = int(get_param("bb_slippage_bps", 0)) + + window_default = max(5, min(200, window_default)) + n_std_default = max(1.0, min(5.0, n_std_default)) + fee_default = max(0, min(50, fee_default)) + slip_default = max(0, min(50, slip_default)) + + with st.sidebar: + st.header("Parameters") + with st.form("bb_form"): + ticker = st.text_input("Ticker", value=ticker_default).strip().upper() + end = st.date_input("To", value=end_default) + start = st.date_input("From", value=start_default) + interval = st.selectbox("Interval", interval_options, index=interval_options.index(interval_default)) + window = st.number_input("Window", 5, 200, int(window_default), step=1) + n_std = st.number_input("Std dev", 1.0, 5.0, float(n_std_default), step=0.5) + fee_bps = st.number_input("Fees (bps)", 0, 50, int(fee_default), step=1) + slip_bps = st.number_input("Slippage (bps)", 0, 50, int(slip_default), step=1) + submitted = st.form_submit_button("Run backtest", type="primary") + + st.info("Configure the sidebar parameters and run the backtest.") + + if not submitted: + return + + set_param("ticker", ticker or None) + set_param("bb_end", end) + set_param("bb_start", start) + set_param("interval", interval) + set_param("bb_window", int(window)) + set_param("bb_n_std", float(n_std)) + set_param("bb_fee_bps", int(fee_bps)) + set_param("bb_slippage_bps", int(slip_bps)) + + with st.spinner("Fetching data..."): + df = get_prices_cached(ticker, start=start, end=end, interval=interval) + + df = _validate_prices(df) + if df is None: + return + + df = _clean_prices(df) + if df is None: + return + + signals, overlays = signals_bollinger_mean_reversion( + df["close"], + window=int(window), + n_std=float(n_std), + ) + + bt, metrics = run_backtest( + df, + signals=signals, + fee_bps=int(fee_bps), + slippage_bps=int(slip_bps), + interval=interval, + ) + + st.subheader("Price and bands") + price_fig = _price_with_signals(df, signals, overlays) + st.plotly_chart(price_fig, use_container_width=True) + + st.subheader("Equity curve") + eq_fig = go.Figure(go.Scatter(x=bt.index, y=bt["equity"], mode="lines", name="Equity")) + eq_fig.update_layout(margin=dict(l=30, r=20, t=30, b=30), height=320) + apply_plotly_theme(eq_fig) + st.plotly_chart(eq_fig, use_container_width=True) + + st.subheader("Metrics") + metrics_df = pd.DataFrame([metrics]).T.rename(columns={0: "Value"}) + st.dataframe(metrics_df.style.format({"Value": "{:.4f}"}), use_container_width=True) + + +main() diff --git a/pages/08_Donchian_Breakout.py b/pages/08_Donchian_Breakout.py new file mode 100644 index 0000000..4111876 --- /dev/null +++ b/pages/08_Donchian_Breakout.py @@ -0,0 +1,183 @@ +"""Donchian channel breakout strategy page.""" +from __future__ import annotations + +from datetime import date, timedelta + +import pandas as pd +import plotly.graph_objects as go +import streamlit as st + +from quantboard.backtest import run_backtest +from quantboard.data import get_prices_cached +from quantboard.plots import apply_plotly_theme +from quantboard.strategies import signals_donchian_breakout +from quantboard.ui.state import get_param, set_param, shareable_link_button +from quantboard.ui.theme import apply_global_theme + +st.set_page_config(page_title="Donchian Breakout", page_icon="📣", layout="wide") +apply_global_theme() + + +def _validate_prices(df: pd.DataFrame) -> pd.DataFrame | None: + if df.empty or "close" not in df.columns: + st.error("No data for the selected range/interval.") + return None + return df + + +def _clean_prices(df: pd.DataFrame) -> pd.DataFrame | None: + price_cols = ["open", "high", "low", "close", "volume"] + numeric = {col: pd.to_numeric(df.get(col), errors="coerce") for col in price_cols if col in df} + clean = df.assign(**numeric).dropna(subset=["close"]) + if clean.empty: + st.error("No price data available after cleaning.") + return None + return clean + + +def _price_with_signals( + df: pd.DataFrame, + signals: pd.Series, + overlays: dict[str, pd.Series | pd.DataFrame] | None = None, +) -> go.Figure: + overlays = overlays or {} + fig = go.Figure() + fig.add_candlestick( + x=df.index, + open=df.get("open", df["close"]), + high=df.get("high", df["close"]), + low=df.get("low", df["close"]), + close=df["close"], + name="ohlc", + ) + + for name, series in overlays.items(): + if isinstance(series, pd.DataFrame): + for sub_name, ser in series.items(): + fig.add_trace( + go.Scatter(x=ser.index, y=ser.values, mode="lines", name=f"{name} ({sub_name})"), + ) + elif series is not None: + fig.add_trace(go.Scatter(x=series.index, y=series.values, mode="lines", name=name)) + + changes = signals.diff().fillna(signals.iloc[0] if len(signals) else 0.0) + buys = df.index[changes > 0] + sells = df.index[changes < 0] + + if len(buys): + fig.add_trace( + go.Scatter( + x=buys, + y=df.loc[buys, "close"], + mode="markers", + marker_symbol="triangle-up", + marker_size=10, + name="Breakout", + ), + ) + if len(sells): + fig.add_trace( + go.Scatter( + x=sells, + y=df.loc[sells, "close"], + mode="markers", + marker_symbol="triangle-down", + marker_size=10, + name="Exit", + ), + ) + + fig.update_layout(margin=dict(l=30, r=20, t=30, b=30), height=520) + return apply_plotly_theme(fig) + + +def main() -> None: + st.title("Backtest — Donchian breakout") + + shareable_link_button() + + today = date.today() + default_start = today - timedelta(days=365 * 2) + + ticker_default = str(get_param("ticker", "AAPL")).strip().upper() or "AAPL" + end_default = get_param("don_end", today) + start_default = get_param("don_start", default_start) + interval_options = ["1d", "1h", "1wk"] + interval_default = str(get_param("interval", "1d")) + if interval_default not in interval_options: + interval_default = "1d" + window_default = int(get_param("don_window", 20)) + fee_default = int(get_param("don_fee_bps", 0)) + slip_default = int(get_param("don_slippage_bps", 0)) + + window_default = max(5, min(200, window_default)) + fee_default = max(0, min(50, fee_default)) + slip_default = max(0, min(50, slip_default)) + + with st.sidebar: + st.header("Parameters") + with st.form("donchian_form"): + ticker = st.text_input("Ticker", value=ticker_default).strip().upper() + end = st.date_input("To", value=end_default) + start = st.date_input("From", value=start_default) + interval = st.selectbox("Interval", interval_options, index=interval_options.index(interval_default)) + window = st.number_input("Channel length", 5, 200, int(window_default), step=1) + fee_bps = st.number_input("Fees (bps)", 0, 50, int(fee_default), step=1) + slip_bps = st.number_input("Slippage (bps)", 0, 50, int(slip_default), step=1) + submitted = st.form_submit_button("Run backtest", type="primary") + + st.info("Configure the sidebar parameters and run the backtest.") + + if not submitted: + return + + set_param("ticker", ticker or None) + set_param("don_end", end) + set_param("don_start", start) + set_param("interval", interval) + set_param("don_window", int(window)) + set_param("don_fee_bps", int(fee_bps)) + set_param("don_slippage_bps", int(slip_bps)) + + with st.spinner("Fetching data..."): + df = get_prices_cached(ticker, start=start, end=end, interval=interval) + + df = _validate_prices(df) + if df is None: + return + + df = _clean_prices(df) + if df is None: + return + + signals, overlays = signals_donchian_breakout( + df.get("high", df["close"]), + df.get("low", df["close"]), + df["close"], + window=int(window), + ) + + bt, metrics = run_backtest( + df, + signals=signals, + fee_bps=int(fee_bps), + slippage_bps=int(slip_bps), + interval=interval, + ) + + st.subheader("Price and channels") + price_fig = _price_with_signals(df, signals, overlays) + st.plotly_chart(price_fig, use_container_width=True) + + st.subheader("Equity curve") + eq_fig = go.Figure(go.Scatter(x=bt.index, y=bt["equity"], mode="lines", name="Equity")) + eq_fig.update_layout(margin=dict(l=30, r=20, t=30, b=30), height=320) + apply_plotly_theme(eq_fig) + st.plotly_chart(eq_fig, use_container_width=True) + + st.subheader("Metrics") + metrics_df = pd.DataFrame([metrics]).T.rename(columns={0: "Value"}) + st.dataframe(metrics_df.style.format({"Value": "{:.4f}"}), use_container_width=True) + + +main() diff --git a/pages/2_Optimizacion_SMA.py b/pages/2_Optimizacion_SMA.py index 0b6e33f..bd892ca 100644 --- a/pages/2_Optimizacion_SMA.py +++ b/pages/2_Optimizacion_SMA.py @@ -1,7 +1,11 @@ import streamlit as st +from quantboard.ui.state import shareable_link_button + st.set_page_config(page_title="Optimize SMA", page_icon="⚙️", layout="wide") st.title("⚙️ Optimize SMA") +shareable_link_button() + st.info("Esta página fue reemplazada por **SMA Heatmap**. Usá el ítem *SMA Heatmap* del menú para optimizar parámetros.") diff --git a/pages/99_Trades_Debug.py b/pages/99_Trades_Debug.py new file mode 100644 index 0000000..fb20ef9 --- /dev/null +++ b/pages/99_Trades_Debug.py @@ -0,0 +1,249 @@ +"""Trades debug page to inspect trade segmentation.""" +from __future__ import annotations + +from datetime import date, timedelta +from io import StringIO + +import pandas as pd +import plotly.graph_objects as go +import streamlit as st + +from quantboard.data import get_prices_cached +from quantboard.plots import apply_plotly_theme +from quantboard.ui.state import get_param, set_param, shareable_link_button +from quantboard.ui.theme import apply_global_theme + +st.set_page_config(page_title="Trades Debug", page_icon="🔎", layout="wide") +apply_global_theme() + + +def _load_prices(ticker: str, start: date, end: date) -> pd.DataFrame: + return get_prices_cached(ticker, start=start, end=end, interval="1d") + + +def _clean_close(df: pd.DataFrame) -> pd.Series | None: + if df.empty or "close" not in df.columns: + st.error("No price data available for the selected period.") + return None + close = pd.to_numeric(df["close"], errors="coerce").dropna() + if close.empty: + st.error("No price data available for the selected period.") + return None + return close + + +def main() -> None: + st.title("🔎 Trades Debug") + + shareable_link_button() + + ticker_default = str(get_param("ticker", "AAPL")).strip().upper() or "AAPL" + fast_default = int(get_param("fast", 20)) + slow_default = int(get_param("slow", 100)) + lookback_default = int(get_param("trades_lookback", 2)) + + fast_default = max(1, min(365, fast_default)) + slow_default = max(2, min(500, slow_default)) + lookback_default = max(1, min(5, lookback_default)) + + with st.sidebar: + st.header("Parameters") + ticker_input = st.text_input("Ticker", value=ticker_default).strip().upper() + fast_input = st.number_input("Fast SMA", min_value=1, max_value=365, value=int(fast_default), step=1) + slow_input = st.number_input("Slow SMA", min_value=2, max_value=500, value=int(slow_default), step=1) + lookback_years = st.slider("Lookback (years)", min_value=1, max_value=5, value=int(lookback_default)) + + if ticker_input: + set_param("ticker", ticker_input) + else: + set_param("ticker", None) + ticker = ticker_input or ticker_default + + fast = int(fast_input) + slow = int(slow_input) + + set_param("fast", int(fast)) + set_param("slow", int(slow)) + set_param("trades_lookback", int(lookback_years)) + + if fast >= slow: + st.error("Fast SMA must be strictly lower than Slow SMA.") + return + + end_date = date.today() + start_date = end_date - timedelta(days=365 * lookback_years) + + with st.spinner("Downloading price data..."): + raw = _load_prices(ticker, start=start_date, end=end_date) + + if raw.empty: + st.warning("No data returned for the selected configuration.") + return + + close = _clean_close(raw) + if close is None: + return + + fast_sma = close.rolling(fast).mean() + slow_sma = close.rolling(slow).mean() + + signals = pd.Series(0.0, index=close.index) + valid_mask = fast_sma.notna() & slow_sma.notna() + signals.loc[valid_mask] = (fast_sma.loc[valid_mask] > slow_sma.loc[valid_mask]).astype(float) + + rets = close.pct_change().fillna(0.0) + prev_pos = signals.shift(1).fillna(0.0) + strat_rets = prev_pos * rets + + entries = (signals != 0.0) & (signals != prev_pos) + # shift trade ids so the flip bar's return stays with the trade that held it + trade_ids = entries.shift(1).cumsum() + trade_ids = trade_ids.where(signals != 0.0) + grouped = (1.0 + strat_rets).groupby(trade_ids) + trade_returns = grouped.prod() - 1.0 + trade_returns = trade_returns.dropna() + + old_trade_ids = entries.cumsum() + old_trade_ids = old_trade_ids.where(signals != 0.0) + old_trade_returns = (1.0 + strat_rets).groupby(old_trade_ids).prod() - 1.0 + old_trade_returns = old_trade_returns.dropna() + + equity_curve = (1.0 + strat_rets).cumprod() + + trades_df = pd.DataFrame({ + "close": close, + "fast_sma": fast_sma, + "slow_sma": slow_sma, + "signal": signals, + "prev_pos": prev_pos, + "entries": entries, + "trade_id": trade_ids, + "old_trade_id": old_trade_ids, + "strat_rets": strat_rets, + "equity": equity_curve, + }) + + st.subheader("Price & Trades") + fig = go.Figure() + fig.add_trace( + go.Scatter( + x=close.index, + y=close.values, + name="close", + mode="lines", + ) + ) + fig.add_trace( + go.Scatter( + x=fast_sma.index, + y=fast_sma.values, + name=f"Fast SMA {fast}", + mode="lines", + ) + ) + fig.add_trace( + go.Scatter( + x=slow_sma.index, + y=slow_sma.values, + name=f"Slow SMA {slow}", + mode="lines", + ) + ) + + changes = signals.diff().fillna(signals.iloc[0] if len(signals) else 0.0) + entry_idx = close.index[changes > 0] + exit_idx = close.index[changes < 0] + + if len(entry_idx): + fig.add_trace( + go.Scatter( + x=entry_idx, + y=close.loc[entry_idx], + mode="markers", + marker=dict(symbol="triangle-up", size=10, color="#33C472"), + name="Entry", + ) + ) + if len(exit_idx): + fig.add_trace( + go.Scatter( + x=exit_idx, + y=close.loc[exit_idx], + mode="markers", + marker=dict(symbol="triangle-down", size=10, color="#FF4B4B"), + name="Exit", + ) + ) + + trade_rows: list[dict] = [] + new_trades = trades_df.dropna(subset=["trade_id"]).copy() + if not new_trades.empty: + new_trades["trade_id"] = new_trades["trade_id"].astype(int) + for idx, (trade_id, group) in enumerate(new_trades.groupby("trade_id")): + start_ts = group.index[0] + end_ts = group.index[-1] + bars = group.shape[0] + ret = float((1.0 + group["strat_rets"]).prod() - 1.0) + equity_end = float(group["equity"].iloc[-1]) + trade_rows.append( + { + "trade_id": int(trade_id), + "start_ts": start_ts, + "end_ts": end_ts, + "bars": bars, + "ret_pct": ret * 100.0, + "equity": equity_end, + } + ) + shade_color = "rgba(51, 196, 114, 0.12)" if idx % 2 == 0 else "rgba(255, 75, 75, 0.12)" + fig.add_vrect(x0=start_ts, x1=end_ts, fillcolor=shade_color, layer="below", opacity=0.2, line_width=0) + + apply_plotly_theme(fig) + fig.update_layout(margin=dict(l=0, r=0, t=10, b=0)) + st.plotly_chart(fig, use_container_width=True) + + if trade_rows: + trades_table = pd.DataFrame(trade_rows) + trades_table["start_ts"] = trades_table["start_ts"].dt.strftime("%Y-%m-%d") + trades_table["end_ts"] = trades_table["end_ts"].dt.strftime("%Y-%m-%d") + trades_table["ret_pct"] = trades_table["ret_pct"].map(lambda v: f"{v:.2f}%") + trades_table["equity"] = trades_table["equity"].map(lambda v: f"{v:.3f}") + + st.subheader("Trades (new grouping)") + st.dataframe(trades_table, use_container_width=True) + + csv_buffer = StringIO() + pd.DataFrame(trade_rows).to_csv(csv_buffer, index=False) + st.download_button( + "Download trades CSV", + data=csv_buffer.getvalue().encode("utf-8"), + file_name=f"{ticker}_trades_debug.csv", + mime="text/csv", + ) + else: + st.info("No trades were generated for the current configuration.") + + st.subheader("Old vs New grouping") + new_count = int(trade_returns.count()) + new_mean = float(trade_returns.mean()) if new_count else 0.0 + new_win_rate = float((trade_returns > 0).mean()) if new_count else 0.0 + + old_count = int(old_trade_returns.count()) + old_mean = float(old_trade_returns.mean()) if old_count else 0.0 + old_win_rate = float((old_trade_returns > 0).mean()) if old_count else 0.0 + + comparison = pd.DataFrame( + { + "Grouping": ["New (shifted)", "Old (unshifted)"], + "Trades": [new_count, old_count], + "Mean trade %": [new_mean * 100.0, old_mean * 100.0], + "Win rate": [new_win_rate, old_win_rate], + } + ) + comparison["Mean trade %"] = comparison["Mean trade %"].map(lambda v: f"{v:.2f}%") + comparison["Win rate"] = comparison["Win rate"].map(lambda v: f"{v:.1%}") + st.table(comparison) + + +if __name__ == "__main__": + main() diff --git a/quantboard/backtest.py b/quantboard/backtest.py index cc95864..aaa831e 100644 --- a/quantboard/backtest.py +++ b/quantboard/backtest.py @@ -42,31 +42,50 @@ def _cagr(equity: pd.Series, periods_per_year: float) -> float: def _sharpe(rets: pd.Series, periods_per_year: float) -> float: - if rets.std(ddof=0) == 0 or len(rets) == 0: + if len(rets) == 0: return 0.0 - return float(rets.mean() / rets.std(ddof=0) * np.sqrt(periods_per_year)) + std = rets.std(ddof=0) + if std == 0: + return 0.0 + return float(rets.mean() / std * np.sqrt(periods_per_year)) + + +def _sortino(rets: pd.Series, periods_per_year: float) -> float: + if len(rets) == 0: + return 0.0 + downside = rets[rets < 0] + if len(downside) == 0: + return 0.0 + downside_std = downside.std(ddof=0) + if downside_std == 0: + return 0.0 + return float(rets.mean() / downside_std * np.sqrt(periods_per_year)) def run_backtest( - df: pd.DataFrame, + df: pd.DataFrame | pd.Series, signals: pd.Series, *, - fee_bps: int = 0, - slippage_bps: int = 0, + fee_bps: float = 0.0, + slippage_bps: float = 0.0, interval: str = "1d", ) -> tuple[pd.DataFrame, dict]: """ Backtest long/short con señales en {-1, 0, 1}. Costos aplicados en cada cambio de posición (fee + slippage en bps). - Devuelve DataFrame con 'equity' y dict de métricas: CAGR, Sharpe, MaxDD. + Devuelve DataFrame con 'equity' y dict de métricas: + CAGR, Sharpe, Sortino, Max Drawdown, Win rate, Avg trade return, Exposure y Trades. """ - data = df.copy() - data.columns = [str(c).lower() for c in data.columns] - for c in ("open", "high", "low", "close"): - if c not in data.columns and "close" in data.columns: - # por compat - ya normalizamos en capas superiores - data[c] = pd.to_numeric(data["close"], errors="coerce") - close = pd.to_numeric(data["close"], errors="coerce").fillna(method="ffill") + if isinstance(df, pd.Series): + close = pd.to_numeric(df, errors="coerce").fillna(method="ffill") + data = close.to_frame(name="close") + else: + data = df.copy() + data.columns = [str(c).lower() for c in data.columns] + for c in ("open", "high", "low", "close"): + if c not in data.columns and "close" in data.columns: + data[c] = pd.to_numeric(data["close"], errors="coerce") + close = pd.to_numeric(data.get("close"), errors="coerce").fillna(method="ffill") rets = close.pct_change().fillna(0.0) pos = pd.Series(signals, index=close.index).replace([np.inf, -np.inf], np.nan).ffill().fillna(0.0) @@ -74,16 +93,41 @@ def run_backtest( # Costos por cambio de posición turn = pos.diff().abs().fillna(0.0) # 0->1, 1->-1, etc. - cost = turn * ((fee_bps + slippage_bps) / 10000.0) + total_cost_bps = float(fee_bps) + float(slippage_bps) + cost = turn * (total_cost_bps / 10000.0) strat_rets = pos.shift(1).fillna(0.0) * rets - cost equity = (1.0 + strat_rets).cumprod() res_df = pd.DataFrame({"equity": equity, "returns": strat_rets}) ppy = _periods_per_year(interval) + sortino = _sortino(strat_rets, ppy) + sharpe = _sharpe(strat_rets, ppy) + cagr = _cagr(equity, ppy) + max_dd = _max_drawdown(equity) + + prev_pos = pos.shift(1).fillna(0.0) + entries = (pos != 0.0) & (pos != prev_pos) + # shift trade ids so the flip bar's return stays with the trade that held it + trade_ids = entries.shift(1).cumsum() + trade_ids = trade_ids.where(pos != 0.0) + grouped = (1.0 + strat_rets).groupby(trade_ids) + trade_returns = grouped.prod() - 1.0 + trade_returns = trade_returns.dropna() + + trades_count = int(trade_returns.count()) + win_rate = float((trade_returns > 0).mean()) if trades_count else 0.0 + avg_trade_return = float(trade_returns.mean()) if trades_count else 0.0 + exposure = float((pos != 0.0).mean()) if len(pos) else 0.0 + metrics = { - "CAGR": _cagr(equity, ppy), - "Sharpe": _sharpe(strat_rets, ppy), - "MaxDD": _max_drawdown(equity), + "CAGR": cagr, + "Sharpe": sharpe, + "Sortino": sortino, + "Max Drawdown": max_dd, + "Win rate": win_rate, + "Avg trade return": avg_trade_return, + "Exposure (%)": exposure, + "Trades count": trades_count, } return res_df, metrics diff --git a/quantboard/data.py b/quantboard/data.py index 140a804..dfc9c15 100644 --- a/quantboard/data.py +++ b/quantboard/data.py @@ -1,16 +1,25 @@ +"""Data access helpers for price downloads.""" + +from __future__ import annotations + +from typing import Callable, Dict + import pandas as pd import yfinance as yf -try: + +try: # pragma: no cover - Streamlit not available during unit tests import streamlit as st - cache = st.cache_data(show_spinner=False, ttl=60) -except Exception: - # fallback no-cache when not running in Streamlit - def _no_cache(func): - return func - cache = _no_cache - -@cache +except Exception: # pragma: no cover - fallback when Streamlit missing + st = None # type: ignore[assignment] + +_TTL_BY_INTERVAL = {"1m": 60, "1h": 600, "1d": 3600, "1wk": 7200} +_DEFAULT_TTL = 3600 +_CACHED_FETCHERS: Dict[int, Callable[..., pd.DataFrame]] = {} + + def get_prices(ticker: str, start: str, end: str, interval: str = "1d") -> pd.DataFrame: + """Download OHLCV data and normalise column names.""" + try: df = yf.download( ticker, @@ -20,15 +29,32 @@ def get_prices(ticker: str, start: str, end: str, interval: str = "1d") -> pd.Da auto_adjust=True, progress=False, ) - if isinstance(df.columns, pd.MultiIndex): - # If multiple tickers accidentally passed, keep first level if present - try: - df = df.xs(ticker, axis=1, level=1) - except Exception: - df = df.droplevel(0, axis=1) - df = df.rename(columns=str.lower) - df.index = pd.to_datetime(df.index) - df = df.dropna() - return df except Exception: return pd.DataFrame() + + if isinstance(df.columns, pd.MultiIndex): + # If multiple tickers accidentally passed, keep first level if present + try: + df = df.xs(ticker, axis=1, level=1) + except Exception: + df = df.droplevel(0, axis=1) + + df = df.rename(columns=str.lower) + df.index = pd.to_datetime(df.index) + return df.dropna() + + +def get_prices_cached(ticker: str, start: str, end: str, interval: str = "1d") -> pd.DataFrame: + """Return cached prices with a TTL determined by the requested interval.""" + + ttl = _TTL_BY_INTERVAL.get(interval, _DEFAULT_TTL) + + if st is None: + return get_prices(ticker, start=start, end=end, interval=interval) + + fetcher = _CACHED_FETCHERS.get(ttl) + if fetcher is None: + fetcher = st.cache_data(show_spinner=False, ttl=ttl)(get_prices) + _CACHED_FETCHERS[ttl] = fetcher + + return fetcher(ticker, start, end, interval) diff --git a/quantboard/ui/state.py b/quantboard/ui/state.py new file mode 100644 index 0000000..14a6276 --- /dev/null +++ b/quantboard/ui/state.py @@ -0,0 +1,137 @@ +"""State helpers to sync Streamlit session state with query parameters.""" +from __future__ import annotations + +from datetime import date, datetime +from html import escape +from typing import Any, TypeVar, cast +from uuid import uuid4 + +import streamlit as st +from streamlit.components.v1 import html + +T = TypeVar("T") + + +def _normalize_query_value(value: Any) -> Any: + if isinstance(value, list): + return value[0] if value else None + return value + + +def _coerce_value(raw: Any, default: T) -> T: + if raw is None: + return default + + if isinstance(default, bool): + if isinstance(raw, bool): + return cast(T, raw) + value = str(raw).strip().lower() + if value in {"1", "true", "yes", "on"}: + return cast(T, True) + if value in {"0", "false", "no", "off"}: + return cast(T, False) + return default + + if isinstance(default, int) and not isinstance(default, bool): + try: + return cast(T, int(raw)) + except (TypeError, ValueError): + return default + + if isinstance(default, float): + try: + return cast(T, float(raw)) + except (TypeError, ValueError): + return default + + if isinstance(default, datetime): + try: + return cast(T, datetime.fromisoformat(str(raw))) + except (TypeError, ValueError): + return default + + if isinstance(default, date): + try: + return cast(T, date.fromisoformat(str(raw))) + except (TypeError, ValueError): + return default + + if isinstance(default, str): + return cast(T, str(raw)) + + return cast(T, raw) + + +def _stringify(value: Any) -> str: + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, date): + return value.isoformat() + return str(value) + + +def get_param(key: str, default: T) -> T: + """Return a value prioritising query params, then session_state, then default.""" + params = st.query_params + if key in params: + raw = _normalize_query_value(params.get(key)) + return _coerce_value(raw, default) + + if key in st.session_state: + return cast(T, st.session_state[key]) + + return default + + +def set_param(key: str, value: Any | None) -> None: + """Persist *value* both in session_state and query params.""" + params = st.query_params + + if value is None: + st.session_state.pop(key, None) + if key in params: + try: + del params[key] + except Exception: + params[key] = "" + return + + st.session_state[key] = value + target = _stringify(value) + current = _normalize_query_value(params.get(key)) if key in params else None + if current != target: + params[key] = target + + +def shareable_link_button(label: str = "Copy shareable link") -> None: + """Render a small button that copies the current page URL to the clipboard.""" + button_id = f"copy-link-{uuid4().hex}" + safe_label = escape(label) + html( + f""" +
+ +
+ + """, + height=56, + ) diff --git a/streamlit_app.py b/streamlit_app.py index 982f47c..a44387a 100644 --- a/streamlit_app.py +++ b/streamlit_app.py @@ -7,9 +7,10 @@ from plotly.subplots import make_subplots import streamlit as st -from quantboard.data import get_prices +from quantboard.data import get_prices_cached from quantboard.indicators import sma, rsi from quantboard.plots import apply_plotly_theme +from quantboard.ui.state import get_param, set_param, shareable_link_button from quantboard.ui.theme import apply_global_theme st.set_page_config(page_title="QuantBoard", page_icon="📈", layout="wide") @@ -26,16 +27,6 @@ def _autorefresh_if_needed(enabled: bool, interval: str) -> None: st.session_state[key] = now st.rerun() -@st.cache_data(ttl=60, show_spinner=False) -def fetch_prices_cached(ticker: str, start: date | datetime, end: date | datetime, interval: str) -> pd.DataFrame: - df = get_prices((ticker or "").strip().upper(), start=start, end=end, interval=interval) - if df is None or df.empty: - return pd.DataFrame() - # normaliza a minúscula - df = df.rename(columns=str.lower) - df.index = pd.to_datetime(df.index) - return df.dropna() - def main() -> None: st.title("QuantBoard — Real-time Technical Analysis") st.caption("Configure the sidebar to load prices and indicators. **Intraday 1m** with **60s auto-refresh**.") @@ -43,13 +34,37 @@ def main() -> None: today = date.today() default_start = today - timedelta(days=365) + initial_ticker = str(get_param("ticker", "AAPL")).strip().upper() or "AAPL" + start_default = get_param("from", default_start) + end_default = get_param("to", today) + interval_options = ["1d", "1h", "1wk", "1m"] + interval_default = str(get_param("interval", "1d")) + if interval_default not in interval_options: + interval_default = "1d" + auto_refresh_default = bool(get_param("auto_refresh", False)) + with st.sidebar: st.header("Parameters") - ticker = st.text_input("Ticker", value="AAPL").strip().upper() - start_date = st.date_input("From", value=default_start, max_value=today) - end_date = st.date_input("To", value=today, min_value=default_start, max_value=today) - interval = st.selectbox("Interval", ["1d", "1h", "1wk", "1m"], index=0) - auto_refresh = st.checkbox("Auto-refresh 1m", value=False, help="Refreshes every 60 seconds when 1m interval is selected.") + ticker = st.text_input("Ticker", value=initial_ticker).strip().upper() + start_date = st.date_input("From", value=start_default, max_value=today) + end_date = st.date_input("To", value=end_default, min_value=default_start, max_value=today) + interval = st.selectbox("Interval", interval_options, index=interval_options.index(interval_default)) + auto_refresh = st.checkbox( + "Auto-refresh 1m", + value=auto_refresh_default, + help="Refreshes every 60 seconds when 1m interval is selected.", + ) + + if ticker: + set_param("ticker", ticker) + else: + set_param("ticker", None) + set_param("from", start_date) + set_param("to", end_date) + set_param("interval", interval) + set_param("auto_refresh", auto_refresh) + + shareable_link_button() if start_date > end_date: st.error("The 'From' date must be earlier than 'To'.") @@ -62,7 +77,7 @@ def main() -> None: return with st.spinner("Fetching data..."): - prices = fetch_prices_cached(ticker, start=start_date, end=end_date, interval=interval) + prices = get_prices_cached(ticker, start=start_date, end=end_date, interval=interval) if prices.empty or "close" not in prices.columns: st.error("No data for the selected range/interval.") diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..2a855d9 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,6 @@ +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/test_backtest.py b/tests/test_backtest.py new file mode 100644 index 0000000..c53578b --- /dev/null +++ b/tests/test_backtest.py @@ -0,0 +1,36 @@ +import math + +import pandas as pd + +from quantboard.backtest import run_backtest + + +def test_backtest_produces_trades_and_metrics() -> None: + index = pd.date_range("2024-01-01", periods=10, freq="D") + close = pd.Series([100, 101, 102, 103, 102, 101, 100, 99, 100, 101], index=index, name="close") + + fast = close.rolling(2).mean() + slow = close.rolling(3).mean() + + signals = pd.Series(0.0, index=index) + crossover = fast > slow + signals[crossover.fillna(False)] = 1.0 + signals[~crossover.fillna(False)] = 0.0 + + bt, metrics = run_backtest(close.to_frame(), signals, interval="1d") + + assert set(metrics.keys()) == { + "CAGR", + "Sharpe", + "Sortino", + "Max Drawdown", + "Win rate", + "Avg trade return", + "Exposure (%)", + "Trades count", + } + + assert metrics["Trades count"] > 0 + assert math.isfinite(metrics["Avg trade return"]) + assert len(bt) == len(close) + assert bt["equity"].iloc[0] == 1.0 diff --git a/tests/test_backtest_alignment.py b/tests/test_backtest_alignment.py new file mode 100644 index 0000000..da6cca2 --- /dev/null +++ b/tests/test_backtest_alignment.py @@ -0,0 +1,86 @@ +import unittest + +import numpy as np +import pandas as pd + +from quantboard.backtest import run_backtest + + +def _compute_trade_returns(close: pd.Series, signals: pd.Series, shifted: bool) -> pd.Series: + close = pd.to_numeric(close, errors="coerce").ffill() + rets = close.pct_change().fillna(0.0) + + pos = pd.Series(signals, index=close.index, dtype=float) + pos = pos.replace([np.inf, -np.inf], np.nan).ffill().fillna(0.0).clip(-1, 1) + + strat_rets = pos.shift(1).fillna(0.0) * rets + prev_pos = pos.shift(1).fillna(0.0) + entries = (pos != 0.0) & (pos != prev_pos) + + trade_ids = entries.shift(1).cumsum() if shifted else entries.cumsum() + trade_ids = trade_ids.where(pos != 0.0) + + grouped = (1.0 + strat_rets).groupby(trade_ids) + trade_returns = grouped.prod() - 1.0 + return trade_returns.dropna() + + +class BacktestAlignmentTest(unittest.TestCase): + def test_single_long_trade_alignment(self) -> None: + index = pd.date_range("2023-01-01", periods=5, freq="D") + close = pd.Series([100, 102, 105, 104, 104], index=index, dtype=float) + signals = pd.Series([1, 1, 1, 1, 0], index=index, dtype=float) + + new_returns = _compute_trade_returns(close, signals, shifted=True) + old_returns = _compute_trade_returns(close, signals, shifted=False) + + self.assertEqual(len(new_returns), 1) + self.assertEqual(len(old_returns), 1) + + expected_return = (close.iloc[3] / close.iloc[0]) - 1.0 + self.assertAlmostEqual(new_returns.iloc[0], expected_return) + self.assertAlmostEqual(old_returns.iloc[0], expected_return) + + _, metrics = run_backtest(close.to_frame(name="close"), signals) + self.assertEqual(metrics["Trades count"], 1) + self.assertAlmostEqual(metrics["Avg trade return"], expected_return) + + def test_flip_trade_keeps_flip_bar_with_prior_position(self) -> None: + index = pd.date_range("2023-01-01", periods=5, freq="D") + close = pd.Series([100, 102, 101, 99, 98], index=index, dtype=float) + signals = pd.Series([1, 1, -1, -1, -1], index=index, dtype=float) + + new_returns = _compute_trade_returns(close, signals, shifted=True) + old_returns = _compute_trade_returns(close, signals, shifted=False) + + self.assertEqual(len(new_returns), 2) + self.assertEqual(len(old_returns), 2) + + rets = close.pct_change().fillna(0.0) + expected_new = [ + (1.0 + rets.iloc[1]) * (1.0 + rets.iloc[2]) - 1.0, + (1.0 - rets.iloc[3]) * (1.0 - rets.iloc[4]) - 1.0, + ] + expected_old = [ + (1.0 + rets.iloc[1]) - 1.0, + (1.0 + rets.iloc[2]) * (1.0 - rets.iloc[3]) * (1.0 - rets.iloc[4]) - 1.0, + ] + + self.assertAlmostEqual(new_returns.iloc[0], expected_new[0]) + self.assertAlmostEqual(new_returns.iloc[1], expected_new[1]) + self.assertAlmostEqual(old_returns.iloc[0], expected_old[0]) + self.assertAlmostEqual(old_returns.iloc[1], expected_old[1]) + + self.assertLess(new_returns.iloc[0], old_returns.iloc[0]) + self.assertGreater(new_returns.iloc[1], old_returns.iloc[1]) + + _, metrics = run_backtest(close.to_frame(name="close"), signals) + self.assertEqual(metrics["Trades count"], 2) + self.assertAlmostEqual( + metrics["Avg trade return"], + float(np.mean(expected_new)), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_indicators.py b/tests/test_indicators.py new file mode 100644 index 0000000..60993cc --- /dev/null +++ b/tests/test_indicators.py @@ -0,0 +1,49 @@ +import math + +import pandas as pd +import pandas.testing as pdt + +from quantboard.indicators import rsi, sma + + +def test_sma_window_three_matches_expected_mean() -> None: + series = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0], name="close") + result = sma(series, window=3) + + expected = pd.Series([float("nan"), float("nan"), 2.0, 3.0, 4.0], name="SMA_3") + pdt.assert_series_equal(result, expected) + + +def test_rsi_matches_reference_value() -> None: + prices = pd.Series( + [ + 44.34, + 44.09, + 44.15, + 43.61, + 44.33, + 44.83, + 45.10, + 45.42, + 45.84, + 46.08, + 45.89, + 46.03, + 45.61, + 46.28, + 46.28, + 46.00, + 46.03, + 46.41, + 46.22, + 45.64, + 46.21, + ] + ) + + result = rsi(prices, period=14) + + assert math.isclose(result.iloc[13], 50.65741494172488, rel_tol=1e-12, abs_tol=1e-9) + assert math.isclose(result.iloc[-1], 50.40461501951188, rel_tol=1e-12, abs_tol=1e-9) + assert result.name == "RSI_14" + assert len(result) == len(prices)