diff --git a/clickhouse.py b/clickhouse.py index 793d577..b278a8d 100644 --- a/clickhouse.py +++ b/clickhouse.py @@ -88,7 +88,7 @@ def get_ch_field_name(field_name): prefixes = ['ym:s:', 'ym:pv:'] for prefix in prefixes: field_name = field_name.replace(prefix, '') - return field_name[0].upper() + field_name[1:] + return field_name[0] + field_name[1:] def drop_table(source): @@ -111,27 +111,33 @@ def create_table(source, fields): table_name = get_source_table_name(source) if source == 'hits': if ('ym:pv:date' in fields) and ('ym:pv:clientID' in fields): - engine = 'MergeTree(Date, intHash32(ClientID), (Date, intHash32(ClientID)), 8192)' + engine = 'MergeTree(date, intHash32(clientID), (date, intHash32(clientID)), 8192)' else: engine = 'Log' if source == 'visits': if ('ym:s:date' in fields) and ('ym:s:clientID' in fields): - engine = 'MergeTree(Date, intHash32(ClientID), (Date, intHash32(ClientID)), 8192)' + engine = 'MergeTree(date, intHash32(clientID), (date, intHash32(clientID)), 8192)' else: engine = 'Log' - ch_field_types = utils.get_ch_fields_config() - ch_fields = map(get_ch_field_name, fields) - - for i in range(len(fields)): - field_statements.append(field_tmpl.format(name= ch_fields[i], + ch_field_types = utils.get_ch_fields_type_config() + ch_fields_config = utils.get_ch_fields_config() + ch_fields = ch_fields_config['{source}_fields'.format(source=source)] + + for item in ch_fields: + fields += (item,) + + ch_fields_map = map(get_ch_field_name, fields) + + for i in range(len(ch_fields_map)): + field_statements.append(field_tmpl.format(name= ch_fields_map[i], type=ch_field_types[fields[i]])) - field_statements = sorted(field_statements) + # field_statements = sorted(field_statements) query = tmpl.format(table_name=table_name, engine=engine, - fields=',\n'.join(sorted(field_statements))) + fields=',\n'.join(field_statements)) get_clickhouse_data(query) diff --git a/configs/ch_fields.json b/configs/ch_fields.json new file mode 100644 index 0000000..6200fc0 --- /dev/null +++ b/configs/ch_fields.json @@ -0,0 +1,39 @@ +{ + "visits_fields": [ + "ym:s:region", + "ym:s:cityName", + "ym:s:microdistrict", + "ym:s:district", + "ym:s:street", + "ym:s:from", + "ym:s:fromBlock" + ], + + "hits_fields": [ + "ym:pv:app", + "ym:pv:hitType", + "ym:pv:offerID", + "ym:pv:region", + "ym:pv:cityName", + "ym:pv:microdistrict", + "ym:pv:district", + "ym:pv:street", + "ym:pv:page", + "ym:pv:from", + "ym:pv:fromBlock", + "ym:pv:rubric", + "ym:pv:realtyType", + "ym:pv:dealType", + "ym:pv:special", + "ym:pv:premium", + "ym:pv:top", + "ym:pv:highlight", + "ym:pv:autoraiseX2", + "ym:pv:autoraiseX4", + "ym:pv:autoraiseX8", + "ym:pv:price", + "ym:pv:numberRooms", + "ym:pv:ownerOffer", + "ym:pv:phone" + ] +} \ No newline at end of file diff --git a/configs/ch_types.json b/configs/ch_types.json index 6d934e1..d0642db 100644 --- a/configs/ch_types.json +++ b/configs/ch_types.json @@ -10,7 +10,7 @@ "ym:s:visitDuration": "UInt32", "ym:s:bounce": "UInt8", "ym:s:ipAddress": "String", - "ym:s:params": "Array(String)", + "ym:s:params": "String", "ym:s:goalsID": "Array(UInt32)", "ym:s:goalsSerialNumber": "Array(UInt32)", "ym:s:goalsDateTime": "Array(DateTime)", @@ -35,7 +35,6 @@ "ym:s:lastDirectSearchPhrase": "String", "ym:s:lastDirectConditionType": "String", "ym:s:lastCurrencyID": "String", - "ym:s:from": "String", "ym:s:UTMCampaign": "String", "ym:s:UTMContent": "String", "ym:s:UTMMedium": "String", @@ -46,6 +45,8 @@ "ym:s:openstatService": "String", "ym:s:openstatSource": "String", "ym:s:hasGCLID": "UInt8", + "ym:s:region": "String", + "ym:s:cityName": "String", "ym:s:regionCountry": "String", "ym:s:regionCity": "String", "ym:s:browserLanguage": "String", @@ -150,7 +151,6 @@ "ym:pv:deviceCategory": "String", "ym:pv:flashMajor": "UInt8", "ym:pv:flashMinor": "UInt8", - "ym:pv:from": "String", "ym:pv:hasGCLID": "UInt8", "ym:pv:ipAddress": "String", "ym:pv:javascriptEnabled": "UInt8", @@ -194,5 +194,31 @@ "ym:pv:shareURL": "String", "ym:pv:shareTitle": "String", "ym:pv:iFrame": "UInt8", - "ym:pv:date": "Date" + "ym:pv:date": "Date", + + "ym:pv:app": "String", + "ym:pv:hitType": "String", + "ym:pv:offerID": "String", + "ym:pv:region": "String", + "ym:pv:cityName": "String", + "ym:pv:microdistrict": "String", + "ym:pv:district": "String", + "ym:pv:street": "String", + "ym:pv:page": "String", + "ym:pv:from": "String", + "ym:pv:fromBlock": "String", + "ym:pv:rubric": "String", + "ym:pv:realtyType": "String", + "ym:pv:dealType": "String", + "ym:pv:price": "UInt32", + "ym:pv:numberRooms": "String", + "ym:pv:ownerOffer": "String", + "ym:pv:special": "String", + "ym:pv:premium": "String", + "ym:pv:highlight": "String", + "ym:pv:autoraiseX2": "String", + "ym:pv:autoraiseX4": "String", + "ym:pv:autoraiseX8": "String", + "ym:pv:top": "String", + "ym:pv:phone": "String" } diff --git a/logs_api.py b/logs_api.py index df48bf6..943fb70 100644 --- a/logs_api.py +++ b/logs_api.py @@ -1,11 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- import requests import urllib import json import utils -import StringIO import clickhouse import datetime import logging +import parsing_params logger = logging.getLogger('logs_api') @@ -158,6 +160,43 @@ def save_data(api_request, part): if num_filtered != 0: logger.warning('%d rows were filtered out' % num_filtered) + #Get additional fields for clickHouse + ch_fields_config = utils.get_ch_fields_config() + ch_fields = ch_fields_config['{source}_fields'.format(source=api_request.user_request.source)] + prefix = 'ym:pv:' + if api_request.user_request.source == 'visits': + prefix = 'ym:s:' + + #adds additional fields to the end + if len(ch_fields) > 0: + splitted_text_filtered[0] += '\t' + '\t'.join(ch_fields) + + headers = splitted_text[0].split('\t') + + if prefix + 'params' in headers and prefix + 'URL' in headers: + params_index = headers.index(prefix + 'params') + + url_index = headers.index(prefix + 'URL') + + #parse the params + i = 1 + while i < len(splitted_text_filtered): + value = splitted_text_filtered[i].split('\t') + + for field in ch_fields: + splitted_text_filtered[i] += "\t" + params_json = clear_json(value[params_index]) + url = clear_json(value[url_index]) + if not is_json(params_json): + continue + params = json.loads(params_json) + if len(params) > 0: + if type(params) is list: + params = params[0] + data = parsing_params.get_data_from_params(prefix, params, field, url) + splitted_text_filtered[i] += unicode(data) + i += 1 + output_data = '\n'.join(splitted_text_filtered).encode('utf-8') output_data = output_data.replace(r"\'", "'") # to correct escapes in params @@ -167,6 +206,26 @@ def save_data(api_request, part): api_request.status = 'saved' +#Removes duplicate characters +def clear_json(str): + return str.encode('utf8') \ + .replace("'", "") \ + .replace('""', '"') \ + .replace('"{', "{") \ + .replace('}"', '}') \ + .replace("\ ", "") \ + .replace('\\"', "\"") \ + .replace('"[', "[") \ + .replace(']"', ']') + +#checks whether the JSON string is +def is_json(myjson): + try: + json.loads(myjson) + except ValueError, e: + return False + return True + def clean_data(api_request): '''Cleans generated data on server''' url = '{host}/management/v1/counter/{counter_id}/logrequest/{request_id}/clean?oauth_token={token}' \ diff --git a/parsing_params.py b/parsing_params.py new file mode 100644 index 0000000..4c6120e --- /dev/null +++ b/parsing_params.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from urlparse import urlparse +import logging + +def get_data_from_params(prefix, params, field, url): + logger = logging.getLogger('logs_api') + try: + action = params.keys()[0] + + if field == prefix + 'hitType': + return action + + if field == prefix + 'app' and params[action].has_key('app'): + return params[action]['app'] + + if field == prefix + 'offerID': + return get_offerId_from_url(url) + + if params[action].has_key('region'): + + if type(params[action]['region']) is dict: + region = params[action]['region'].keys()[0] + city = params[action]['region'][region].keys()[0] + else: + region = params[action]['region'] + city = None + + if field == prefix + 'region': + return region + + if city != None: + if field == prefix + 'cityName': + return city + if field == prefix + 'microdistrict' and params[action]['region'][region][city].has_key(u'Микрорайон'): + return params[action]['region'][region][city][u'Микрорайон'] + if field == prefix + 'district'and params[action]['region'][region][city].has_key(u'Район'): + return params[action]['region'][region][city][u'Район'] + if field == prefix + 'street' and params[action]['region'][region][city].has_key(u'Улица'): + return params[action]['region'][region][city][u'Улица'] + + if params[action].has_key('from'): + from_p = params[action]['from'].keys()[0] if type(params[action]['from']) is dict else params[action]['from'] + + if field == prefix + 'from': + return from_p + if field == prefix + 'fromBlock' and type(params[action]['from']) is dict: + return params[action]['from'][from_p] + + if field == prefix + 'page' and params[action].has_key('page'): + return params[action]['page'] + + if params[action].has_key('rubric'): + rubric = params[action]['rubric'].keys()[0] if type(params[action]['rubric']) is dict else params[action]['rubric'] + + if field == prefix + 'rubric': + return rubric + if field == prefix + 'realtyType' and rubric == 'flats': + return params[action]['rubric'][rubric] + + if field == prefix + 'dealType' and params[action].has_key('dealType'): + return params[action]['dealType'] + + if field == prefix + 'price' and params[action].has_key('price') and params[action]['price'] != 0: + return params[action]['price'] + + if field == prefix + 'numberRooms' and params[action].has_key('numberRooms'): + return params[action]['numberRooms'] + + if field == prefix + 'ownerOffer' and params[action].has_key('ownerOffer'): + return params[action]['ownerOffer'] + + offer_services = params[action]['offerServices'] if params[action].has_key('offerServices') else {} + + if field == prefix + 'special': + return 'true' if offer_services.has_key('special') else 'false' + + if field == prefix + 'premium': + return 'true' if offer_services.has_key('premium') else 'false' + + if field == prefix + 'highlight': + return 'true' if offer_services.has_key('highlight') else 'false' + + if field == prefix + 'autoraiseX2': + return 'true' if offer_services.has_key('autoraiseX2') else 'false' + + if field == prefix + 'autoraiseX4': + return 'true' if offer_services.has_key('autoraiseX4') else 'false' + + if field == prefix + 'autoraiseX8': + return 'true' if offer_services.has_key('autoraiseX8') else 'false' + + if field == prefix + 'top': + return 'true' if offer_services.has_key('top') else 'false' + + if field == prefix + 'phone' and params[action].has_key('phone'): + return params[action]['phone'] + + except TypeError, e: + logger.warning('%s the nesting structure does not match' % field) + + return '' + +def get_offerId_from_url(url): + url_params = urlparse(url) + path = url_params.path.split('/') + if len(path) > 1 and path[1] == 'view': + return path[2] + return '' diff --git a/utils.py b/utils.py index 75f6d1b..8d27660 100644 --- a/utils.py +++ b/utils.py @@ -69,9 +69,15 @@ def get_config(): return config -def get_ch_fields_config(): +def get_ch_fields_type_config(): '''Returns config for ClickHouse columns\'s datatypes''' with open('./configs/ch_types.json') as input_file: ch_field_types = json.loads(input_file.read()) return ch_field_types +def get_ch_fields_config(): + '''Returns config for ClickHouse columns\'s datatypes''' + with open('./configs/ch_fields.json') as input_file: + ch_field_types = json.loads(input_file.read()) + return ch_field_types +