From 6bf1114b7b6ca99b6b8b0f14c8c775db36b972ef Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Mon, 21 Jan 2019 02:13:57 +0300 Subject: [PATCH] Detection return empty result #347 (#348) * set constant window size * improve logging, save detected segments from push\pull process --- analytics/analytics/analytic_unit_manager.py | 6 +++--- .../analytics/detectors/pattern_detector.py | 18 +++++++++++++----- .../analytics/detectors/threshold_detector.py | 3 --- server/src/services/data_puller.ts | 2 +- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index 4f88a42..b2f028e 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -16,9 +16,9 @@ WORKERS_EXECUTORS = 20 AnalyticUnitId = str -def get_detector_by_type(detector_type: str, analytic_unit_type: str) -> detectors.Detector: +def get_detector_by_type(detector_type: str, analytic_unit_type: str, analytic_unit_id: AnalyticUnitId) -> detectors.Detector: if detector_type == 'pattern': - return detectors.PatternDetector(analytic_unit_type) + return detectors.PatternDetector(analytic_unit_type, analytic_unit_id) elif detector_type == 'threshold': return detectors.ThresholdDetector() @@ -52,7 +52,7 @@ class AnalyticUnitManager: if analytic_unit_id in self.analytic_workers: # TODO: check that type is the same return self.analytic_workers[analytic_unit_id] - detector = get_detector_by_type(detector_type, analytic_unit_type) + detector = get_detector_by_type(detector_type, analytic_unit_type, analytic_unit_id) worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor) self.analytic_workers[analytic_unit_id] = worker return worker diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index bced9de..e328a25 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -29,14 +29,16 @@ def resolve_model_by_pattern(pattern: str) -> models.Model: return models.CustomModel() raise ValueError('Unknown pattern "%s"' % pattern) - +AnalyticUnitId = str class PatternDetector(Detector): - def __init__(self, pattern_type): + def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId): + self.analytic_unit_id = analytic_unit_id self.pattern_type = pattern_type self.model = resolve_model_by_pattern(self.pattern_type) - self.window_size = 100 + self.window_size = 150 self.bucket = DataBucket() + self.bucket_full_reported = False def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache: # TODO: pass only part of dataframe that has segments @@ -46,6 +48,7 @@ class PatternDetector(Detector): } def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: + logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe))) # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) detected = self.model.detect(dataframe, cache) @@ -63,14 +66,19 @@ class PatternDetector(Detector): def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: self.bucket.receive_data(data.dropna()) - if cache != None: - self.window_size = cache['WINDOW_SIZE'] if len(self.bucket.data) >= self.window_size and cache != None: + if not self.bucket_full_reported: + logging.debug('{} unit`s bucket full, run detect'.format(self.analytic_unit_id)) + self.bucket_full_reported = True + res = self.detect(self.bucket.data, cache) excess_data = len(self.bucket.data) - self.window_size self.bucket.drop_data(excess_data) return res + else: + filling = len(self.bucket.data)*100 / self.window_size + logging.debug('bucket for {} {}% full'.format(self.analytic_unit_id, filling)) return None diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index f96299d..c9f8e43 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -17,7 +17,6 @@ class ThresholdDetector(Detector): pass def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache: - log.debug('run train for threshold detector') return { 'cache': { 'value': threshold['value'], @@ -26,7 +25,6 @@ class ThresholdDetector(Detector): } def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict: - log.debug('run detect for threshold detector') value = cache['value'] condition = cache['condition'] @@ -62,5 +60,4 @@ class ThresholdDetector(Detector): } def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: - log.debug('threshhold recieve data') return self.detect(data, cache) diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts index 035051c..f46d750 100644 --- a/server/src/services/data_puller.ts +++ b/server/src/services/data_puller.ts @@ -58,7 +58,7 @@ export class DataPuller { try { this.analyticsService.sendTask(task); - console.log(`data puller successfuly pushed data for unit id: ${unit.id}`); + console.log(`data puller successfuly pushed ${data.data.length} points for unit id: ${unit.id}`); } catch(e) { console.log(`data puller got error while push data ${e.message}`); }