Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def get_ch_field_name(field_name):
prefixes = ['ym:s:', 'ym:pv:']
for prefix in prefixes:
field_name = field_name.replace(prefix, '')
return field_name[0].upper() + field_name[1:]
return field_name[0] + field_name[1:]


def drop_table(source):
Expand All @@ -111,27 +111,33 @@ def create_table(source, fields):
table_name = get_source_table_name(source)
if source == 'hits':
if ('ym:pv:date' in fields) and ('ym:pv:clientID' in fields):
engine = 'MergeTree(Date, intHash32(ClientID), (Date, intHash32(ClientID)), 8192)'
engine = 'MergeTree(date, intHash32(clientID), (date, intHash32(clientID)), 8192)'
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему названия полей стали с маленькой буквы? Вроде в кликхаусе принято с большой

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Максим попросил сделать с маленькой, ему так проще сказал

else:
engine = 'Log'

if source == 'visits':
if ('ym:s:date' in fields) and ('ym:s:clientID' in fields):
engine = 'MergeTree(Date, intHash32(ClientID), (Date, intHash32(ClientID)), 8192)'
engine = 'MergeTree(date, intHash32(clientID), (date, intHash32(clientID)), 8192)'
else:
engine = 'Log'

ch_field_types = utils.get_ch_fields_config()
ch_fields = map(get_ch_field_name, fields)

for i in range(len(fields)):
field_statements.append(field_tmpl.format(name= ch_fields[i],
ch_field_types = utils.get_ch_fields_type_config()
ch_fields_config = utils.get_ch_fields_config()
ch_fields = ch_fields_config['{source}_fields'.format(source=source)]

for item in ch_fields:
fields += (item,)

ch_fields_map = map(get_ch_field_name, fields)

for i in range(len(ch_fields_map)):
field_statements.append(field_tmpl.format(name= ch_fields_map[i],
type=ch_field_types[fields[i]]))

field_statements = sorted(field_statements)
# field_statements = sorted(field_statements)
query = tmpl.format(table_name=table_name,
engine=engine,
fields=',\n'.join(sorted(field_statements)))
fields=',\n'.join(field_statements))

get_clickhouse_data(query)

Expand Down
39 changes: 39 additions & 0 deletions configs/ch_fields.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"visits_fields": [
"ym:s:region",
"ym:s:cityName",
"ym:s:microdistrict",
"ym:s:district",
"ym:s:street",
"ym:s:from",
"ym:s:fromBlock"
],

"hits_fields": [
"ym:pv:app",
"ym:pv:hitType",
"ym:pv:offerID",
"ym:pv:region",
"ym:pv:cityName",
"ym:pv:microdistrict",
"ym:pv:district",
"ym:pv:street",
"ym:pv:page",
"ym:pv:from",
"ym:pv:fromBlock",
"ym:pv:rubric",
"ym:pv:realtyType",
"ym:pv:dealType",
"ym:pv:special",
"ym:pv:premium",
"ym:pv:top",
"ym:pv:highlight",
"ym:pv:autoraiseX2",
"ym:pv:autoraiseX4",
"ym:pv:autoraiseX8",
"ym:pv:price",
"ym:pv:numberRooms",
"ym:pv:ownerOffer",
"ym:pv:phone"
]
}
34 changes: 30 additions & 4 deletions configs/ch_types.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"ym:s:visitDuration": "UInt32",
"ym:s:bounce": "UInt8",
"ym:s:ipAddress": "String",
"ym:s:params": "Array(String)",
"ym:s:params": "String",
"ym:s:goalsID": "Array(UInt32)",
"ym:s:goalsSerialNumber": "Array(UInt32)",
"ym:s:goalsDateTime": "Array(DateTime)",
Expand All @@ -35,7 +35,6 @@
"ym:s:lastDirectSearchPhrase": "String",
"ym:s:lastDirectConditionType": "String",
"ym:s:lastCurrencyID": "String",
"ym:s:from": "String",
"ym:s:UTMCampaign": "String",
"ym:s:UTMContent": "String",
"ym:s:UTMMedium": "String",
Expand All @@ -46,6 +45,8 @@
"ym:s:openstatService": "String",
"ym:s:openstatSource": "String",
"ym:s:hasGCLID": "UInt8",
"ym:s:region": "String",
"ym:s:cityName": "String",
"ym:s:regionCountry": "String",
"ym:s:regionCity": "String",
"ym:s:browserLanguage": "String",
Expand Down Expand Up @@ -150,7 +151,6 @@
"ym:pv:deviceCategory": "String",
"ym:pv:flashMajor": "UInt8",
"ym:pv:flashMinor": "UInt8",
"ym:pv:from": "String",
"ym:pv:hasGCLID": "UInt8",
"ym:pv:ipAddress": "String",
"ym:pv:javascriptEnabled": "UInt8",
Expand Down Expand Up @@ -194,5 +194,31 @@
"ym:pv:shareURL": "String",
"ym:pv:shareTitle": "String",
"ym:pv:iFrame": "UInt8",
"ym:pv:date": "Date"
"ym:pv:date": "Date",

"ym:pv:app": "String",
"ym:pv:hitType": "String",
"ym:pv:offerID": "String",
"ym:pv:region": "String",
"ym:pv:cityName": "String",
"ym:pv:microdistrict": "String",
"ym:pv:district": "String",
"ym:pv:street": "String",
"ym:pv:page": "String",
"ym:pv:from": "String",
"ym:pv:fromBlock": "String",
"ym:pv:rubric": "String",
"ym:pv:realtyType": "String",
"ym:pv:dealType": "String",
"ym:pv:price": "UInt32",
"ym:pv:numberRooms": "String",
"ym:pv:ownerOffer": "String",
"ym:pv:special": "String",
"ym:pv:premium": "String",
"ym:pv:highlight": "String",
"ym:pv:autoraiseX2": "String",
"ym:pv:autoraiseX4": "String",
"ym:pv:autoraiseX8": "String",
"ym:pv:top": "String",
"ym:pv:phone": "String"
}
61 changes: 60 additions & 1 deletion logs_api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import urllib
import json
import utils
import StringIO
import clickhouse
import datetime
import logging
import parsing_params


logger = logging.getLogger('logs_api')
Expand Down Expand Up @@ -158,6 +160,43 @@ def save_data(api_request, part):
if num_filtered != 0:
logger.warning('%d rows were filtered out' % num_filtered)

#Get additional fields for clickHouse
ch_fields_config = utils.get_ch_fields_config()
ch_fields = ch_fields_config['{source}_fields'.format(source=api_request.user_request.source)]
prefix = 'ym:pv:'
if api_request.user_request.source == 'visits':
prefix = 'ym:s:'

#adds additional fields to the end
if len(ch_fields) > 0:
splitted_text_filtered[0] += '\t' + '\t'.join(ch_fields)

headers = splitted_text[0].split('\t')

if prefix + 'params' in headers and prefix + 'URL' in headers:
params_index = headers.index(prefix + 'params')

url_index = headers.index(prefix + 'URL')

#parse the params
i = 1
while i < len(splitted_text_filtered):
value = splitted_text_filtered[i].split('\t')

for field in ch_fields:
splitted_text_filtered[i] += "\t"
params_json = clear_json(value[params_index])
url = clear_json(value[url_index])
if not is_json(params_json):
continue
params = json.loads(params_json)
if len(params) > 0:
if type(params) is list:
params = params[0]
data = parsing_params.get_data_from_params(prefix, params, field, url)
splitted_text_filtered[i] += unicode(data)
i += 1

output_data = '\n'.join(splitted_text_filtered).encode('utf-8')
output_data = output_data.replace(r"\'", "'") # to correct escapes in params

Expand All @@ -167,6 +206,26 @@ def save_data(api_request, part):

api_request.status = 'saved'

# Ordered (old, new) byte-string substitutions applied by clear_json.
# The order matters: doubled quotes are collapsed before the quotes that
# wrap objects/arrays are stripped, and escaped quotes are unescaped after.
_CLEAR_JSON_REPLACEMENTS = (
    (b"'", b""),
    (b'""', b'"'),
    (b'"{', b'{'),
    (b'}"', b'}'),
    (b'\\ ', b''),
    (b'\\"', b'"'),
    (b'"[', b'['),
    (b']"', b']'),
)


def clear_json(str):
    '''Normalizes an escaped JSON fragment taken from a tab-separated row.

    The value is encoded to UTF-8 and then cleaned up: single quotes are
    dropped, doubled double-quotes are collapsed, quotes wrapping an object
    or array are removed, backslash-space pairs are dropped and escaped
    double quotes are unescaped.

    The parameter keeps its original name ``str`` for backward compatibility
    with keyword callers, although it shadows the builtin.

    Returns the cleaned UTF-8 encoded byte string.
    '''
    cleaned = str.encode('utf8')
    for old, new in _CLEAR_JSON_REPLACEMENTS:
        cleaned = cleaned.replace(old, new)
    return cleaned

def is_json(myjson):
    '''Checks whether the given value can be decoded as JSON.

    Returns True when json.loads accepts the value, and False when decoding
    fails or when the value is not a string at all (e.g. None).
    '''
    try:
        json.loads(myjson)
    except (TypeError, ValueError):
        # ValueError: malformed JSON text; TypeError: not a string-like value.
        return False
    return True

def clean_data(api_request):
'''Cleans generated data on server'''
url = '{host}/management/v1/counter/{counter_id}/logrequest/{request_id}/clean?oauth_token={token}' \
Expand Down
109 changes: 109 additions & 0 deletions parsing_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from urlparse import urlparse
import logging

# Fields copied verbatim from the action dict when the key is present.
_PLAIN_PARAM_FIELDS = ('app', 'page', 'dealType', 'numberRooms',
                       'ownerOffer', 'phone')

# Fields reported as 'true'/'false' depending on presence in offerServices.
_OFFER_SERVICE_FIELDS = ('special', 'premium', 'highlight', 'autoraiseX2',
                         'autoraiseX4', 'autoraiseX8', 'top')

# Field name -> key looked up inside the city dict of the region section.
_CITY_DETAIL_KEYS = {
    'microdistrict': u'Микрорайон',
    'district': u'Район',
    'street': u'Улица',
}


def _first_key(mapping):
    '''Returns the first key of a dict; raises on empty or non-dict values.'''
    return list(mapping.keys())[0]


def _extract_param_field(prefix, params, field, url):
    '''Looks up a single field in params; may raise on unexpected nesting.'''
    action = _first_key(params)
    if not field.startswith(prefix):
        return ''
    name = field[len(prefix):]
    details = params[action]

    if name == 'hitType':
        return action

    if name == 'offerID':
        return get_offerId_from_url(url)

    if name in _PLAIN_PARAM_FIELDS:
        return details[name] if name in details else ''

    if name == 'price':
        # A zero price is treated the same as a missing one.
        if 'price' in details and details['price'] != 0:
            return details['price']
        return ''

    if name in ('region', 'cityName') or name in _CITY_DETAIL_KEYS:
        if 'region' not in details:
            return ''
        region_info = details['region']
        if not isinstance(region_info, dict):
            # Flat region value: there is no city level below it.
            return region_info if name == 'region' else ''
        region = _first_key(region_info)
        if name == 'region':
            return region
        cities = region_info[region]
        city = _first_key(cities)
        if name == 'cityName':
            return city
        city_info = cities[city]
        label = _CITY_DETAIL_KEYS[name]
        return city_info[label] if label in city_info else ''

    if name in ('from', 'fromBlock'):
        if 'from' not in details:
            return ''
        origin = details['from']
        if isinstance(origin, dict):
            origin_name = _first_key(origin)
            return origin_name if name == 'from' else origin[origin_name]
        return origin if name == 'from' else ''

    if name in ('rubric', 'realtyType'):
        if 'rubric' not in details:
            return ''
        rubric_info = details['rubric']
        if isinstance(rubric_info, dict):
            rubric = _first_key(rubric_info)
        else:
            rubric = rubric_info
        if name == 'rubric':
            return rubric
        if rubric == 'flats':
            # Raises TypeError when the rubric is a plain string; the caller
            # reports that as a nesting mismatch.
            return rubric_info[rubric]
        return ''

    if name in _OFFER_SERVICE_FIELDS:
        services = details['offerServices'] if 'offerServices' in details else {}
        return 'true' if name in services else 'false'

    return ''


def get_data_from_params(prefix, params, field, url):
    '''Extracts the value of one additional field from decoded row params.

    params is expected to be a dict whose first key is the action name
    (returned as hitType) and whose value is a dict describing that action.
    Only the section relevant to the requested field is inspected, so a
    malformed section does not affect unrelated fields.

    prefix -- field name prefix, e.g. 'ym:pv:' or 'ym:s:'
    params -- decoded JSON params of a single row
    field  -- prefixed name of the field to extract
    url    -- URL of the row, used only for the offerID field

    Returns the extracted value, or '' when the field is absent or the
    nesting of params does not match the expected structure (a warning is
    logged in the latter case).
    '''
    logger = logging.getLogger('logs_api')
    try:
        return _extract_param_field(prefix, params, field, url)
    except (TypeError, AttributeError, IndexError):
        logger.warning('%s the nesting structure does not match', field)

    return ''

def get_offerId_from_url(url):
    '''Returns the offer id from a URL whose path looks like /view/<id>/...

    Returns '' when the path does not start with a 'view' segment or when
    no segment follows it.
    '''
    segments = urlparse(url).path.split('/')
    # segments[0] is the empty string before the leading slash, so the id
    # is the third element; require it to exist before indexing.
    if len(segments) > 2 and segments[1] == 'view':
        return segments[2]
    return ''
8 changes: 7 additions & 1 deletion utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ def get_config():
return config


def get_ch_fields_config():
def get_ch_fields_type_config():
    '''Loads the ClickHouse column datatype config from ./configs/ch_types.json'''
    with open('./configs/ch_types.json') as types_file:
        return json.load(types_file)

def get_ch_fields_config():
    '''Returns the config listing additional ClickHouse fields per source.

    Reads ./configs/ch_fields.json (relative to the working directory) and
    returns its decoded content, which callers index with keys of the form
    '<source>_fields'.
    '''
    with open('./configs/ch_fields.json') as input_file:
        return json.load(input_file)