Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions broker/zerodha/api/api_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from utils.httpx_client import get_httpx_client
from utils.logging import get_logger

logger = get_logger(__name__)

def get_api_response(endpoint, auth, method="GET", payload=None):
"""
Make an API request to Zerodha's API using shared httpx client with connection pooling.

Args:
endpoint (str): API endpoint (e.g., '/orders')
auth (str): Authentication token
method (str): HTTP method (GET, POST, etc.)
payload (dict/str, optional): Request payload

Returns:
dict: API response data
"""
AUTH_TOKEN = auth
base_url = 'https://api.kite.trade'

# Get the shared httpx client with connection pooling
client = get_httpx_client()

headers = {
'X-Kite-Version': '3',
'Authorization': f'token {AUTH_TOKEN}'
}

url = f"{base_url}{endpoint}"

try:
# Handle different HTTP methods
if method.upper() == 'GET':
response = client.get(
url,
headers=headers
)
elif method.upper() == 'POST':
if isinstance(payload, str):
# For form-urlencoded data
headers['Content-Type'] = 'application/x-www-form-urlencoded'
response = client.post(
url,
headers=headers,
content=payload
)
else:
# For JSON data
headers['Content-Type'] = 'application/json'
response = client.post(
url,
headers=headers,
json=payload
)
else:
raise ValueError(f"Unsupported HTTP method: {method}")

# Parse and return JSON response
response.raise_for_status()
return response.json()

except Exception as e:
error_msg = str(e)
# Try to extract more error details if available
try:
if hasattr(e, 'response') and e.response is not None:
error_detail = e.response.json()
error_msg = error_detail.get('message', error_msg)
except:
pass

logger.exception(f"API request failed: {error_msg}")
raise

36 changes: 2 additions & 34 deletions broker/zerodha/api/funds.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# api/funds.py

import os
import json
from utils.httpx_client import get_httpx_client
from broker.zerodha.api.api_response import get_api_response
from utils.logging import get_logger

logger = get_logger(__name__)
Expand All @@ -11,37 +9,7 @@

def get_margin_data(auth_token):
"""Fetch margin data from Zerodha's API using the provided auth token."""
api_key = os.getenv('BROKER_API_KEY')

# Get the shared httpx client with connection pooling
client = get_httpx_client()

headers = {
'X-Kite-Version': '3',
'Authorization': f'token {auth_token}'
}

try:
# Make the GET request using the shared client
response = client.get(
'https://api.kite.trade/user/margins',
headers=headers
)
response.raise_for_status() # Raises an exception for 4XX/5XX responses

# Parse the response
margin_data = response.json()
except Exception as e:
error_message = str(e)
try:
if hasattr(e, 'response') and e.response is not None:
error_detail = e.response.json()
error_message = error_detail.get('message', str(e))
except:
pass

logger.error(f"Error fetching margin data: {error_message}")
return {}
margin_data = get_api_response("/user/margins",auth_token)

logger.info(f"Funds Details: {margin_data}")

Expand Down
77 changes: 1 addition & 76 deletions broker/zerodha/api/order_api.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,13 @@
import http.client
import json
import os
import urllib.parse
from database.auth_db import get_auth_token
from broker.zerodha.api.api_response import get_api_response
from database.token_db import get_br_symbol, get_oa_symbol
from broker.zerodha.mapping.transform_data import transform_data, map_product_type, reverse_map_product_type, transform_modify_order_data
from utils.httpx_client import get_httpx_client
from utils.logging import get_logger

logger = get_logger(__name__)




def get_api_response(endpoint, auth, method="GET", payload=None):
"""
Make an API request to Zerodha's API using shared httpx client with connection pooling.
Args:
endpoint (str): API endpoint (e.g., '/orders')
auth (str): Authentication token
method (str): HTTP method (GET, POST, etc.)
payload (dict/str, optional): Request payload
Returns:
dict: API response data
"""
AUTH_TOKEN = auth
base_url = 'https://api.kite.trade'

# Get the shared httpx client with connection pooling
client = get_httpx_client()

headers = {
'X-Kite-Version': '3',
'Authorization': f'token {AUTH_TOKEN}'
}

url = f"{base_url}{endpoint}"

try:
# Handle different HTTP methods
if method.upper() == 'GET':
response = client.get(
url,
headers=headers
)
elif method.upper() == 'POST':
if isinstance(payload, str):
# For form-urlencoded data
headers['Content-Type'] = 'application/x-www-form-urlencoded'
response = client.post(
url,
headers=headers,
content=payload
)
else:
# For JSON data
headers['Content-Type'] = 'application/json'
response = client.post(
url,
headers=headers,
json=payload
)
else:
raise ValueError(f"Unsupported HTTP method: {method}")

# Parse and return JSON response
response.raise_for_status()
return response.json()

except Exception as e:
error_msg = str(e)
# Try to extract more error details if available
try:
if hasattr(e, 'response') and e.response is not None:
error_detail = e.response.json()
error_msg = error_detail.get('message', error_msg)
except:
pass

logger.exception(f"API request failed: {error_msg}")
raise

def get_order_book(auth):
return get_api_response("/orders",auth)

Expand Down
2 changes: 2 additions & 0 deletions restx_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from .analyzer import api as analyzer_ns
from .ping import api as ping_ns
from .telegram_bot import api as telegram_ns
from .broker_custom import api as broker_custom_ns

# Add namespaces
api.add_namespace(place_order_ns, path='/placeorder')
Expand Down Expand Up @@ -65,3 +66,4 @@
api.add_namespace(analyzer_ns, path='/analyzer')
api.add_namespace(ping_ns, path='/ping')
api.add_namespace(telegram_ns, path='/telegram')
api.add_namespace(broker_custom_ns, path='/broker/custom')
57 changes: 57 additions & 0 deletions restx_api/broker_custom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
Broker-custom RESTX endpoint.
Accepts a JSON payload that specifies broker, endpoint, method, and payload,
then forwards the call to the selected broker via execute_custom.
"""
from flask_restx import Namespace, Resource
from flask import request, jsonify, make_response
from marshmallow import ValidationError

from restx_api.data_schemas import CustomBrokerSchema
from services.get_token_service import get_auth_token_brokers
from services.broker_custom_service import execute_custom
from utils.logging import get_logger

api = Namespace('broker_custom', description='Execute custom broker API requests')
logger = get_logger(__name__)
schema = CustomBrokerSchema()


def _build_error_response(message: str, status_code: int):
"""Helper to create a consistent error payload."""
return make_response(jsonify({'status': 'error', 'message': message}), status_code)


@api.route('/', strict_slashes=False)
class CustomBroker(Resource):
@api.doc('execute_custom_broker_request')
def post(self):
"""Execute custom broker API request."""
try:
payload = schema.load(request.json)
except ValidationError as err:
return _build_error_response(err.messages, 400)

api_key = payload['apikey']
endpoint = payload['endpoint']
method = payload['method']
body = payload['payload']

auth_token, _, broker_name = get_auth_token_brokers(api_key=api_key)
if auth_token is None:
# Skip logging to avoid DB flooding from bad keys
return _build_error_response('Invalid openalgo apikey', 403)

try:
data = execute_custom(
auth_token=auth_token,
broker_name=broker_name,
endpoint=endpoint,
method=method,
payload=body,
)
except Exception as exc: # noqa: BLE001 # service already logs internals
logger.exception('Custom broker request failed: %s', exc)
return _build_error_response('An unexpected error occurred', 500)

return make_response(data, 200)
6 changes: 6 additions & 0 deletions restx_api/data_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,9 @@ class OptionGreeksSchema(Schema):
underlying_symbol = fields.Str(required=False) # Optional: Specify underlying symbol (e.g., NIFTY or NIFTY28NOV24FUT)
underlying_exchange = fields.Str(required=False) # Optional: Specify underlying exchange (NSE_INDEX, NFO, etc.)
expiry_time = fields.Str(required=False) # Optional: Custom expiry time in HH:MM format (e.g., "15:30", "19:00"). If not provided, uses exchange defaults

class CustomBrokerSchema(Schema):
apikey = fields.Str(required=True)
endpoint = fields.Str(required=True)
method = fields.Str(required=False, default="GET")
payload = fields.Raw(required=False, default=None)
58 changes: 58 additions & 0 deletions services/broker_custom_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
Broker-specific custom API service.

Provides a thin wrapper to dynamically invoke broker modules
that expose a `get_api_response` function.
"""
from __future__ import annotations

import importlib
from typing import Any

from utils.logging import get_logger

logger = get_logger(__name__)

def import_broker_module(broker_name: str) -> Any | None:
"""Dynamically import the broker-specific order API module.

Args:
broker_name: Name of the broker (e.g., 'zerodha', 'angel').

Returns:
The imported module or None if the import fails.
"""
module_path = f"broker.{broker_name}.api.api_response"
try:
return importlib.import_module(module_path)
except ImportError as exc:
logger.error("Error importing broker module '%s': %s", module_path, exc)
return None

def execute_custom(
endpoint: str,
auth_token: str,
broker_name: str,
method: str = "POST",
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Execute a custom API request through the specified broker module.

Args:
endpoint: API endpoint (e.g., "/orders").
auth_token: Authentication token for the broker API.
broker_name: Name of the broker.
method: HTTP method ("GET" or "POST").
payload: Optional request payload.

Returns:
Response data dict. On broker module failure returns
{'status': 'error', 'message': '<reason>'}.
"""
broker_module = import_broker_module(broker_name)
if broker_module is None:
return {
"status": "error",
"message": f"Broker module for {broker_name} not found",
}
return broker_module.get_api_response(endpoint, auth_token, method, payload or {})