diff --git a/app/default.env b/app/default.env deleted file mode 100644 index 6bbb56a..0000000 --- a/app/default.env +++ /dev/null @@ -1,11 +0,0 @@ -BITFINEX_API_URL=https://api.bitfinex.com/ -BITMEX_API_URL=https://www.bitmex.com/api/v1/ -BITTREX_API_URL=https://bittrex.com/api/ -GDAX_API_URL=https://api.gdax.com/ -GEMINI_API_URL=https://api.gemini.com/v1/ -KRAKEN_API_URL=https://api.kraken.com/ -POLONIEX_API_URL=https://poloniex.com/ -OKCOIN_API_URL=https://www.okcoin.com/api/ -ELASTICSEARCH_CONNECT_STRING=elasticsearch:9200 -TRACKER_LOG_LEVEL=DEBUG -INITIAL_SLEEP=20 diff --git a/app/exchange_harness.py b/app/exchange_harness.py new file mode 100644 index 0000000..eca53fc --- /dev/null +++ b/app/exchange_harness.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +from json import loads +from datetime import datetime +import settings +import logging +import requests +import utils +import ccxt +from time import sleep +class ExchangeHarness(object): + """Poloniex Market Data""" + def __init__(self,exchange_id): + self.symbols = ['BTC/USD','ETH/USD'] + self.exchange_id = exchange_id.lower() + self.products = {'ETH/USD': 'eth.{}.ticker'.format(self.exchange_id), + 'BTC/USD': 'btc.{}.ticker'.format(self.exchange_id)} + self.exchange = getattr(ccxt,self.exchange_id)({ + 'enableRateLimit': True, # this option enables the built-in rate limiter + }) + # self.markets = self.exchange.load_markets() + def clean_ticker(self,data): + clean_data = dict() + now = datetime.utcnow() + clean_data['tracker_time'] = now + clean_data["ask"] = float(data["ask"]) + clean_data["bid"] = float(data["bid"]) + clean_data["price"] = float(data["last"]) + clean_data["exchange"] = self.exchange_id + clean_data["product"] = data['symbol'] + clean_data['info'] = data['info'] + clean_data["size"] = float(data['baseVolume']) + clean_data["volume"] = float(data['quoteVolume']) + clean_data['time'] = data['timestamp'] + for k,v in data.items(): + if k not in clean_data.keys(): + clean_data[k]=v + # if not clean_data["last"]: + # clean_data['last'] = + return clean_data + + def get_ticker(self,symbol): + ticker = self.exchange.fetch_ticker(symbol) + clean = self.clean_ticker(ticker) + return clean + + def record_ticker(self, es): + """Record current tick""" + for product in self.products.keys(): + es_body=self.get_ticker(product) + # print(es_body) + # if 'price' in es_body: + try: + es.create(index=self.products[product], id=utils.generate_nonce(), doc_type='ticker', body=es_body) + except: + raise ValueError("Misformed Body for Elastic Search on " + self.exchange_id) + diff --git a/app/requirements.txt b/app/requirements.txt index 315b2e0..ab43807 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -16,3 +16,5 @@ six==1.10.0 urllib3==1.21.1 websocket==0.2.1 websocket-client==0.40.0 +futures +ccxt \ No newline at end of file diff --git a/app/tracker.py b/app/tracker.py index a5a8b23..36283e0 100644 --- a/app/tracker.py +++ b/app/tracker.py @@ -1,58 +1,52 @@ #!/usr/bin/env python2 # -*- coding: utf-8 -*- -""" -@authors: dconroy - avelkoski -""" from elasticsearch import Elasticsearch, helpers -from public.bitfinex import BitFinex_Market -from public.bitmex import BitMex_Market -from public.bittrex import BitTrex_Market -from public.gdax import GDAX_Market -from public.gemini import Gemini_Market -from public.kraken import Kraken_Market -from public.okcoin import OKCoin_Market -from public.poloniex import Poloniex_Market -from dotenv import Dotenv -from time import sleep +from exchange_harness import ExchangeHarness import logging -import schedule +# import schedule import settings import utils import random import time +from concurrent.futures import ThreadPoolExecutor +from time import sleep def main(): logging.basicConfig(format='%(levelname)s:%(asctime)s %(message)s',level=settings.LOGLEVEL) es = Elasticsearch(settings.ELASTICSEARCH_CONNECT_STRING) logging.info('Market Refresh Rate: ' + str(settings.MARKET_REFRESH_RATE) + ' seconds.') - logging.info('Initial Sleep: ' + str(settings.INITIAL_SLEEP) + ' seconds.') + logging.info('Initial Sleep: ' + str(5) + ' seconds.') + - sleep(settings.INITIAL_SLEEP) logging.info('Application Started.') #supported_exchanges = [BitFinex_Market(), BitMex_Market(), BitTrex_Market(), GDAX_Market(), Gemini_Market(), Kraken_Market(), OKCoin_Market(), Poloniex_Market()] - exchanges = [BitFinex_Market(), BitMex_Market(), BitTrex_Market(), GDAX_Market(), Gemini_Market(), Kraken_Market(), OKCoin_Market(), Poloniex_Market()] + tmp = ['bitstamp', 'gdax', 'kraken', 'gemini'] + exchanges = [ExchangeHarness(x) for x in tmp] #print active exchanges and create indexes in kibana based on products listed in each market for exchange in exchanges: - logging.info(exchange.exchange + ': activated and indexed.') - for product, kibana_index in exchange.products.iteritems(): + logging.info(exchange.exchange_id + ': activated and indexed.') + for product, kibana_index in exchange.products.items(): utils.create_index(es, kibana_index) logging.warn('Initiating Market Tracking.') #Record Ticks while True: - sleep(settings.MARKET_REFRESH_RATE) - try: - for exchange in exchanges: - exchange.record_ticker(es) + with ThreadPoolExecutor(max_workers=5) as executor: + try: + sleep(settings.MARKET_REFRESH_RATE) + executor.map(lambda ex: ex.record_ticker(es), exchanges) + logging.info("added another ticker record") + except Exception as e: + logging.warning(e) + sleep(settings.RETRY_RATE) + + - except Exception as e: - logging.warning(e) - sleep(settings.RETRY_RATE) if __name__ == '__main__': + main()