From 3b9bebad4f7e4bc417d342079ad12b460132f122 Mon Sep 17 00:00:00 2001 From: kalai-de Date: Thu, 6 Nov 2025 17:13:34 +0800 Subject: [PATCH] feat(broker): add custom broker API endpoint and refactor zerodha API calls - Introduce new RESTX endpoint for custom broker API requests - Extract common API response logic into shared module for zerodha - Refactor funds and order APIs to use shared API response module --- broker/zerodha/api/api_response.py | 75 +++++++++++++++++++++++++++++ broker/zerodha/api/funds.py | 36 +------------- broker/zerodha/api/order_api.py | 77 +----------------------------- restx_api/__init__.py | 2 + restx_api/broker_custom.py | 57 ++++++++++++++++++++++ restx_api/data_schemas.py | 6 +++ services/broker_custom_service.py | 58 ++++++++++++++++++++++ 7 files changed, 201 insertions(+), 110 deletions(-) create mode 100644 broker/zerodha/api/api_response.py create mode 100644 restx_api/broker_custom.py create mode 100644 services/broker_custom_service.py diff --git a/broker/zerodha/api/api_response.py b/broker/zerodha/api/api_response.py new file mode 100644 index 00000000..4ec20e8e --- /dev/null +++ b/broker/zerodha/api/api_response.py @@ -0,0 +1,75 @@ +from utils.httpx_client import get_httpx_client +from utils.logging import get_logger + +logger = get_logger(__name__) + +def get_api_response(endpoint, auth, method="GET", payload=None): + """ + Make an API request to Zerodha's API using shared httpx client with connection pooling. + + Args: + endpoint (str): API endpoint (e.g., '/orders') + auth (str): Authentication token + method (str): HTTP method (GET, POST, etc.) + payload (dict/str, optional): Request payload + + Returns: + dict: API response data + """ + AUTH_TOKEN = auth + base_url = 'https://api.kite.trade' + + # Get the shared httpx client with connection pooling + client = get_httpx_client() + + headers = { + 'X-Kite-Version': '3', + 'Authorization': f'token {AUTH_TOKEN}' + } + + url = f"{base_url}{endpoint}" + + try: + # Handle different HTTP methods + if method.upper() == 'GET': + response = client.get( + url, + headers=headers + ) + elif method.upper() == 'POST': + if isinstance(payload, str): + # For form-urlencoded data + headers['Content-Type'] = 'application/x-www-form-urlencoded' + response = client.post( + url, + headers=headers, + content=payload + ) + else: + # For JSON data + headers['Content-Type'] = 'application/json' + response = client.post( + url, + headers=headers, + json=payload + ) + else: + raise ValueError(f"Unsupported HTTP method: {method}") + + # Parse and return JSON response + response.raise_for_status() + return response.json() + + except Exception as e: + error_msg = str(e) + # Try to extract more error details if available + try: + if hasattr(e, 'response') and e.response is not None: + error_detail = e.response.json() + error_msg = error_detail.get('message', error_msg) + except: + pass + + logger.exception(f"API request failed: {error_msg}") + raise + diff --git a/broker/zerodha/api/funds.py b/broker/zerodha/api/funds.py index cff321af..6a1bf7a6 100644 --- a/broker/zerodha/api/funds.py +++ b/broker/zerodha/api/funds.py @@ -1,8 +1,6 @@ # api/funds.py -import os -import json -from utils.httpx_client import get_httpx_client +from broker.zerodha.api.api_response import get_api_response from utils.logging import get_logger logger = get_logger(__name__) @@ -11,37 +9,7 @@ def get_margin_data(auth_token): """Fetch margin data from Zerodha's API using the provided auth token.""" - api_key = os.getenv('BROKER_API_KEY') - - # Get the shared httpx client with connection pooling - client = get_httpx_client() - - headers = { - 'X-Kite-Version': '3', - 'Authorization': f'token {auth_token}' - } - - try: - # Make the GET request using the shared client - response = client.get( - 'https://api.kite.trade/user/margins', - headers=headers - ) - response.raise_for_status() # Raises an exception for 4XX/5XX responses - - # Parse the response - margin_data = response.json() - except Exception as e: - error_message = str(e) - try: - if hasattr(e, 'response') and e.response is not None: - error_detail = e.response.json() - error_message = error_detail.get('message', str(e)) - except: - pass - - logger.error(f"Error fetching margin data: {error_message}") - return {} + margin_data = get_api_response("/user/margins",auth_token) logger.info(f"Funds Details: {margin_data}") diff --git a/broker/zerodha/api/order_api.py b/broker/zerodha/api/order_api.py index a5fbc01a..4c5c9292 100644 --- a/broker/zerodha/api/order_api.py +++ b/broker/zerodha/api/order_api.py @@ -1,8 +1,6 @@ -import http.client -import json import os import urllib.parse -from database.auth_db import get_auth_token +from broker.zerodha.api.api_response import get_api_response from database.token_db import get_br_symbol, get_oa_symbol from broker.zerodha.mapping.transform_data import transform_data, map_product_type, reverse_map_product_type, transform_modify_order_data from utils.httpx_client import get_httpx_client @@ -10,79 +8,6 @@ logger = get_logger(__name__) - - - -def get_api_response(endpoint, auth, method="GET", payload=None): - """ - Make an API request to Zerodha's API using shared httpx client with connection pooling. - - Args: - endpoint (str): API endpoint (e.g., '/orders') - auth (str): Authentication token - method (str): HTTP method (GET, POST, etc.) - payload (dict/str, optional): Request payload - - Returns: - dict: API response data - """ - AUTH_TOKEN = auth - base_url = 'https://api.kite.trade' - - # Get the shared httpx client with connection pooling - client = get_httpx_client() - - headers = { - 'X-Kite-Version': '3', - 'Authorization': f'token {AUTH_TOKEN}' - } - - url = f"{base_url}{endpoint}" - - try: - # Handle different HTTP methods - if method.upper() == 'GET': - response = client.get( - url, - headers=headers - ) - elif method.upper() == 'POST': - if isinstance(payload, str): - # For form-urlencoded data - headers['Content-Type'] = 'application/x-www-form-urlencoded' - response = client.post( - url, - headers=headers, - content=payload - ) - else: - # For JSON data - headers['Content-Type'] = 'application/json' - response = client.post( - url, - headers=headers, - json=payload - ) - else: - raise ValueError(f"Unsupported HTTP method: {method}") - - # Parse and return JSON response - response.raise_for_status() - return response.json() - - except Exception as e: - error_msg = str(e) - # Try to extract more error details if available - try: - if hasattr(e, 'response') and e.response is not None: - error_detail = e.response.json() - error_msg = error_detail.get('message', error_msg) - except: - pass - - logger.exception(f"API request failed: {error_msg}") - raise - def get_order_book(auth): return get_api_response("/orders",auth) diff --git a/restx_api/__init__.py b/restx_api/__init__.py index 14d2e472..cb529b68 100644 --- a/restx_api/__init__.py +++ b/restx_api/__init__.py @@ -34,6 +34,7 @@ from .analyzer import api as analyzer_ns from .ping import api as ping_ns from .telegram_bot import api as telegram_ns +from .broker_custom import api as broker_custom_ns # Add namespaces api.add_namespace(place_order_ns, path='/placeorder') @@ -65,3 +66,4 @@ api.add_namespace(analyzer_ns, path='/analyzer') api.add_namespace(ping_ns, path='/ping') api.add_namespace(telegram_ns, path='/telegram') +api.add_namespace(broker_custom_ns, path='/broker/custom') diff --git a/restx_api/broker_custom.py b/restx_api/broker_custom.py new file mode 100644 index 00000000..4f1e35d2 --- /dev/null +++ b/restx_api/broker_custom.py @@ -0,0 +1,57 @@ +""" +Broker-custom RESTX endpoint. +Accepts a JSON payload that specifies broker, endpoint, method, and payload, +then forwards the call to the selected broker via execute_custom. +""" +from flask_restx import Namespace, Resource +from flask import request, jsonify, make_response +from marshmallow import ValidationError + +from restx_api.data_schemas import CustomBrokerSchema +from services.get_token_service import get_auth_token_brokers +from services.broker_custom_service import execute_custom +from utils.logging import get_logger + +api = Namespace('broker_custom', description='Execute custom broker API requests') +logger = get_logger(__name__) +schema = CustomBrokerSchema() + + +def _build_error_response(message: str, status_code: int): + """Helper to create a consistent error payload.""" + return make_response(jsonify({'status': 'error', 'message': message}), status_code) + + +@api.route('/', strict_slashes=False) +class CustomBroker(Resource): + @api.doc('execute_custom_broker_request') + def post(self): + """Execute custom broker API request.""" + try: + payload = schema.load(request.json) + except ValidationError as err: + return _build_error_response(err.messages, 400) + + api_key = payload['apikey'] + endpoint = payload['endpoint'] + method = payload['method'] + body = payload['payload'] + + auth_token, _, broker_name = get_auth_token_brokers(api_key=api_key) + if auth_token is None: + # Skip logging to avoid DB flooding from bad keys + return _build_error_response('Invalid openalgo apikey', 403) + + try: + data = execute_custom( + auth_token=auth_token, + broker_name=broker_name, + endpoint=endpoint, + method=method, + payload=body, + ) + except Exception as exc: # noqa: BLE001 # service already logs internals + logger.exception('Custom broker request failed: %s', exc) + return _build_error_response('An unexpected error occurred', 500) + + return make_response(data, 200) diff --git a/restx_api/data_schemas.py b/restx_api/data_schemas.py index 0fd4ef6d..cf16cd8b 100644 --- a/restx_api/data_schemas.py +++ b/restx_api/data_schemas.py @@ -103,3 +103,9 @@ class OptionGreeksSchema(Schema): underlying_symbol = fields.Str(required=False) # Optional: Specify underlying symbol (e.g., NIFTY or NIFTY28NOV24FUT) underlying_exchange = fields.Str(required=False) # Optional: Specify underlying exchange (NSE_INDEX, NFO, etc.) expiry_time = fields.Str(required=False) # Optional: Custom expiry time in HH:MM format (e.g., "15:30", "19:00"). If not provided, uses exchange defaults + +class CustomBrokerSchema(Schema): + apikey = fields.Str(required=True) + endpoint = fields.Str(required=True) + method = fields.Str(required=False, default="GET") + payload = fields.Raw(required=False, default=None) diff --git a/services/broker_custom_service.py b/services/broker_custom_service.py new file mode 100644 index 00000000..ed7c4464 --- /dev/null +++ b/services/broker_custom_service.py @@ -0,0 +1,58 @@ +""" +Broker-specific custom API service. + +Provides a thin wrapper to dynamically invoke broker modules +that expose a `get_api_response` function. +""" +from __future__ import annotations + +import importlib +from typing import Any + +from utils.logging import get_logger + +logger = get_logger(__name__) + +def import_broker_module(broker_name: str) -> Any | None: + """Dynamically import the broker-specific order API module. + + Args: + broker_name: Name of the broker (e.g., 'zerodha', 'angel'). + + Returns: + The imported module or None if the import fails. + """ + module_path = f"broker.{broker_name}.api.api_response" + try: + return importlib.import_module(module_path) + except ImportError as exc: + logger.error("Error importing broker module '%s': %s", module_path, exc) + return None + +def execute_custom( + endpoint: str, + auth_token: str, + broker_name: str, + method: str = "POST", + payload: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Execute a custom API request through the specified broker module. + + Args: + endpoint: API endpoint (e.g., "/orders"). + auth_token: Authentication token for the broker API. + broker_name: Name of the broker. + method: HTTP method ("GET" or "POST"). + payload: Optional request payload. + + Returns: + Response data dict. On broker module failure returns + {'status': 'error', 'message': ''}. + """ + broker_module = import_broker_module(broker_name) + if broker_module is None: + return { + "status": "error", + "message": f"Broker module for {broker_name} not found", + } + return broker_module.get_api_response(endpoint, auth_token, method, payload or {})