diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index 8d92130..ad4fd13 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -1,6 +1,7 @@ import logging as log import pandas as pd +import numpy as np from typing import Optional from detectors import Detector @@ -30,44 +31,47 @@ class ThresholdDetector(Detector): def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: if cache is None or cache == {}: raise ValueError('Threshold detector error: cannot detect before learning') + if len(dataframe) == 0: + return None + value = cache['value'] condition = cache['condition'] - now = convert_sec_to_ms(time()) segments = [] + for index, row in dataframe.iterrows(): + current_timestamp = convert_pd_timestamp_to_ms(row['timestamp']) + segment = { 'from': current_timestamp, 'to': current_timestamp } + # TODO: merge segments + if pd.isnull(row['value']): + if condition == 'NO_DATA': + segment['params'] = { value: None } + segments.append(segment) + continue - dataframe_without_nans = dataframe.dropna() - if len(dataframe_without_nans) == 0: - if condition == 'NO_DATA': - segments.append({ 'from': now, 'to': now , 'params': { value: 'NO_DATA' } }) - else: - return None - else: - last_entry = dataframe_without_nans.iloc[-1] - last_time = convert_pd_timestamp_to_ms(last_entry['timestamp']) - last_value = float(last_entry['value']) - segment = { 'from': last_time, 'to': last_time, 'params': { value: last_value } } - + current_value = row['value'] + segment['params'] = { value: row['value'] } if condition == '>': - if last_value > value: + if current_value > value: segments.append(segment) elif condition == '>=': - if last_value >= value: + if current_value >= value: segments.append(segment) elif condition == '=': - if last_value == value: + if current_value == value: segments.append(segment) elif condition == '<=': - if last_value <= value: + if current_value <= value: segments.append(segment) elif condition == '<': - if last_value < value: + if current_value < value: segments.append(segment) + last_entry = dataframe.iloc[-1] + last_detection_time = convert_pd_timestamp_to_ms(last_entry['timestamp']) return { 'cache': cache, 'segments': segments, - 'lastDetectionTime': now + 'lastDetectionTime': last_detection_time } def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: