diff --git a/beanprice/price.py b/beanprice/price.py index 38fb04a..8b6afee 100644 --- a/beanprice/price.py +++ b/beanprice/price.py @@ -30,6 +30,7 @@ from beancount.ops import find_prices from beanprice import date_utils +from beanprice.source import MissingDate import beanprice @@ -589,7 +590,11 @@ def fetch_price(dprice: DatedPrice, swap_inverted: bool = False) -> Optional[dat source = psource.module.Source() except AttributeError: continue - srcprice = fetch_cached_price(source, psource.symbol, dprice.date) + try: + srcprice = fetch_cached_price(source, psource.symbol, dprice.date) + except MissingDate: + logging.debug("Missing date {} for symbol {}".format(dprice.date, psource.symbol)) + return None if srcprice is not None: break else: diff --git a/beanprice/source.py b/beanprice/source.py index b17ffb8..1893444 100644 --- a/beanprice/source.py +++ b/beanprice/source.py @@ -31,6 +31,10 @@ ('quote_currency', Optional[str])]) +class MissingDate(BaseException): + """An attempt to read a missing date, ignore and continue""" + + class Source: """Interface to be implemented by all price sources. diff --git a/beanprice/sources/csif.py b/beanprice/sources/csif.py new file mode 100644 index 0000000..ac371a8 --- /dev/null +++ b/beanprice/sources/csif.py @@ -0,0 +1,134 @@ +"""A source fetching prices and exchangerates from https://amfunds.credit-suisse.com + +Valid tickers for prices are in the form "IBAN", such as "CH0031341875". + +Here is the API documentation: +https://www.alphavantage.co/documentation/ + +Example: + https://amfunds.credit-suisse.com/ch/de/institutional/fund/history/CH0031341875 + +Based on: https://github.com/buchen/portfolio/blob/effa5b7baf9a918e1b5fe83942ddc480e0fd48b9/name.abuchen.portfolio/src/name/abuchen/portfolio/online/impl/CSQuoteFeed.java + +""" + +from decimal import Decimal +from typing import Optional +from dateutil.tz import tz +from dateutil.parser import parse +import logging +import subprocess +from beanprice import source +from pathlib import Path + + +class CsifApiError(ValueError): + """An error from the CSIF API.""" + + +def _fetch_response(ticker): + # Download file, or was it cached? + filename = '/tmp/beanprice_csif_{}.html'.format(ticker) + path = Path(filename) + if not path.is_file(): + logging.debug('Fetching data from server for ticker {}'.format(ticker)) + # Fetch the HTML workbook, we have to use curl, see PortfolioPerformance documentation + link = 'https://amfunds.credit-suisse.com/ch/de/institutional/fund/history/{}?currency=CHF'.format(ticker) + try: + response = subprocess.check_output(['curl', '-s', link]).decode("utf-8") + except BaseException as e: + raise CsifApiError('Error connecting to server on URL {}'.format(link)) + + # Save to file for future access + with open(path, "w") as text_file: + text_file.write(response) + else: + # Read the response from text file + logging.debug('Retrieving cached data for ticker {}'.format(ticker)) + with open(path, "r") as text_file: + response = text_file.read() + + # Find first occurrence of HTML tag "IBAN" + pos = response.find('{}'.format(ticker)) + if pos < 0: + raise CsifApiError('Ticker {} not fund'.format(ticker)) + pos = pos + 4 + + # Next occurrence of "": security number + pos = pos + response[pos:].find('') + 4 + end_pos = pos + response[pos:].find('') + sec_number = response[pos:end_pos] + + # Next occurrence of "": currency + pos = pos + response[pos:].find('') + 4 + end_pos = pos + response[pos:].find('') + currency = response[pos:end_pos] + logging.debug('Ticker {} data loaded: sec. number {}, currency {}'.format( + ticker, + sec_number, + currency + )) + + return response, currency, sec_number + + +class Source(source.Source): + + def get_latest_price(self, ticker) -> Optional[source.SourcePrice]: + # Fetch data + response, currency, sec_number = _fetch_response(ticker) + + # Find first occurrence of security number + pos = response.find('{}'.format(sec_number)) + if pos < 0: + return None + pos = pos + 4 + + # Next two occurrences of HTML tags "" and "" + pos = pos + response[pos:].find('') + 4 + pos = pos + response[pos:].find('') + 4 + end_pos = pos + response[pos:].find('') + + # Parse date + date_str = response[pos:end_pos] + logging.debug('Date: {}'.format(date_str)) + date = parse(date_str).replace(tzinfo=tz.gettz('Europe/Zurich')) + + # Next occurrence of HTML tags "" and "" + pos = pos + response[pos:].find('') + 4 + end_pos = pos + response[pos:].find('') + + # Parse value + logging.debug('Price: {}'.format(response[pos:end_pos])) + price = Decimal(response[pos:end_pos]) + + logging.debug('Latest price: {} {}, {}'.format(price, currency, date_str)) + return source.SourcePrice(price, date, currency) + + def get_historical_price(self, ticker, time) -> Optional[source.SourcePrice]: + # Fetch data + response, currency, sec_number = _fetch_response(ticker) + + # Find relevant date + date_str = time.strftime("%d.%m.%Y") + find_str = '' + date_str + '' + pos = response.find(find_str) + + # Found? + if pos < 0: + # It can happen that a date is missing + raise source.MissingDate + pos = pos + 14 + + # Next occurrences of HTML tags "" and "" + pos = pos + response[pos:].find('') + 4 + end_pos = pos + response[pos:].find('') + + # Parse value + try: + price = Decimal(response[pos:end_pos]) + except BaseException as e: + raise CsifApiError('Error parsing price {} for date {}'.format(response[pos:end_pos], date_str)) + + logging.debug('Historical price: {} {}, {}'.format(price, currency, date_str)) + return source.SourcePrice(price, time, currency) diff --git a/beanprice/sources/csif_test.py b/beanprice/sources/csif_test.py new file mode 100644 index 0000000..b9053f8 --- /dev/null +++ b/beanprice/sources/csif_test.py @@ -0,0 +1,50 @@ +import unittest +from unittest import mock +import datetime +from dateutil import tz +from dateutil.parser import parse + +from decimal import Decimal +from beanprice.sources import csif +from beanprice.source import MissingDate, SourcePrice + + +def _fetch_response(contents): + """Return a context manager to patch a JSON response.""" + response = mock.Mock() + response.status_code = 404 + response.text = "" + response.json.return_value = contents + return mock.patch('subprocess.check_output', return_value=response) + + +class CsifPriceFetcher(unittest.TestCase): + def test_error_invalid_ticker(self): + with self.assertRaises(csif.CsifApiError): + csif.Source().get_latest_price('INVALID') + + def test_error_invalid_date(self): + with self.assertRaises(MissingDate): + csif.Source().get_historical_price('CH0030849712', parse("2050-01-01")) + + def test_valid_response(self): + data = '' \ + '' \ + ' CH0030849712' \ + ' 3084971' \ + ' USD ' \ + ' 21.03.2022' \ + ' 3037.75000' \ + '' + + with _fetch_response(data): + srcprice: SourcePrice = csif.Source().get_latest_price('CH0030849712') + self.assertIsInstance(srcprice, SourcePrice) + self.assertEqual(Decimal('3037.75000'), srcprice.price) + self.assertEqual('USD', srcprice.quote_currency) + self.assertEqual(datetime.datetime(2022, 3, 21, 0, 0, 0, tzinfo=tz.gettz('Europe/Zurich')), srcprice.time) + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/beanprice/sources/sfd.py b/beanprice/sources/sfd.py new file mode 100644 index 0000000..cbac8b5 --- /dev/null +++ b/beanprice/sources/sfd.py @@ -0,0 +1,99 @@ +""" +A source fetching prices from https://www.swissfunddata.ch/sfdpub/investment-funds + +""" +import datetime +from decimal import Decimal +from typing import Optional, Dict +from csv import DictReader +from dateutil.tz import tz +from dateutil.parser import parse +import logging +import subprocess +from beanprice import source +from pathlib import Path + + +class SfdApiError(ValueError): + """An error from the SCIF API.""" + + +def _fetch_prices(fund_id: str) -> (Dict[str, source.SourcePrice], source.SourcePrice): + # Download file, or was it cached? + filename = '/tmp/beanprice_sfd_{}.csv'.format(fund_id) + path = Path(filename) + if not path.is_file(): + logging.debug('Fetching data from server for fund {}'.format(fund_id)) + # Fetch the HTML workbook, we have to use curl, see PortfolioPerformance documentation + link = 'https://www.swissfunddata.ch/sfdpub/en/funds/excelData/{}'.format(fund_id) + try: + response = subprocess.check_output(['curl', '-s', link]).decode("utf-8") + except BaseException as e: + raise SfdApiError('Error connecting to server on URL {}'.format(link)) + + # Save to file for future access + with open(path, "w") as text_file: + text_file.write(response) + + # Read CSV file + prices: Dict[str, source.SourcePrice] = dict() + latest_price = None + + with open(filename, 'r', encoding='utf8') as csvfile: + reader = DictReader( + csvfile, + fieldnames=[ + "date", + "ccy", + "price", + ], + delimiter=";" + ) + + # This skips the first row of the CSV file. + next(reader) + next(reader) + next(reader) + + for row in reader: + the_date = parse(row["date"]).replace(tzinfo=tz.gettz('Europe/Zurich')) + key = the_date.strftime("%Y%m%d") + latest_price = source.SourcePrice( + Decimal(row["price"].replace("'", '')), + the_date, + row["ccy"].strip() + ) + prices[key] = latest_price + + return prices, latest_price + + +class Source(source.Source): + + def get_latest_price(self, ticker) -> Optional[source.SourcePrice]: + # Fetch data + _, latest_price = _fetch_prices(ticker) + + logging.debug('Latest price: {} {}, {}'.format( + latest_price.price, + latest_price.time, + latest_price.quote_currency + )) + return latest_price + + def get_historical_price(self, ticker, time) -> Optional[source.SourcePrice]: + # Fetch data + prices, _ = _fetch_prices(ticker) + + # Find relevant date + key = time.strftime("%Y%m%d") + if key not in prices: + return None + else: + the_price = prices[key] + logging.debug('Historical price: {} {}, {}'.format( + the_price.price, + the_price.time, + the_price.quote_currency + )) + return the_price