@@ -134,42 +134,60 @@ def _inverse_scale_transform(v, a0, b0, a1, b1):
134134     return k * (b1 - a1) + a1
135135
136136
137- def _split_large_prediction_data(doc, signal):
138-     current_year = -1
139-     current_month = -1
140-     year_month_data = list()
141-
142-     signal_start_dt = datetime.utcfromtimestamp(signal.start_time)
143-
144-     for d in doc['data']:
145-         dt = datetime.utcfromtimestamp(d[0])
146-         y_idx = dt.year - signal_start_dt.year
147-         m_idx = dt.month
148-         index = y_idx * 12 + (m_idx - 1)
149-         if (dt.year != current_year or current_month != dt.month):
150-             if len(year_month_data) > 0:
151-                 pred_doc = {
152-                     'signalrun': doc['signalrun'],
153-                     'attrs': doc['attrs'],
154-                     'index': index,
155-                     'data': year_month_data
156-                 }
157-                 schema.Prediction.insert(**pred_doc)
158-                 year_month_data = list()
159-             current_year = dt.year
160-             current_month = dt.month
161-
162-         year_month_data.append(d)
163-
164-     # handle the last one
165-     if len(year_month_data) > 0:
166-         pred_doc = {
167-             'signalrun': doc['signalrun'],
168-             'attrs': doc['attrs'],
169-             'index': index,
170-             'data': year_month_data
171-         }
172-         schema.Prediction.insert(**pred_doc)
137+ def _split_large_prediction_data(doc, signalrun):
138+
139+     # save as gridfs
140+     kwargs = {
141+         "filename": f'sp-{signalrun.id}',
142+         "variable": 'prediction doc'
143+     }
144+     with g_fs.new_file(**kwargs) as f:
145+         pickle.dump(doc, f)
146+
147+     return
148+     # test load
149+     # for grid_out in g_fs.find({'filename': f'sp-{signalrun.id}'}, no_cursor_timeout=True):
150+     #     daa = pickle.loads(grid_out.read())
151+     #     print(daa.keys())
152+     # grid_out_doc = g_fs.find_one({'filename': f'sp-{signalrun.id}'}, no_cursor_timeout=True)
153+     # daa = pickle.loads(grid_out_doc.read())
154+     # print(daa.keys())
155+
156+     # current_year = -1
157+     # current_month = -1
158+     # year_month_data = list()
159+
160+     # signal_start_dt = datetime.utcfromtimestamp(signalrun.signal.start_time)
161+
162+     # for d in doc['data']:
163+     #     dt = datetime.utcfromtimestamp(d[0])
164+     #     y_idx = dt.year - signal_start_dt.year
165+     #     m_idx = dt.month
166+     #     index = y_idx * 12 + (m_idx - 1)
167+     #     if (dt.year != current_year or current_month != dt.month):
168+     #         if len(year_month_data) > 0:
169+     #             pred_doc = {
170+     #                 'signalrun': doc['signalrun'],
171+     #                 'attrs': doc['attrs'],
172+     #                 'index': index,
173+     #                 'data': year_month_data
174+     #             }
175+     #             schema.Prediction.insert(**pred_doc)
176+     #             year_month_data = list()
177+     #         current_year = dt.year
178+     #         current_month = dt.month
179+
180+     #     year_month_data.append(d)
181+
182+     # # handle the last one
183+     # if len(year_month_data) > 0:
184+     #     pred_doc = {
185+     #         'signalrun': doc['signalrun'],
186+     #         'attrs': doc['attrs'],
187+     #         'index': index,
188+     #         'data': year_month_data
189+     #     }
190+     #     schema.Prediction.insert(**pred_doc)
173191
174192
175193 def _update_prediction(signalrun, v, stock=False):
@@ -288,7 +306,7 @@ def _update_prediction(signalrun, v, stock=False):
288306             'data': data_
289307         }
290308
291-         _split_large_prediction_data(doc, signalrun.signal)
309+         _split_large_prediction_data(doc, signalrun)
292310     except Exception as e:
293311         print(e)
294312
@@ -303,13 +321,15 @@ def _update_period(signalrun, v, stock=False):
303321     # optimal interval for periodical description
304322     diff = (v['raw_index'][1] - v['raw_index'][0]) / 60
305323     my_interval = 1440
306-     for interval in [30, 60, 120, 180, 240, 360, 480, 720]:
324+     for interval in [6, 30, 60, 120, 180, 240, 360, 480, 720]:
307325         if diff <= interval:
308326             my_interval = interval
309327             break
310328
311329     day_bin_num = 24 * 60 // my_interval
312330
331+     print(f'*update period* my_interval: {my_interval}m, day_bin_num: {day_bin_num}')
332+
313333     docs = []
314334     # year
315335     for y in range(year_start, year_end + 1):
@@ -348,11 +368,12 @@ def _update_period(signalrun, v, stock=False):
348368     schema.Period.insert_many(docs)
349369
350370
351- def _update_raw(signal, interval=21600, method=['mean'], stock=False):
371+ def _update_raw(signal, interval=360, method=['mean'], stock=False):
352372     # interval should be changed case by case
353373     # ses -> 360 seconds
354374     # nasa -> 4 hours
355375     # stock -> 1 day
376+     print(f'*update raw* interval: {interval}s')
356377     X = load_signal(signal.data_location, timestamp_column=signal.timestamp_column,
357378                     value_column=signal.value_column, stock=stock)
@@ -428,6 +449,9 @@ def _update_raw(signal, interval=21600, method=['mean'], stock=False):
428449
429450 def update_db(fs, exp_filter=None, stock=False):
430451
452+     global g_fs
453+     g_fs = fs
454+
431455     # get signalrun list
432456
433457     # TODO: remove utc setting, it should be always True
0 commit comments