
Anomaly detector webhooks fix (#670)

Authored by Evgeny Smyshlyaev, committed by rozetko · commit 0fa4268d57
7 changed files:

1. analytics/analytics/analytic_types/detector_typing.py (7 changes)
2. analytics/analytics/analytic_unit_worker.py (6 changes)
3. analytics/analytics/detectors/anomaly_detector.py (8 changes)
4. analytics/analytics/detectors/detector.py (2 changes)
5. analytics/analytics/detectors/pattern_detector.py (5 changes)
6. analytics/analytics/detectors/threshold_detector.py (6 changes)
7. analytics/analytics/utils/common.py (7 changes)

analytics/analytics/analytic_types/detector_typing.py

@@ -11,7 +11,8 @@ class DetectionResult:
         self,
         cache: Optional[ModelCache] = None,
         segments: Optional[List[Segment]] = None,
-        last_detection_time: int = None
+        last_detection_time: int = None,
+        time_step: int = None
     ):
         if cache is None:
             cache = {}
@@ -20,13 +21,15 @@ class DetectionResult:
         self.cache = cache
         self.segments = segments
         self.last_detection_time = last_detection_time
+        self.time_step = time_step

     # TODO: use @utils.meta.JSONClass (now it can't serialize list of objects)
     def to_json(self):
         return {
             'cache': self.cache,
             'segments': list(map(lambda segment: segment.to_json(), self.segments)),
-            'lastDetectionTime': self.last_detection_time
+            'lastDetectionTime': self.last_detection_time,
+            'timeStep': self.time_step
         }

 @utils.meta.JSONClass
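For context, a minimal usage sketch (not part of the commit; the field values are illustrative) of what the extended DetectionResult now serializes to:

    # Hypothetical example: DetectionResult now carries the detected time_step
    # and exposes it in the JSON payload under 'timeStep'.
    result = DetectionResult(
        cache={},
        segments=[],                        # list of Segment objects
        last_detection_time=1573221441000,  # Unix timestamp in ms
        time_step=1000                      # interval between points, in ms
    )
    print(result.to_json())
    # {'cache': {}, 'segments': [], 'lastDetectionTime': 1573221441000, 'timeStep': 1000}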

analytics/analytics/analytic_unit_worker.py

@@ -69,8 +69,7 @@ class AnalyticUnitWorker:
         if len(detections) == 0:
             raise RuntimeError(f'do_detect for {self.analytic_unit_id} got empty detection results')

-        time_step = utils.find_interval(data)
-        detection_result = self._detector.concat_detection_results(detections, time_step)
+        detection_result = self._detector.concat_detection_results(detections)
         return detection_result.to_json()

     def cancel(self):
@@ -92,8 +91,7 @@ class AnalyticUnitWorker:
         if len(detections) == 0:
             return None
         else:
-            time_step = utils.find_interval(data)
-            detection_result = self._detector.concat_detection_results(detections, time_step)
+            detection_result = self._detector.concat_detection_results(detections)
             return detection_result.to_json()

     async def process_data(self, data: list, cache: ModelCache) -> dict:

analytics/analytics/detectors/anomaly_detector.py

@@ -66,6 +66,7 @@ class AnomalyDetector(ProcessingDetector):
         if cache is not None:
             last_value = cache.get('last_value')

+        time_step = utils.find_interval(dataframe)
         smoothed_data = utils.exponential_smoothing(data, cache['alpha'], last_value)

         # TODO: use class for cache to avoid using string literals
@@ -115,7 +116,7 @@ class AnomalyDetector(ProcessingDetector):
             # TODO: ['lastValue'] -> .last_value
             cache['lastValue'] = smoothed_data.values[-1]

-        return DetectionResult(cache, segments, last_detection_time)
+        return DetectionResult(cache, segments, last_detection_time, time_step)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         if cache is None:
@@ -131,7 +132,7 @@ class AnomalyDetector(ProcessingDetector):
         self.bucket.receive_data(data_without_nan)

         if len(self.bucket.data) >= self.get_window_size(cache):
-            return self.detect(self.bucket, cache)
+            return self.detect(self.bucket.data, cache)

         return None
@@ -155,8 +156,9 @@ class AnomalyDetector(ProcessingDetector):
         seasonality = cache['seasonality'] // cache['timeStep']
         return max(level, seasonality)

-    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
+        time_step = detections[0].time_step
         for detection in detections:
             result.segments.extend(detection.segments)
             result.last_detection_time = detection.last_detection_time
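A short sketch (caller names assumed) of the new contract: the interval now travels inside each partial DetectionResult, so concatenation no longer needs an extra argument:

    # Hypothetical caller; 'detector' is an AnomalyDetector instance and the
    # detections all come from the same series, so their time_step values match.
    combined = detector.concat_detection_results(detections)
    # Internally, time_step is read from detections[0].time_step and used to
    # merge intersecting segments (see merge_intersecting_segments in
    # utils/common.py below).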

analytics/analytics/detectors/detector.py

@@ -31,7 +31,7 @@ class Detector(ABC):
     def is_detection_intersected(self) -> bool:
         return True

-    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
         for detection in detections:
             result.segments.extend(detection.segments)

analytics/analytics/detectors/pattern_detector.py

@@ -13,7 +13,7 @@ from utils import convert_pd_timestamp_to_ms
 from analytic_types import AnalyticUnitId, ModelCache
 from analytic_types.detector_typing import DetectionResult
 from analytic_types.segment import Segment
+import utils

 logger = logging.getLogger('PATTERN_DETECTOR')
@@ -77,13 +77,14 @@ class PatternDetector(Detector):
             logger.error(message)
             raise ValueError(message)

+        time_step = utils.find_interval(dataframe)
         detected = self.model.detect(dataframe, self.analytic_unit_id)

         segments = [Segment(segment[0], segment[1]) for segment in detected['segments']]
         new_cache = detected['cache'].to_json()
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
-        return DetectionResult(new_cache, segments, last_detection_time)
+        return DetectionResult(new_cache, segments, last_detection_time, time_step)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))

analytics/analytics/detectors/threshold_detector.py

@@ -38,6 +38,7 @@ class ThresholdDetector(Detector):
         value = cache['value']
         condition = cache['condition']
+        time_step = utils.find_interval(dataframe)

         segments = []
         for index, row in dataframe.iterrows():
@@ -68,7 +69,7 @@ class ThresholdDetector(Detector):
         last_entry = dataframe.iloc[-1]
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_entry['timestamp'])
-        return DetectionResult(cache, segments, last_detection_time)
+        return DetectionResult(cache, segments, last_detection_time, time_step)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
@@ -78,8 +79,9 @@ class ThresholdDetector(Detector):
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
         return self.WINDOW_SIZE

-    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
+        time_step = detections[0].time_step
         for detection in detections:
             result.segments.extend(detection.segments)
             result.last_detection_time = detection.last_detection_time

analytics/analytics/utils/common.py

@@ -146,10 +146,11 @@ def merge_intersecting_segments(segments: List[Segment], time_step: int) -> List
     segments = [x for x in segments if x is not None]
     return segments

-def find_interval(data: TimeSeries) -> int:
-    if len(data) < 2:
-        raise ValueError('Can`t find interval: length of data must be at least 2')
-    return int(data[1][0] - data[0][0])
+def find_interval(dataframe: pd.DataFrame) -> int:
+    if len(dataframe) < 2:
+        raise ValueError('Can`t find interval: length of data must be at least 2')
+    delta = utils.convert_pd_timestamp_to_ms(dataframe.timestamp[1]) - utils.convert_pd_timestamp_to_ms(dataframe.timestamp[0])
+    return delta

 def get_start_and_end_of_segments(segments: List[List[int]]) -> List[Tuple[int, int]]:
     '''
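A quick sketch (not from the commit) of the reworked find_interval contract, assuming a DataFrame with a pandas-datetime 'timestamp' column:

    import pandas as pd

    # Two points one second apart -> an interval of 1000 ms.
    dataframe = pd.DataFrame({
        'timestamp': pd.to_datetime([0, 1000], unit='ms'),
        'value': [1.0, 2.0],
    })
    # find_interval(dataframe) would return 1000: both timestamps are converted
    # to ms before subtracting, and fewer than 2 rows raises ValueError.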
