diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py index 2291875..0c9e278 100644 --- a/analytics/analytics/analytic_types/detector_typing.py +++ b/analytics/analytics/analytic_types/detector_typing.py @@ -11,8 +11,7 @@ class DetectionResult: self, cache: Optional[ModelCache] = None, segments: Optional[List[Segment]] = None, - last_detection_time: int = None, - time_step: int = None + last_detection_time: int = None ): if cache is None: cache = {} @@ -21,15 +20,13 @@ class DetectionResult: self.cache = cache self.segments = segments self.last_detection_time = last_detection_time - self.time_step = time_step # TODO: use @utils.meta.JSONClass (now it can't serialize list of objects) def to_json(self): return { 'cache': self.cache, 'segments': list(map(lambda segment: segment.to_json(), self.segments)), - 'lastDetectionTime': self.last_detection_time, - 'timeStep': self.time_step + 'lastDetectionTime': self.last_detection_time } @utils.meta.JSONClass diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py index c72f3a7..05a86ae 100644 --- a/analytics/analytics/detectors/anomaly_detector.py +++ b/analytics/analytics/detectors/anomaly_detector.py @@ -26,10 +26,12 @@ class AnomalyDetector(ProcessingDetector): def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache: segments = payload.get('segments') prepared_segments = [] + time_step = utils.find_interval(dataframe) new_cache = { 'confidence': payload['confidence'], - 'alpha': payload['alpha'] + 'alpha': payload['alpha'], + 'timeStep': time_step } if segments is not None: @@ -47,12 +49,8 @@ class AnomalyDetector(ProcessingDetector): segment_data = dataframe[from_index : to_index] prepared_segments.append({'from': segment['from'], 'data': segment_data.value.tolist()}) - data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0]) - data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1]) - time_step = data_second_time - data_start_time new_cache['seasonality'] = seasonality new_cache['segments'] = prepared_segments - new_cache['timeStep'] = time_step return { 'cache': new_cache @@ -61,9 +59,9 @@ class AnomalyDetector(ProcessingDetector): # TODO: ModelCache -> ModelState def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult: data = dataframe['value'] + time_step = cache['timeStep'] segments = cache.get('segments') - time_step = utils.find_interval(dataframe) smoothed_data = utils.exponential_smoothing(data, cache['alpha']) # TODO: use class for cache to avoid using string literals @@ -78,7 +76,6 @@ class AnomalyDetector(ProcessingDetector): data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0]) data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1]) - time_step = data_second_time - data_start_time for segment in segments: seasonality_index = seasonality // time_step @@ -113,7 +110,7 @@ class AnomalyDetector(ProcessingDetector): last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time) - return DetectionResult(cache, segments, last_detection_time, time_step) + return DetectionResult(cache, segments, last_detection_time) def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]: if cache is None: @@ -155,7 +152,7 @@ class AnomalyDetector(ProcessingDetector): def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult: result = DetectionResult() - time_step = detections[0].time_step + time_step = detections[0].cache['timeStep'] for detection in detections: result.segments.extend(detection.segments) result.last_detection_time = detection.last_detection_time @@ -180,7 +177,7 @@ class AnomalyDetector(ProcessingDetector): f'{self.analytic_unit_id} got invalid seasonality {seasonality}' data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0]) - time_step = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1]) - utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0]) + time_step = cache['timeStep'] for segment in segments: seasonality_index = seasonality // time_step diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index 7052294..b95b9c1 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -65,8 +65,10 @@ class PatternDetector(Detector): msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' logger.error(msg) raise ValueError(msg) + self.model.state = self.model.get_state(cache) window_size = self.model.state.window_size + if window_size is None: message = '{} got cache without window_size for detection'.format(self.analytic_unit_id) logger.error(message) @@ -77,14 +79,13 @@ class PatternDetector(Detector): logger.error(message) raise ValueError(message) - time_step = utils.find_interval(dataframe) detected = self.model.detect(dataframe, self.analytic_unit_id) segments = [Segment(segment[0], segment[1]) for segment in detected['segments']] new_cache = detected['cache'].to_json() last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time) - return DetectionResult(new_cache, segments, last_detection_time, time_step) + return DetectionResult(new_cache, segments, last_detection_time) def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]: logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id)) diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index 74fa13d..6ed0fdb 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -23,10 +23,12 @@ class ThresholdDetector(Detector): pass def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache: + time_step = utils.find_interval(dataframe) return { 'cache': { 'value': threshold['value'], - 'condition': threshold['condition'] + 'condition': threshold['condition'], + 'timeStep': time_step } } @@ -38,7 +40,6 @@ class ThresholdDetector(Detector): value = cache['value'] condition = cache['condition'] - time_step = utils.find_interval(dataframe) segments = [] for index, row in dataframe.iterrows(): @@ -69,7 +70,7 @@ class ThresholdDetector(Detector): last_entry = dataframe.iloc[-1] last_detection_time = utils.convert_pd_timestamp_to_ms(last_entry['timestamp']) - return DetectionResult(cache, segments, last_detection_time, time_step) + return DetectionResult(cache, segments, last_detection_time) def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]: @@ -81,7 +82,7 @@ class ThresholdDetector(Detector): def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult: result = DetectionResult() - time_step = detections[0].time_step + time_step = detections[0].cache['timeStep'] for detection in detections: result.segments.extend(detection.segments) result.last_detection_time = detection.last_detection_time diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py index 78acbdb..61f13f3 100644 --- a/analytics/tests/test_detectors.py +++ b/analytics/tests/test_detectors.py @@ -41,6 +41,7 @@ class TestAnomalyDetector(unittest.TestCase): cache = { 'confidence': 2, 'alpha': 0.1, + 'timeStep': 1 } detector = anomaly_detector.AnomalyDetector('test_id') detect_result = detector.detect(dataframe, cache) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 0a146b5..b91d339 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -256,6 +256,7 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number value: (analyticUnit as ThresholdAnalyticUnit).value, condition: (analyticUnit as ThresholdAnalyticUnit).condition }; + taskPayload.data = await getPayloadData(analyticUnit, from, to); break; case AnalyticUnit.DetectorType.ANOMALY: taskPayload.anomaly = { @@ -263,6 +264,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number confidence: (analyticUnit as AnomalyAnalyticUnit).confidence }; + taskPayload.data = await getPayloadData(analyticUnit, from, to); + const seasonality = (analyticUnit as AnomalyAnalyticUnit).seasonality; if(seasonality > 0) { let segments = await Segment.findMany(id, { deleted: true }); @@ -274,7 +277,6 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number let segmentObjs = segments.map(s => s.toObject()); taskPayload.anomaly.segments = segmentObjs; - taskPayload.data = await getPayloadData(analyticUnit, from, to); } break; default: