Browse Source

Detect thresholds on the whole dataset #505 (#625)

pull/1/head
rozetko 6 years ago committed by GitHub
parent
commit
616bc46123
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      analytics/analytics/detectors/threshold_detector.py

42
analytics/analytics/detectors/threshold_detector.py

@ -1,6 +1,7 @@
import logging as log import logging as log
import pandas as pd import pandas as pd
import numpy as np
from typing import Optional from typing import Optional
from detectors import Detector from detectors import Detector
@ -30,44 +31,47 @@ class ThresholdDetector(Detector):
def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict:
if cache is None or cache == {}: if cache is None or cache == {}:
raise ValueError('Threshold detector error: cannot detect before learning') raise ValueError('Threshold detector error: cannot detect before learning')
if len(dataframe) == 0:
return None
value = cache['value'] value = cache['value']
condition = cache['condition'] condition = cache['condition']
now = convert_sec_to_ms(time())
segments = [] segments = []
for index, row in dataframe.iterrows():
current_timestamp = convert_pd_timestamp_to_ms(row['timestamp'])
segment = { 'from': current_timestamp, 'to': current_timestamp }
# TODO: merge segments
if pd.isnull(row['value']):
if condition == 'NO_DATA':
segment['params'] = { value: None }
segments.append(segment)
continue
dataframe_without_nans = dataframe.dropna() current_value = row['value']
if len(dataframe_without_nans) == 0: segment['params'] = { value: row['value'] }
if condition == 'NO_DATA':
segments.append({ 'from': now, 'to': now , 'params': { value: 'NO_DATA' } })
else:
return None
else:
last_entry = dataframe_without_nans.iloc[-1]
last_time = convert_pd_timestamp_to_ms(last_entry['timestamp'])
last_value = float(last_entry['value'])
segment = { 'from': last_time, 'to': last_time, 'params': { value: last_value } }
if condition == '>': if condition == '>':
if last_value > value: if current_value > value:
segments.append(segment) segments.append(segment)
elif condition == '>=': elif condition == '>=':
if last_value >= value: if current_value >= value:
segments.append(segment) segments.append(segment)
elif condition == '=': elif condition == '=':
if last_value == value: if current_value == value:
segments.append(segment) segments.append(segment)
elif condition == '<=': elif condition == '<=':
if last_value <= value: if current_value <= value:
segments.append(segment) segments.append(segment)
elif condition == '<': elif condition == '<':
if last_value < value: if current_value < value:
segments.append(segment) segments.append(segment)
last_entry = dataframe.iloc[-1]
last_detection_time = convert_pd_timestamp_to_ms(last_entry['timestamp'])
return { return {
'cache': cache, 'cache': cache,
'segments': segments, 'segments': segments,
'lastDetectionTime': now 'lastDetectionTime': last_detection_time
} }
def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:

Loading…
Cancel
Save