Browse Source

Segments from data puller not in db #350 (#351)

* improve logging

* set constant window size

* fix

* improve logging, save detected segments from push\pull process

* fix

* fix

* fix according review

* save segemnts, improve bucket managing in analytics
pull/1/head
Evgeny Smyshlyaev 6 years ago committed by Alexey Velikiy
parent
commit
7731007b69
  1. 16
      analytics/analytics/detectors/pattern_detector.py
  2. 12
      server/src/controllers/analytics_controller.ts
  3. 2
      server/src/services/data_puller.ts

16
analytics/analytics/detectors/pattern_detector.py

@ -36,9 +36,9 @@ class PatternDetector(Detector):
self.analytic_unit_id = analytic_unit_id self.analytic_unit_id = analytic_unit_id
self.pattern_type = pattern_type self.pattern_type = pattern_type
self.model = resolve_model_by_pattern(self.pattern_type) self.model = resolve_model_by_pattern(self.pattern_type)
self.window_size = 150 self.max_window_size = 150
self.window_size = 0
self.bucket = DataBucket() self.bucket = DataBucket()
self.bucket_full_reported = False
def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache: def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
# TODO: pass only part of dataframe that has segments # TODO: pass only part of dataframe that has segments
@ -66,19 +66,13 @@ class PatternDetector(Detector):
def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
self.bucket.receive_data(data.dropna()) self.bucket.receive_data(data.dropna())
if cache and self.window_size == 0:
self.window_size = cache['WINDOW_SIZE']
if len(self.bucket.data) >= self.window_size and cache != None: if len(self.bucket.data) >= self.window_size and cache != None:
if not self.bucket_full_reported:
logging.debug('{} unit`s bucket full, run detect'.format(self.analytic_unit_id))
self.bucket_full_reported = True
res = self.detect(self.bucket.data, cache) res = self.detect(self.bucket.data, cache)
excess_data = len(self.bucket.data) - self.max_window_size
excess_data = len(self.bucket.data) - self.window_size
self.bucket.drop_data(excess_data) self.bucket.drop_data(excess_data)
return res return res
else:
filling = len(self.bucket.data)*100 / self.window_size
logging.debug('bucket for {} {}% full'.format(self.analytic_unit_id, filling))
return None return None

12
server/src/controllers/analytics_controller.ts

@ -44,8 +44,14 @@ function onTaskResult(taskResult: TaskResult) {
} }
} }
function onDetect(detectionResult: DetectionResult) { async function onDetect(detectionResult: DetectionResult) {
processDetectionResult(detectionResult.analyticUnitId, detectionResult); let id = detectionResult.analyticUnitId;
let payload = await processDetectionResult(id, detectionResult);
await Promise.all([
Segment.insertSegments(payload.segments),
AnalyticUnitCache.setData(id, payload.cache),
AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime),
]);
} }
async function onMessage(message: AnalyticsMessage) { async function onMessage(message: AnalyticsMessage) {
@ -58,7 +64,7 @@ async function onMessage(message: AnalyticsMessage) {
} }
if(message.method === AnalyticsMessageMethod.DETECT) { if(message.method === AnalyticsMessageMethod.DETECT) {
onDetect(message.payload.payload); await onDetect(message.payload.payload);
methodResolved = true; methodResolved = true;
} }

2
server/src/services/data_puller.ts

@ -43,9 +43,7 @@ export class DataPuller {
panelUrl = unit.panelUrl; panelUrl = unit.panelUrl;
} }
let startTime = Date.now();
let data = queryByMetric(unit.metric, panelUrl, from, to, HASTIC_API_KEY); let data = queryByMetric(unit.metric, panelUrl, from, to, HASTIC_API_KEY);
console.log(`data puller: query took ${Date.now() - startTime}ms for unit id ${unit.id}`);
return data; return data;
} }

Loading…
Cancel
Save