diff --git a/.sample.env b/.sample.env index 47513c03..6abd403d 100644 --- a/.sample.env +++ b/.sample.env @@ -19,7 +19,13 @@ REDIRECT_URL = 'http://127.0.0.1:5000//callback' # Change if different # Valid Brokers Configuration -VALID_BROKERS = 'fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,dhan,dhan_sandbox,definedge,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha' +VALID_BROKERS = 'mstock,fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,dhan,dhan_sandbox,definedge,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha,mstock' + +# mstock Configuration +# BROKER_API_KEY = 'YOUR_MSTOCK_API_KEY' +# BROKER_USERNAME = 'YOUR_MSTOCK_CLIENT_ID' +# BROKER_PIN = 'YOUR_MSTOCK_PIN' +# BROKER_TOTP_CODE = 'YOUR_MSTOCK_TOTP' # Security Configuration diff --git a/app.py b/app.py index 96a6d86f..aab78f53 100644 --- a/app.py +++ b/app.py @@ -454,4 +454,4 @@ def run_catchup(): url = f"http://{host_ip}:{port}" log_startup_banner(logger, "OpenAlgo is running!", url) - socketio.run(app, host=host_ip, port=port, debug=debug) + socketio.run(app, host=host_ip, port=port, debug=debug) \ No newline at end of file diff --git a/blueprints/brlogin.py b/blueprints/brlogin.py index c479bb3c..646a54a8 100644 --- a/blueprints/brlogin.py +++ b/blueprints/brlogin.py @@ -81,6 +81,16 @@ def broker_callback(broker,para=None): user_id = clientcode auth_token, feed_token, error_message = auth_function(clientcode, broker_pin, totp_code) forward_url = 'angel.html' + + elif broker == 'mstock': + if request.method == 'GET': + return render_template('mstock.html') + + elif request.method == 'POST': + totp_code = request.form.get('totp') + api_key = get_broker_api_key() + auth_token, feed_token, error_message = auth_function(api_key, totp_code) + forward_url = 'mstock.html' elif broker == 'aliceblue': if request.method == 'GET': diff --git a/broker/mstock/__init__.py b/broker/mstock/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/broker/mstock/api/__init__.py b/broker/mstock/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/broker/mstock/api/auth_api.py b/broker/mstock/api/auth_api.py new file mode 100644 index 00000000..73960695 --- /dev/null +++ b/broker/mstock/api/auth_api.py @@ -0,0 +1,44 @@ +import httpx +import json +import os +from utils.httpx_client import get_httpx_client + +def authenticate_broker(api_key, totp_code): + """ + Authenticate with mstock and return the auth token. + """ + try: + client = get_httpx_client() + + # If TOTP is enabled, we can directly verify it without a prior login call + headers = { + 'X-Mirae-Version': '1', + 'Content-Type': 'application/x-www-form-urlencoded', + } + data = { + 'api_key': api_key, + 'totp': totp_code, + } + + response = client.post( + 'https://api.mstock.trade/openapi/typea/session/verifytotp', + headers=headers, + data=data + ) + + response.raise_for_status() + data_dict = response.json() + + if data_dict.get("status") == "success" and "data" in data_dict: + auth_token = data_dict["data"].get("access_token") + # mstock does not provide a separate feed token in this response + feed_token = None + return auth_token, feed_token, None + else: + error_message = data_dict.get("message", "Authentication failed.") + return None, None, error_message + + except httpx.HTTPStatusError as e: + return None, None, f"HTTP error occurred: {e.response.status_code} - {e.response.text}" + except Exception as e: + return None, None, str(e) diff --git a/broker/mstock/api/data.py b/broker/mstock/api/data.py new file mode 100644 index 00000000..6410f247 --- /dev/null +++ b/broker/mstock/api/data.py @@ -0,0 +1,46 @@ +from utils.httpx_client import get_httpx_client +from broker.mstock.mapping.order_data import transform_positions_data, transform_holdings_data + +def get_positions(auth_token): + api_key = os.getenv('BROKER_API_KEY') + """ + Retrieves the user's positions. + """ + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + } + + try: + client = get_httpx_client() + response = client.get( + 'https://api.mstock.trade/openapi/typea/portfolio/positions', + headers=headers, + ) + response.raise_for_status() + positions = response.json() + return transform_positions_data(positions), None + except Exception as e: + return None, str(e) + +def get_holdings(auth_token): + api_key = os.getenv('BROKER_API_KEY') + """ + Retrieves the user's holdings. + """ + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + } + + try: + client = get_httpx_client() + response = client.get( + 'https://api.mstock.trade/openapi/typea/portfolio/holdings', + headers=headers, + ) + response.raise_for_status() + holdings = response.json() + return transform_holdings_data(holdings), None + except Exception as e: + return None, str(e) diff --git a/broker/mstock/api/funds.py b/broker/mstock/api/funds.py new file mode 100644 index 00000000..01718b07 --- /dev/null +++ b/broker/mstock/api/funds.py @@ -0,0 +1,67 @@ +import os +import httpx +from utils.httpx_client import get_httpx_client +from utils.logging import get_logger +from broker.mstock.database import master_contract_db + +logger = get_logger(__name__) + +def get_margin_data(auth_token): + """Fetch margin (fund) data from MStock API using Type A authentication.""" + api_key = os.getenv('BROKER_API_KEY') + + if not api_key: + logger.error("Missing environment variable: BROKER_API_KEY") + return {} + + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + } + + try: + client = get_httpx_client() + response = client.get( + 'https://api.mstock.trade/openapi/typea/user/fundsummary', + headers=headers, + timeout=10.0 + ) + response.raise_for_status() + margin_data = response.json() + + if margin_data.get('status') == 'success' and margin_data.get('data'): + data = margin_data['data'][0] + key_mapping = { + "AVAILABLE_BALANCE": "availablecash", + "COLLATERALS": "collateral", + "REALISED_PROFITS": "m2mrealized", + "MTM_COMBINED": "m2munrealized", + "AMOUNT_UTILIZED": "utiliseddebits", + } + + filtered_data = {} + for mstock_key, openalgo_key in key_mapping.items(): + value = data.get(mstock_key) + if value in (None, "None", ""): + value = 0 + try: + formatted_value = "{:.2f}".format(float(value)) + except (ValueError, TypeError): + formatted_value = "0.00" + filtered_data[openalgo_key] = formatted_value + + logger.info(f"filteredMargin Data: {filtered_data}") + return filtered_data + + logger.error(f"Margin API failed: {margin_data.get('message', 'No data')}") + return {} + + except httpx.HTTPStatusError as e: + logger.error(f"HTTP Error while fetching margin data: {e}") + return {} + except httpx.RequestError as e: + logger.error(f"Network Error while fetching margin data: {e}") + return {} + except Exception as e: + logger.exception("Unexpected error while fetching margin data.") + return {} diff --git a/broker/mstock/api/order_api.py b/broker/mstock/api/order_api.py new file mode 100644 index 00000000..0ce40a43 --- /dev/null +++ b/broker/mstock/api/order_api.py @@ -0,0 +1,204 @@ +import httpx +import json +import os +from utils.httpx_client import get_httpx_client +from broker.mstock.mapping.transform_data import transform_data, transform_modify_order_data +from broker.mstock.mapping.order_data import transform_order_data, transform_tradebook_data +from utils.logging import get_logger +logger = get_logger(__name__) + +def get_api_response(endpoint, auth, method="GET", payload=''): + auth_token = auth + api_key = os.getenv('BROKER_API_KEY') + + # Get the shared httpx client with connection pooling + client = get_httpx_client() + + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + 'Content-Type': 'application/json', + } + + url = f"https://api.mstock.trade/openapi/typea{endpoint}" + + if method == "GET": + response = client.get(url, headers=headers) + elif method == "POST": + response = client.post(url, headers=headers, content=payload) + else: + response = client.request(method, url, headers=headers, content=payload) + + # Add status attribute for compatibility with the existing codebase + response.status = response.status_code + + # Handle empty response + if not response.text: + return {} + + try: + logger.info(f"data from {endpoint}: {response.text}") + return json.loads(response.text) + except json.JSONDecodeError: + logger.error(f"Failed to parse JSON response from {endpoint}: {response.text}") + return {} + +def place_order(auth_token, data): + """ + Places an order with the broker. + """ + api_key = os.getenv('BROKER_API_KEY') + order_params = transform_data(data) + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + 'Content-Type': 'application/json', + } + + try: + client = get_httpx_client() + response = client.post( + 'https://api.mstock.trade/openapi/typea/orders', + headers=headers, + json=order_params + ) + response.raise_for_status() + return response.json(), None + except httpx.HTTPStatusError as e: + return None, f"HTTP error occurred: {e.response.status_code} - {e.response.text}" + except Exception as e: + return None, str(e) + +def modify_order(auth_token, data): + """ + Modifies an existing order. + """ + api_key = os.getenv('BROKER_API_KEY') + order_params = transform_modify_order_data(data) + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + 'Content-Type': 'application/json', + } + + try: + client = get_httpx_client() + response = client.put( + f'https://api.mstock.trade/openapi/typea/orders', + headers=headers, + json=order_params + ) + response.raise_for_status() + return response.json(), None + except httpx.HTTPStatusError as e: + return None, f"HTTP error occurred: {e.response.status_code} - {e.response.text}" + except Exception as e: + return None, str(e) + +def cancel_order(auth_token, order_id, variety): + """ + Cancels an existing order. + """ + api_key = os.getenv('BROKER_API_KEY') + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + } + + try: + client = get_httpx_client() + response = client.delete( + f'https://api.mstock.trade/openapi/typea/orders/{order_id}?variety={variety}', + headers=headers, + ) + response.raise_for_status() + return response.json(), None + except httpx.HTTPStatusError as e: + return None, f"HTTP error occurred: {e.response.status_code} - {e.response.text}" + except Exception as e: + return None, str(e) + +def get_order_book(auth): + order_data = get_api_response("/orders", auth) + if ( + order_data + and isinstance(order_data, dict) + and "status" in order_data + and "data" in order_data + and isinstance(order_data["data"], list) + ): + for order in order_data["data"]: + tradingsymbol = str(order.get("tradingsymbol", "")).upper() + if tradingsymbol.endswith("CE") or tradingsymbol.endswith("PE"): + if order.get("exchange", "").upper() == "NSE": + order["exchange"] = "NFO" + + return { + "status": order_data["status"], + "data": order_data["data"] + } + + # fallback if structure invalid + return { + "status": order_data.get("status", "error") if isinstance(order_data, dict) else "error", + "data": [] + } + +def get_trade_book(auth): + trades_data = get_api_response("/typea/tradebook", auth) + + if ( + trades_data + and isinstance(trades_data, dict) + and "status" in trades_data + and "data" in trades_data + and isinstance(trades_data["data"], list) + ): + for trade in trades_data["data"]: + tradingsymbol = str(trade.get("tradingsymbol", "")).upper() + if tradingsymbol.endswith("CE") or tradingsymbol.endswith("PE"): + if trade.get("exchange", "").upper() == "NSE": + trade["exchange"] = "NFO" + + return { + "status": trades_data["status"], + "data": trades_data["data"] + } + + # fallback if structure invalid + return { + "status": trades_data.get("status", "error") if isinstance(trades_data, dict) else "error", + "data": [] + } + +def get_positions(auth): + positions_data = get_api_response("/portfolio/positions", auth) + + if ( + positions_data + and isinstance(positions_data, dict) + and "status" in positions_data + and "data" in positions_data + and isinstance(positions_data["data"], dict) + and "net" in positions_data["data"] + ): + + for position in positions_data["data"]["net"]: + tradingsymbol = str(position.get("tradingsymbol", "")).upper() + if tradingsymbol.endswith("CE") or tradingsymbol.endswith("PE"): + if position.get("exchange", "").upper() == "NSE": + position["exchange"] = "NFO" + + return { + "status": positions_data["status"], + "data": positions_data["data"]["net"] + } + + # If data missing or malformed, still preserve status if possible + return { + "status": positions_data.get("status", "error") if isinstance(positions_data, dict) else "error", + "data": [] + } + +def get_holdings(auth): + return get_api_response("/portfolio/holdings",auth) \ No newline at end of file diff --git a/broker/mstock/database/master_contract_db.py b/broker/mstock/database/master_contract_db.py new file mode 100644 index 00000000..0ad5fd30 --- /dev/null +++ b/broker/mstock/database/master_contract_db.py @@ -0,0 +1,224 @@ +import os +import pandas as pd +import requests +from io import StringIO +from datetime import datetime +from sqlalchemy import create_engine, Column, Integer, String, Float, Sequence, Index +from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.ext.declarative import declarative_base + +from extensions import socketio +from utils.logging import get_logger +from database.auth_db import get_auth_token + +logger = get_logger(__name__) + +# ------------------------------------------------------------------- +# DATABASE SETUP +# ------------------------------------------------------------------- +DATABASE_URL = os.getenv('DATABASE_URL') +engine = create_engine(DATABASE_URL) +db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine)) +Base = declarative_base() +Base.query = db_session.query_property() + + +# ------------------------------------------------------------------- +# TABLE DEFINITION +# ------------------------------------------------------------------- +class SymToken(Base): + __tablename__ = 'symtoken' + + id = Column(Integer, Sequence('symtoken_id_seq'), primary_key=True) + symbol = Column(String, nullable=False, index=True) + brsymbol = Column(String, nullable=False, index=True) + name = Column(String) + exchange = Column(String, index=True) + brexchange = Column(String, index=True) + token = Column(String, index=True) + expiry = Column(String) + strike = Column(Float) + lotsize = Column(Integer) + instrumenttype = Column(String) + tick_size = Column(Float) + + __table_args__ = (Index('idx_symbol_exchange', 'symbol', 'exchange'),) + + +# ------------------------------------------------------------------- +# INIT / UTILS +# ------------------------------------------------------------------- +def init_db(): + logger.info("Initializing MStock Master Contract DB") + Base.metadata.create_all(bind=engine) + + +def delete_symtoken_table(): + logger.info("Deleting SymToken Table (MStock)") + SymToken.query.delete() + db_session.commit() + + +def copy_from_dataframe(df): + """Bulk insert DataFrame records into the symtoken table.""" + logger.info("Performing Bulk Insert into SymToken Table") + + data_dict = df.to_dict(orient='records') + existing_tokens = {result.token for result in db_session.query(SymToken.token).all()} + + filtered_data_dict = [ + row for row in data_dict if row.get('token') and str(row['token']) not in existing_tokens + ] + + try: + if filtered_data_dict: + db_session.bulk_insert_mappings(SymToken, filtered_data_dict) + db_session.commit() + logger.info(f"Inserted {len(filtered_data_dict)} new records successfully.") + else: + logger.info("No new MStock records to insert.") + except Exception as e: + logger.error(f"Error during MStock bulk insert: {e}") + db_session.rollback() + + +# ------------------------------------------------------------------- +# MStock Master Contract Fetch +# ------------------------------------------------------------------- +def download_mstock_csv(auth_token): + """ + Download the MStock master contract CSV from the API. + """ + api_key = os.getenv('BROKER_API_KEY') + url = 'https://api.mstock.trade/openapi/typea/instruments/scriptmaster' + + headers = { + 'X-Mirae-Version': '1', + 'Authorization': f'token {api_key}:{auth_token}', + } + + logger.info(f"Fetching MStock master contract from {url}") + + try: + response = requests.get(url, headers=headers, timeout=60) + logger.info(f"MStock master contract download status: {response.status_code}") + + if response.status_code != 200: + logger.error(f"Failed to download MStock master contract: {response.status_code}") + return None + + return response.text + + except Exception as e: + logger.error(f"Error fetching MStock master contract: {e}") + return None + + +# ------------------------------------------------------------------- +# PROCESS CSV +# ------------------------------------------------------------------- +def process_mstock_csv(csv_text): + """ + Convert raw CSV text from MStock API to DataFrame in OpenAlgo schema. + """ + try: + df = pd.read_csv(StringIO(csv_text)) + except Exception as e: + logger.error(f"Error reading MStock CSV: {e}") + return pd.DataFrame() + + # Normalize column names + df.columns = [col.strip().lower() for col in df.columns] + + expected_cols = {'instrument_token','exchange_token','tradingsymbol','name','last_price','expiry','strike','tick_size','lot_size','instrument_type','segment','exchange' } + + if not expected_cols.issubset(df.columns): + logger.error(f"Unexpected MStock CSV columns: {list(df.columns)}") + return pd.DataFrame() + + # Map to OpenAlgo schema + df['symbol'] = df['tradingsymbol'].astype(str) + df['brsymbol'] = df['symbol'] + df['name'] = df['name'].astype(str) + df['exchange'] = df['exchange'].astype(str) + df['brexchange'] = df['exchange'] + df['token'] = df['instrument_token'].astype(str) + df['expiry'] = df['expiry'].fillna('') + df['strike'] = pd.to_numeric(df['strike'], errors='coerce').fillna(0) + df['lotsize'] = pd.to_numeric(df['lot_size'], errors='coerce').fillna(1).astype(int) + df['instrumenttype'] = df['instrument_type'] + df['tick_size'] = pd.to_numeric(df['tick_size'], errors='coerce').fillna(0.05) + + # Remove duplicates + df = df.drop_duplicates(subset=['symbol', 'exchange']) + + # Keep only relevant columns + final_cols = [ + 'symbol', 'brsymbol', 'name', 'exchange', 'brexchange', + 'token', 'expiry', 'strike', 'lotsize', 'instrumenttype', 'tick_size' + ] + df = df[final_cols] + + logger.info(f"MStock Master Contract Processed: {len(df)} records ready") + return df + + +# ------------------------------------------------------------------- +# MASTER CONTRACT PIPELINE +# ------------------------------------------------------------------- +def master_contract_download(): + """ + Main async download pipeline for MStock master contract. + """ + try: + login_username = os.getenv('LOGIN_USERNAME') + auth_token = get_auth_token(login_username) + + safe_token = f"{auth_token[:6]}..." if auth_token else "None" + logger.info(f"Downloading MStock Master Contract (token={safe_token})") + + csv_data = download_mstock_csv(auth_token) + if not csv_data: + logger.error("No data received from MStock API.") + socketio.emit('master_contract_download', { + 'status': 'error', + 'message': 'Failed to download MStock Master Contract' + }) + return + + token_df = process_mstock_csv(csv_data) + + if token_df is None or token_df.empty: + socketio.emit('master_contract_download', { + 'status': 'error', + 'message': 'Empty or invalid master contract data' + }) + return + + delete_symtoken_table() + copy_from_dataframe(token_df) + + socketio.emit('master_contract_download', { + 'status': 'success', + 'message': 'MStock Master Contract downloaded and stored successfully' + }) + + except Exception as e: + logger.error(f"Error during MStock master contract pipeline: {str(e)}") + socketio.emit('master_contract_download', { + 'status': 'error', + 'message': str(e) + }) + + +# ------------------------------------------------------------------- +# SEARCH SYMBOL +# ------------------------------------------------------------------- +def search_symbols(symbol, exchange): + """ + Search symbols in MStock Master Contract DB. + """ + return SymToken.query.filter( + SymToken.symbol.like(f"%{symbol}%"), + SymToken.exchange == exchange + ).all() diff --git a/broker/mstock/mapping/order_data.py b/broker/mstock/mapping/order_data.py new file mode 100644 index 00000000..8eb38f27 --- /dev/null +++ b/broker/mstock/mapping/order_data.py @@ -0,0 +1,207 @@ +import json +from database.token_db import get_symbol, get_oa_symbol +from utils.logging import get_logger + +logger = get_logger(__name__) + + +def map_order_data(order_data): + """ + Processes and modifies a list of order dictionaries based on specific conditions. + """ + if not order_data or 'data' not in order_data or order_data['data'] is None: + logger.info("No data available.") + order_data = [] + else: + order_data = order_data['data'] + logger.info(f"{order_data}") + + if order_data: + for order in order_data: + symboltoken = order.get('instrument_token') + exchange = order.get('exchange') + + if symboltoken and exchange: + symbol_from_db = get_symbol(symboltoken, exchange) + if symbol_from_db: + order['tradingsymbol'] = symbol_from_db + else: + logger.info(f"Symbol not found for token {symboltoken} and exchange {exchange}. Keeping original trading symbol.") + return order_data + + +def calculate_order_statistics(order_data): + """ + Calculates statistics from order data. + """ + total_buy_orders = total_sell_orders = 0 + total_completed_orders = total_open_orders = total_rejected_orders = 0 + + if order_data: + for order in order_data: + if order.get('transaction_type') == 'BUY': + total_buy_orders += 1 + elif order.get('transaction_type') == 'SELL': + total_sell_orders += 1 + + status = order.get('status', '').lower() + if 'complete' in status or 'traded' in status: + total_completed_orders += 1 + elif 'open' in status or 'pending' in status: + total_open_orders += 1 + elif 'rejected' in status: + total_rejected_orders += 1 + + return { + 'total_buy_orders': total_buy_orders, + 'total_sell_orders': total_sell_orders, + 'total_completed_orders': total_completed_orders, + 'total_open_orders': total_open_orders, + 'total_rejected_orders': total_rejected_orders + } + + +def transform_order_data(orders): + if isinstance(orders, dict): + orders = [orders] + + transformed_orders = [] + + for order in orders: + if not isinstance(order, dict): + logger.warning(f"Warning: Expected a dict, but found a {type(order)}. Skipping this item.") + continue + + transformed_order = { + "symbol": order.get("tradingsymbol", ""), + "exchange": order.get("exchange", ""), + "action": order.get("transaction_type", ""), + "quantity": order.get("quantity", 0), + "price": order.get("average_price", 0.0), + "trigger_price": order.get("trigger_price", 0.0), + "pricetype": order.get("order_type", ""), + "product": order.get("product", ""), + "orderid": order.get("order_id", ""), + "order_status": order.get("status", ""), + "timestamp": order.get("order_timestamp", "") + } + transformed_orders.append(transformed_order) + + return transformed_orders + + +def map_trade_data(trade_data): + """ + Processes and modifies a list of trade dictionaries. + """ + if not trade_data or 'data' not in trade_data or trade_data['data'] is None: + logger.info("No trade data available.") + return [] + + trade_data = trade_data['data'] + for trade in trade_data: + symbol = trade.get('SYMBOL') + exchange = trade.get('EXCHANGE') + if symbol and exchange: + oa_symbol = get_oa_symbol(symbol, exchange) + if oa_symbol: + trade['tradingsymbol'] = oa_symbol + else: + logger.info(f"Unable to find the OA symbol for {symbol} and exchange {exchange}.") + return trade_data + + +def transform_tradebook_data(tradebook_data): + transformed_data = [] + for trade in tradebook_data: + transformed_trade = { + "symbol": trade.get('tradingsymbol', ''), + "exchange": trade.get('EXCHANGE', ''), + "product": trade.get('PRODUCT', ''), + "action": trade.get('BUY_SELL', ''), + "quantity": trade.get('QUANTITY', 0), + "average_price": trade.get('PRICE', 0.0), + "trade_value": trade.get('TRADE_VALUE', 0), + "orderid": trade.get('ORDER_NUMBER', ''), + "timestamp": trade.get('ORDER_DATE_TIME', '') + } + transformed_data.append(transformed_trade) + return transformed_data + + +def map_position_data(position_data): + return map_order_data(position_data) + + +def transform_positions_data(positions_data): + transformed_data = [] + if 'data' in positions_data and positions_data['data']: + for position in positions_data['data']: + transformed_position = { + "symbol": position.get('tradingsymbol', ''), + "exchange": position.get('exchange', ''), + "product": position.get('product', ''), + "quantity": position.get('net_quantity', 0), + "average_price": position.get('average_price', 0.0), + "ltp": position.get('last_traded_price', 0.0), + "pnl": position.get('pnl', 0.0), + } + transformed_data.append(transformed_position) + return transformed_data + +def transform_holdings_data(holdings_data): + transformed_data = [] + if 'data' in holdings_data and holdings_data['data']: + for holding in holdings_data['data']: + transformed_holding = { + "symbol": holding.get('trading_symbol', ''), + "exchange": holding.get('exchange', ''), + "quantity": holding.get('quantity', 0), + "product": holding.get('product', ''), + "pnl": holding.get('pnl', 0.0), + "pnlpercent": holding.get('pnl_percentage', 0.0) + } + transformed_data.append(transformed_holding) + return transformed_data + + +def map_portfolio_data(portfolio_data): + """ + Processes portfolio data. + """ + if portfolio_data.get('data') is None: + logger.info("No portfolio data available.") + return {} + + data = portfolio_data['data'] + if 'holdings' in data and data['holdings']: + for holding in data['holdings']: + symbol = holding.get('trading_symbol') + exchange = holding.get('exchange') + if symbol and exchange: + oa_symbol = get_oa_symbol(symbol, exchange) + if oa_symbol: + holding['tradingsymbol'] = oa_symbol + + return data + + +def calculate_portfolio_statistics(holdings_data): + totalholdingvalue = 0 + totalinvvalue = 0 + totalprofitandloss = 0 + totalpnlpercentage = 0 + + if 'data' in holdings_data and 'total_holding' in holdings_data['data']: + total_holding = holdings_data['data']['total_holding'] + totalholdingvalue = total_holding.get('total_holding_value', 0) + totalinvvalue = total_holding.get('total_investment_value', 0) + totalprofitandloss = total_holding.get('total_pnl', 0) + totalpnlpercentage = total_holding.get('total_pnl_percentage', 0) + + return { + 'totalholdingvalue': totalholdingvalue, + 'totalinvvalue': totalinvvalue, + 'totalprofitandloss': totalprofitandloss, + 'totalpnlpercentage': totalpnlpercentage + } diff --git a/broker/mstock/mapping/transform_data.py b/broker/mstock/mapping/transform_data.py new file mode 100644 index 00000000..e159a8ad --- /dev/null +++ b/broker/mstock/mapping/transform_data.py @@ -0,0 +1,71 @@ +#Mapping OpenAlgo API Request https://openalgo.in/docs +#Mapping MStock Parameters https://tradingapi.mstock.com/docs/v1/typeA/Orders/ + +from database.token_db import get_br_symbol + +def transform_data(data,token): + """ + Transforms the new API request structure to the current expected structure. + """ + symbol = get_br_symbol(data["symbol"],data["exchange"]) + transformed = { + "tradingsymbol": symbol, + "exchange": data["exchange"], + "transaction_type": data["action"].upper(), + "order_type": map_order_type(data["pricetype"]), + "quantity": data["quantity"], + "product": map_product_type(data["product"]), + "validity": "DAY", # Assuming DAY as default + "price": data.get("price", "0"), + "trigger_price": data.get("trigger_price", "0"), + "disclosed_quantity": data.get("disclosed_quantity", "0"), + } + return transformed + + +def transform_modify_order_data(data, token): + return { + "order_type": map_order_type(data["pricetype"]), + "quantity": data["quantity"], + "price": data["price"], + "validity": "DAY", + "disclosed_quantity": data.get("disclosed_quantity", "0"), + "trigger_price": data.get("trigger_price", "0") + } + + + +def map_order_type(pricetype): + """ + Maps the new pricetype to the existing order type. + """ + order_type_mapping = { + "MARKET": "MARKET", + "LIMIT": "LIMIT", + "SL": "SL", + "SL-M": "SL-M" + } + return order_type_mapping.get(pricetype, "MARKET") + +def map_product_type(product): + """ + Maps the new product type to the existing product type. + """ + product_type_mapping = { + "CNC": "CNC", + "NRML": "NRML", + "MIS": "MIS", + } + return product_type_mapping.get(product, "MIS") + + +def reverse_map_product_type(product): + """ + Maps the new product type to the existing product type. + """ + reverse_product_type_mapping = { + "CNC": "CNC", + "NRML": "NRML", + "MIS": "MIS", + } + return reverse_product_type_mapping.get(product) diff --git a/broker/mstock/plugin.json b/broker/mstock/plugin.json new file mode 100644 index 00000000..a622c40d --- /dev/null +++ b/broker/mstock/plugin.json @@ -0,0 +1,8 @@ +{ + "Plugin Name": "mstock", + "Plugin URI": "https://openalgo.in", + "Description": "mstock OpenAlgo Plugin", + "Version": "1.0", + "Author": "Rajandran R", + "Author URI": "https://openalgo.in" +} \ No newline at end of file diff --git a/broker/mstock/remainwork.md b/broker/mstock/remainwork.md new file mode 100644 index 00000000..e2b09fc6 --- /dev/null +++ b/broker/mstock/remainwork.md @@ -0,0 +1,39 @@ +# Remaining Work for MStock Broker Integration + +## Missing Files + +- `broker/mstock/api/__init__.py` ok +- `broker/mstock/streaming/mstock_mapping.py` +- `broker/mstock/streaming/smartWebSocketV2.py` (or equivalent) + +## Missing Functions + +### `broker/mstock/api/data.py` + +- `get_quotes(symbol, exchange)` +- `get_history(symbol, exchange, interval, start_date, end_date)` +- `get_depth(symbol, exchange)` + +### `broker/mstock/api/order_api.py` + +- `get_positions(auth)` ok +- `get_holdings(auth)` ok +- `get_open_position(tradingsymbol, exchange, producttype,auth)` +- `place_smartorder_api(data,auth)` +- `close_all_positions(current_api_key,auth)` +- `cancel_all_orders_api(data,auth)` + +### `broker/mstock/mapping/order_data.py` + +- `map_order_data(order_data)` +- `calculate_order_statistics(order_data)` +- `map_trade_data(trade_data)` +- `map_position_data(position_data)` +- `map_portfolio_data(portfolio_data)` +- `calculate_portfolio_statistics(holdings_data)` + +### `broker/mstock/mapping/transform_data.py` + +- `map_product_type(product)` +- `reverse_map_product_type(product)` +- `map_variety(pricetype)` diff --git a/broker/mstock/streaming/adapter.py b/broker/mstock/streaming/adapter.py new file mode 100644 index 00000000..f327ca77 --- /dev/null +++ b/broker/mstock/streaming/adapter.py @@ -0,0 +1,83 @@ +from websocket_proxy.base_adapter import BaseBrokerWebSocketAdapter +import websocket +import json +import threading +from utils.logging import get_logger + +logger = get_logger(__name__) + +class MstockWebSocketAdapter(BaseBrokerWebSocketAdapter): + def __init__(self): + super().__init__() + self.ws = None + self.thread = None + + def initialize(self, broker_name, user_id, auth_data=None): + self.broker_name = broker_name + self.user_id = user_id + self.auth_data = auth_data + + def connect(self): + def on_message(ws, message): + data = json.loads(message) + # Process the message and publish to ZMQ + # This is a placeholder, the actual topic and data will depend on the message format + topic = f"{self.broker_name}_{data.get('symbol', 'UNKNOWN')}" + self.publish_market_data(topic, data) + + def on_error(ws, error): + logger.error(f"WebSocket error: {error}") + + def on_close(ws, close_status_code, close_msg): + logger.info("WebSocket closed") + self.connected = False + + def on_open(ws): + logger.info("WebSocket connection opened") + self.connected = True + # Subscribe to symbols upon connection + for (symbol, exchange), mode in self.subscriptions.items(): + self.subscribe(symbol, exchange, mode) + + self.ws = websocket.WebSocketApp( + "wss://ws.mstock.trade", + on_open=on_open, + on_message=on_message, + on_error=on_error, + on_close=on_close + ) + self.thread = threading.Thread(target=self.ws.run_forever) + self.thread.start() + + def disconnect(self): + if self.ws: + self.ws.close() + if self.thread: + self.thread.join() + + def subscribe(self, symbol, exchange, mode=2, depth_level=5): + if not self.connected: + return self._create_error_response(503, "WebSocket not connected") + + # mstock's websocket uses a simple string format for subscriptions + # Format: |# + # Mode: 1 for LTP, 2 for Quote, 4 for Depth + # We'll need a way to get the token for a given symbol + # For now, we will use a placeholder token. + # This will be addressed in a future commit. + placeholder_token = "12345" + subscription_string = f"{exchange}|{placeholder_token}#{mode}" + self.ws.send(subscription_string) + self.subscriptions[(symbol, exchange)] = mode + return self._create_success_response("Subscribed successfully") + + def unsubscribe(self, symbol, exchange, mode=2): + if not self.connected: + return self._create_error_response(503, "WebSocket not connected") + + placeholder_token = "12345" + unsubscription_string = f"{exchange}|{placeholder_token}#0" # Mode 0 to unsubscribe + self.ws.send(unsubscription_string) + if (symbol, exchange) in self.subscriptions: + del self.subscriptions[(symbol, exchange)] + return self._create_success_response("Unsubscribed successfully") diff --git a/install/install-docker.sh b/install/install-docker.sh index 14091c2c..61c9ac56 100644 --- a/install/install-docker.sh +++ b/install/install-docker.sh @@ -42,7 +42,7 @@ generate_hex() { # Function to validate broker validate_broker() { local broker=$1 - local valid_brokers="fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha" + local valid_brokers="fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha,mstock" [[ $valid_brokers == *"$broker"* ]] } @@ -116,7 +116,7 @@ while true; do echo "fivepaisa, fivepaisaxts, aliceblue, angel, compositedge, definedge," echo "dhan, dhan_sandbox, firstock, flattrade, fyers, groww, ibulls, iifl," echo "indmoney, kotak, motilal, paytm, pocketful, shoonya, tradejini," - echo "upstox, wisdom, zebu, zerodha" + echo "upstox, wisdom, zebu, zerodha, mstock" echo "" read -p "Enter your broker name: " BROKER_NAME if validate_broker "$BROKER_NAME"; then diff --git a/install/install-multi.sh b/install/install-multi.sh index 4d4d19ea..317025f3 100644 --- a/install/install-multi.sh +++ b/install/install-multi.sh @@ -50,7 +50,7 @@ generate_hex() { # Function to validate broker name validate_broker() { local broker=$1 - local valid_brokers="fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha" + local valid_brokers="fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha,mstock" if [[ $valid_brokers == *"$broker"* ]]; then return 0 @@ -152,7 +152,7 @@ for ((i=1; i<=INSTANCES; i++)); do # Get broker while true; do - log_message "\nValid brokers: fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha" "$BLUE" + log_message "\nValid brokers: fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha,mstock" "$BLUE" read -p "Enter broker name for instance $i: " broker if validate_broker "$broker"; then BROKERS+=("$broker") diff --git a/install/install.sh b/install/install.sh index 08dd4411..1942787f 100644 --- a/install/install.sh +++ b/install/install.sh @@ -110,7 +110,7 @@ generate_hex() { validate_broker() { local broker=$1 - local valid_brokers="fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha" + local valid_brokers="fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha,mstock" if [[ $valid_brokers == *"$broker"* ]]; then return 0 @@ -355,7 +355,7 @@ done # Get broker name while true; do - log_message "\nValid brokers: fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha" "$BLUE" + log_message "\nValid brokers: fivepaisa,fivepaisaxts,aliceblue,angel,compositedge,definedge,dhan,dhan_sandbox,firstock,flattrade,fyers,groww,ibulls,iifl,indmoney,kotak,motilal,paytm,pocketful,shoonya,tradejini,upstox,wisdom,zebu,zerodha,mstock" "$BLUE" read -p "Enter your broker name: " BROKER_NAME if validate_broker "$BROKER_NAME"; then diff --git a/templates/broker.html b/templates/broker.html index 79a54b87..0f75a818 100644 --- a/templates/broker.html +++ b/templates/broker.html @@ -41,7 +41,9 @@ case 'angel': loginUrl = '/angel/callback'; break; - + case 'mstock': + loginUrl = '/mstock/callback'; + break; case 'dhan': // Directly initiate OAuth flow (client_id from .env) loginUrl = '/dhan/initiate-oauth'; @@ -159,6 +161,7 @@

Connect Your Trading + +
+ + + +
+ + {% if error_message %} +
+ + + + {{ error_message }} +
+ {% endif %} + +
+ +
+ + +
OR
+ + + + + + Back to Broker Selection + + + + + +
+

Connect MStock

+

+ Enter your 6-digit TOTP from your MStock authenticator app to securely connect your trading account with OpenAlgo. +

+ +
+
+ + + +
+

Security Note

+
Only enter your TOTP code generated by your registered authenticator app.
+
+
+ + +
+
+ + + +{% endblock %} diff --git a/test/test_mstock.py b/test/test_mstock.py new file mode 100644 index 00000000..0f3a4e39 --- /dev/null +++ b/test/test_mstock.py @@ -0,0 +1,51 @@ +import os +import unittest +from openalgo import api as OAClient +from dotenv import load_dotenv + +# Load environment variables from .env file +load_dotenv() + +class TestMstockBroker(unittest.TestCase): + def setUp(self): + """Set up for the test case.""" + # The test assumes that the OpenAlgo server is running and + # the user is already logged into the mstock broker. + self.api_key = os.getenv( + "OPENALGO_API_KEY", + "3bb8d260915ff680a7258108c0483b9eb7675ced31309a36f5846366943ee9fa" + ) + self.client = OAClient(api_key=self.api_key, host="http://127.0.0.1:5000") + + def test_place_order(self): + """Test placing a simple order.""" + # This test requires an active mstock session in the OpenAlgo server + order_response = self.client.placeorder( + strategy="TEST", + symbol="TCS", + exchange="NSE", + price_type="MARKET", + product="MIS", + action="BUY", + quantity=1 + ) + self.assertEqual(order_response.get("status"), "success") + self.assertIn("orderid", order_response) + + def test_get_positions(self): + """Test retrieving positions.""" + positions_response = self.client.positionbook() + self.assertEqual(positions_response.get("status"), "success") + + def test_get_holdings(self): + """Test retrieving holdings.""" + holdings_response = self.client.holdings() + self.assertEqual(holdings_response.get("status"), "success") + + def test_get_funds(self): + """Test retrieving funds.""" + funds_response = self.client.funds() + self.assertEqual(funds_response.get("status"), "success") + +if __name__ == '__main__': + unittest.main()