From 7731007b6916972fc9dc9c070a2eb1cdea939a40 Mon Sep 17 00:00:00 2001
From: Evgeny Smyshlyaev
Date: Mon, 21 Jan 2019 06:22:12 +0300
Subject: [PATCH] Segments from data puller not in db #350 (#351)

* improve logging
* set constant window size
* fix
* improve logging, save detected segments from push/pull process
* fix
* fix
* fix according to review
* save segments, improve bucket management in analytics
---
 .../analytics/detectors/pattern_detector.py     | 16 +++++-----------
 server/src/controllers/analytics_controller.ts  | 12 +++++++++---
 server/src/services/data_puller.ts              |  2 --
 3 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index e328a25..844cb0a 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -36,9 +36,9 @@ class PatternDetector(Detector):
         self.analytic_unit_id = analytic_unit_id
         self.pattern_type = pattern_type
         self.model = resolve_model_by_pattern(self.pattern_type)
-        self.window_size = 150
+        self.max_window_size = 150
+        self.window_size = 0
         self.bucket = DataBucket()
-        self.bucket_full_reported = False
 
     def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
         # TODO: pass only part of dataframe that has segments
@@ -66,19 +66,13 @@ class PatternDetector(Detector):
 
     def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
         self.bucket.receive_data(data.dropna())
+        if cache and self.window_size == 0:
+            self.window_size = cache['WINDOW_SIZE']
 
         if len(self.bucket.data) >= self.window_size and cache != None:
-            if not self.bucket_full_reported:
-                logging.debug('{} unit`s bucket full, run detect'.format(self.analytic_unit_id))
-                self.bucket_full_reported = True
-
             res = self.detect(self.bucket.data, cache)
-
-            excess_data = len(self.bucket.data) - self.window_size
+            excess_data = len(self.bucket.data) - self.max_window_size
             self.bucket.drop_data(excess_data)
             return res
-        else:
-            filling = len(self.bucket.data)*100 / self.window_size
-            logging.debug('bucket for {} {}% full'.format(self.analytic_unit_id, filling))
 
         return None
 
diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index 1cfce9b..8bff203 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -44,8 +44,14 @@ function onTaskResult(taskResult: TaskResult) {
   }
 }
 
-function onDetect(detectionResult: DetectionResult) {
-  processDetectionResult(detectionResult.analyticUnitId, detectionResult);
+async function onDetect(detectionResult: DetectionResult) {
+  let id = detectionResult.analyticUnitId;
+  let payload = await processDetectionResult(id, detectionResult);
+  await Promise.all([
+    Segment.insertSegments(payload.segments),
+    AnalyticUnitCache.setData(id, payload.cache),
+    AnalyticUnit.setDetectionTime(id, payload.lastDetectionTime),
+  ]);
 }
 
 async function onMessage(message: AnalyticsMessage) {
@@ -58,7 +64,7 @@ async function onMessage(message: AnalyticsMessage) {
   }
 
   if(message.method === AnalyticsMessageMethod.DETECT) {
-    onDetect(message.payload.payload);
+    await onDetect(message.payload.payload);
     methodResolved = true;
   }
 
diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts
index f46d750..3e20eed 100644
--- a/server/src/services/data_puller.ts
+++ b/server/src/services/data_puller.ts
@@ -43,9 +43,7 @@ export class DataPuller {
       panelUrl = unit.panelUrl;
     }
 
-    let startTime = Date.now();
     let data = queryByMetric(unit.metric, panelUrl, from, to, HASTIC_API_KEY);
-    console.log(`data puller: query took ${Date.now() - startTime}ms for unit id ${unit.id}`);
 
     return data;
   }
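
Note (illustration only, not part of the patch): a minimal TypeScript sketch of the persistence step that onDetect now performs for every detection result, including results produced from data pulled by the data puller. The stub types and in-memory stores below are hypothetical stand-ins for the real Segment, AnalyticUnit and AnalyticUnitCache models; the payload shape { segments, cache, lastDetectionTime } mirrors how onDetect consumes the result of processDetectionResult in the hunk above.

// Illustration only, not part of the patch. Hypothetical stand-ins for the
// real Segment, AnalyticUnit and AnalyticUnitCache models.
interface DetectedSegment { from: number; to: number; }

interface DetectionPayload {
  segments: DetectedSegment[];
  cache: object;
  lastDetectionTime: number;
}

// In-memory stores used in place of the database writes.
const savedSegments: DetectedSegment[] = [];
const cacheByUnit = new Map<string, object>();
const detectionTimeByUnit = new Map<string, number>();

async function insertSegments(segments: DetectedSegment[]): Promise<void> {
  savedSegments.push(...segments);
}

async function setCache(unitId: string, cache: object): Promise<void> {
  cacheByUnit.set(unitId, cache);
}

async function setDetectionTime(unitId: string, time: number): Promise<void> {
  detectionTimeByUnit.set(unitId, time);
}

// The gist of the fix: each detection result is awaited and written to storage
// before the DETECT message is considered handled, so segments found in pulled
// data no longer get dropped. The three writes are independent, so they run
// concurrently via Promise.all.
async function persistDetection(unitId: string, payload: DetectionPayload): Promise<void> {
  await Promise.all([
    insertSegments(payload.segments),
    setCache(unitId, payload.cache),
    setDetectionTime(unitId, payload.lastDetectionTime)
  ]);
}

// Example: persist one detected segment for a unit (values are arbitrary).
persistDetection('unit-1', {
  segments: [{ from: 1548028800000, to: 1548028860000 }],
  cache: { WINDOW_SIZE: 150 },
  lastDetectionTime: 1548028860000
}).catch(console.error);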