
Merge branch 'master' of github.com:hastic/hastic-server

pull/1/head
amper43, 5 years ago
parent commit 1a36f5111a
9 changed files:

  1. analytics/analytics/analytic_types/detector_typing.py (11 lines changed)
  2. analytics/analytics/analytic_unit_manager.py (2 lines changed)
  3. analytics/analytics/detectors/anomaly_detector.py (138 lines changed)
  4. analytics/analytics/detectors/detector.py (27 lines changed)
  5. analytics/analytics/detectors/pattern_detector.py (2 lines changed)
  6. analytics/analytics/detectors/threshold_detector.py (32 lines changed)
  7. analytics/analytics/utils/common.py (3 lines changed)
  8. analytics/tests/test_detectors.py (92 lines changed)
  9. server/src/controllers/analytics_controller.ts (22 lines changed)

analytics/analytics/analytic_types/detector_typing.py (11 lines changed)

@@ -32,17 +32,6 @@ class DetectionResult:
@utils.meta.JSONClass
class ProcessingResult():
def __init__(
self,
data: Optional[TimeSeries] = None
):
if data is None:
data = []
self.data = data
@utils.meta.JSONClass
class AnomalyProcessingResult():
def __init__(
self,
lower_bound: Optional[TimeSeries] = None,
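The hunk above drops the separate AnomalyProcessingResult class; later hunks construct ProcessingResult(lower_bound=..., upper_bound=...) and the tests expect to_json() to emit 'lowerBound'/'upperBound' keys, so the generalized class presumably carries optional bound series. A minimal stand-in illustrating that shape (not the project's actual class or its utils.meta.JSONClass decorator):

    from typing import List, Optional, Tuple

    TimeSeries = List[Tuple[int, float]]  # (timestamp_ms, value) pairs, as used throughout this diff

    class ProcessingResultSketch:
        """Illustrative stand-in for the generalized ProcessingResult."""
        def __init__(self,
                     lower_bound: Optional[TimeSeries] = None,
                     upper_bound: Optional[TimeSeries] = None):
            self.lower_bound = lower_bound
            self.upper_bound = upper_bound

        def to_json(self) -> dict:
            # utils.meta.JSONClass appears to camelCase the field names;
            # only non-None fields are emitted here for brevity.
            result = {}
            if self.lower_bound is not None:
                result['lowerBound'] = self.lower_bound
            if self.upper_bound is not None:
                result['upperBound'] = self.upper_bound
            return result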

analytics/analytics/analytic_unit_manager.py (2 lines changed)

@@ -17,7 +17,7 @@ def get_detector_by_type(
if detector_type == 'pattern':
return detectors.PatternDetector(analytic_unit_type, analytic_unit_id)
elif detector_type == 'threshold':
return detectors.ThresholdDetector()
return detectors.ThresholdDetector(analytic_unit_id)
elif detector_type == 'anomaly':
return detectors.AnomalyDetector(analytic_unit_id)
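After this change every detector constructor takes the analytic unit id, so the factory can pass it uniformly. A hypothetical call (argument order assumed from the hunk above):

    # detector_type='threshold' ignores the pattern type; 'test_id' matches the updated tests
    detector = get_detector_by_type('threshold', 'GENERAL', 'test_id')
    # ...which is now equivalent to detectors.ThresholdDetector('test_id')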

analytics/analytics/detectors/anomaly_detector.py (138 lines changed)

@@ -1,14 +1,12 @@
from enum import Enum
import logging
import numpy as np
import operator
from collections import OrderedDict
import pandas as pd
import math
from typing import Optional, Union, List, Tuple
from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.detector_typing import DetectionResult, AnomalyProcessingResult
from analytic_types.detector_typing import DetectionResult, ProcessingResult
from analytic_types.data_bucket import DataBucket
from analytic_types.segment import Segment
from detectors import Detector, ProcessingDetector
@@ -27,12 +25,12 @@ class Bound(Enum):
class AnomalyDetector(ProcessingDetector):
def __init__(self, analytic_unit_id: AnalyticUnitId):
self.analytic_unit_id = analytic_unit_id
super().__init__(analytic_unit_id)
self.bucket = DataBucket()
def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
segments = payload.get('segments')
enable_bounds: str = payload.get('enableBounds') or 'ALL'
enable_bounds = Bound(payload.get('enableBounds') or 'ALL')
prepared_segments = []
time_step = utils.find_interval(dataframe)
@@ -40,7 +38,7 @@ class AnomalyDetector(ProcessingDetector):
'confidence': payload['confidence'],
'alpha': payload['alpha'],
'timeStep': time_step,
'enableBounds': enable_bounds
'enableBounds': enable_bounds.value
}
if segments is not None:
@@ -65,55 +63,53 @@ class AnomalyDetector(ProcessingDetector):
'cache': new_cache
}
# TODO: ModelCache -> ModelState
# TODO: ModelCache -> DetectorState
def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
if cache == None:
raise f'Analytic unit {self.analytic_unit_id} got empty cache'
data = dataframe['value']
time_step = cache['timeStep']
segments = cache.get('segments')
enable_bounds: str = cache.get('enableBounds') or 'ALL'
smoothed_data = utils.exponential_smoothing(data, cache['alpha'])
# TODO: use class for cache to avoid using string literals
alpha = self.get_value_from_cache(cache, 'alpha', required = True)
confidence = self.get_value_from_cache(cache, 'confidence', required = True)
segments = self.get_value_from_cache(cache, 'segments')
enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
# TODO: use class for cache to avoid using string literals and Bound.TYPE.value
bounds = OrderedDict()
bounds[Bound.LOWER.value] = ( smoothed_data - cache['confidence'], operator.lt )
bounds[Bound.UPPER.value] = ( smoothed_data + cache['confidence'], operator.gt )
if enable_bounds == Bound.LOWER.value:
del bounds[Bound.UPPER.value]
if enable_bounds == Bound.UPPER.value:
del bounds[Bound.LOWER.value]
smoothed_data = utils.exponential_smoothing(data, alpha)
lower_bound = smoothed_data - confidence
upper_bound = smoothed_data + confidence
if segments is not None:
seasonality = cache.get('seasonality')
assert seasonality is not None and seasonality > 0, \
time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
assert seasonality > 0, \
f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
for segment in segments:
seasonality_index = seasonality // time_step
season_count = math.ceil(abs(segment['from'] - data_start_time) / seasonality)
start_seasonal_segment = segment['from'] + seasonality * season_count
seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
#TODO: upper and lower bounds for segment_data
segment_data = pd.Series(segment['data'])
for bound_type, bound_data in bounds.items():
bound_data, _ = bound_data
bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type)
assert len(smoothed_data) == len(bounds[bound_type]), \
f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}'
lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)
anomaly_indexes = []
for idx, val in enumerate(data.values):
for bound_type, bound_data in bounds.items():
bound_data, comparator = bound_data
if comparator(val, bound_data.values[idx]):
if val > upper_bound.values[idx]:
if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
anomaly_indexes.append(data.index[idx])
if val < lower_bound.values[idx]:
if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
anomaly_indexes.append(data.index[idx])
# TODO: use Segment in utils
segments = utils.close_filtering(anomaly_indexes, 1)
segments = utils.get_start_and_end_of_segments(segments)
@@ -176,34 +172,27 @@ class AnomalyDetector(ProcessingDetector):
result.segments = utils.merge_intersecting_segments(result.segments, time_step)
return result
# TODO: ModelCache -> ModelState (don't use string literals)
def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult:
segments = cache.get('segments')
enable_bounds: str = cache.get('enableBounds') or 'ALL'
# TODO: remove duplication with detect()
def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
segments = self.get_value_from_cache(cache, 'segments')
alpha = self.get_value_from_cache(cache, 'alpha', required = True)
confidence = self.get_value_from_cache(cache, 'confidence', required = True)
enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
# TODO: exponential_smoothing should return dataframe with related timestamps
smoothed_data = utils.exponential_smoothing(dataframe['value'], cache['alpha'])
bounds = OrderedDict()
bounds[Bound.LOWER.value] = smoothed_data - cache['confidence']
bounds[Bound.UPPER.value] = smoothed_data + cache['confidence']
if enable_bounds == Bound.LOWER.value:
del bounds[Bound.UPPER.value]
if enable_bounds == Bound.UPPER.value:
del bounds[Bound.LOWER.value]
smoothed_data = utils.exponential_smoothing(dataframe['value'], alpha)
# TODO: remove duplication with detect()
lower_bound = smoothed_data - confidence
upper_bound = smoothed_data + confidence
if segments is not None:
seasonality = cache.get('seasonality')
assert seasonality is not None and seasonality > 0, \
seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
assert seasonality > 0, \
f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
time_step = cache['timeStep']
time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
for segment in segments:
seasonality_index = seasonality // time_step
@@ -212,19 +201,22 @@ class AnomalyDetector(ProcessingDetector):
start_seasonal_segment = segment['from'] + seasonality * season_count
seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
segment_data = pd.Series(segment['data'])
for bound_type, bound_data in bounds.items():
bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type)
assert len(smoothed_data) == len(bounds[bound_type]), \
f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}'
lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)
# TODO: support multiple segments
timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
result_bounds = {}
for bound_type, bound_data in bounds.items():
result_bounds[bound_type] = list(zip(timestamps, bound_data.values.tolist()))
result = AnomalyProcessingResult(lower_bound=result_bounds.get(Bound.LOWER.value), upper_bound=result_bounds.get(Bound.UPPER.value))
return result
lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
if enable_bounds == Bound.ALL:
return ProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
elif enable_bounds == Bound.UPPER:
return ProcessingResult(upper_bound = upper_bound_timeseries)
elif enable_bounds == Bound.LOWER:
return ProcessingResult(lower_bound = lower_bound_timeseries)
def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
#data - smoothed data to which seasonality will be added
@@ -236,33 +228,17 @@ class AnomalyDetector(ProcessingDetector):
#TODO: add seasonality for non empty parts
continue
if (idx - offset) % seasonality == 0:
if bound_type == Bound.UPPER.value:
if bound_type == Bound.UPPER:
upper_segment_bound = self.get_bounds_for_segment(segment)[0]
data = data.add(pd.Series(upper_segment_bound.values, index = segment.index + idx), fill_value = 0)
elif bound_type == Bound.LOWER.value:
elif bound_type == Bound.LOWER:
lower_segment_bound = self.get_bounds_for_segment(segment)[1]
data = data.add(pd.Series(lower_segment_bound.values * -1, index = segment.index + idx), fill_value = 0)
else:
raise ValueError(f'unknown {bound_type}')
raise ValueError(f'unknown bound type: {bound_type.value}')
return data[:len_smoothed_data]
def concat_processing_results(self, processing_results: List[AnomalyProcessingResult]) -> Optional[AnomalyProcessingResult]:
if len(processing_results) == 0:
return None
united_result = AnomalyProcessingResult()
for result in processing_results:
if result.lower_bound is not None:
if united_result.lower_bound is None: united_result.lower_bound = []
united_result.lower_bound.extend(result.lower_bound)
if result.upper_bound is not None:
if united_result.upper_bound is None: united_result.upper_bound = []
united_result.upper_bound.extend(result.upper_bound)
return united_result
def get_bounds_for_segment(self, segment: pd.Series) -> Tuple[pd.Series, pd.Series]:
'''
segment is divided by the median to determine its top and bottom parts
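Stripped of seasonality handling, the refactored detect() boils down to: exponentially smooth the series, build lower/upper bounds as the smoothed value minus/plus the confidence, and flag points that escape whichever bounds are enabled. A self-contained sketch of that core logic (illustrative names, not the project's API; pandas' ewm stands in for utils.exponential_smoothing):

    from enum import Enum
    from typing import List

    import pandas as pd

    class Bound(Enum):
        ALL = 'ALL'
        UPPER = 'UPPER'
        LOWER = 'LOWER'

    def detect_anomalies(values: List[float], alpha: float, confidence: float,
                         enable_bounds: Bound = Bound.ALL) -> List[int]:
        """Return indexes whose value escapes the enabled confidence bounds (seasonality omitted)."""
        data = pd.Series(values)
        smoothed = data.ewm(alpha=alpha, adjust=False).mean()  # s[n] = alpha*x[n] + (1-alpha)*s[n-1]
        lower_bound = smoothed - confidence
        upper_bound = smoothed + confidence

        anomaly_indexes = []
        for idx, val in enumerate(data.values):
            if val > upper_bound.values[idx] and enable_bounds in (Bound.UPPER, Bound.ALL):
                anomaly_indexes.append(idx)
            if val < lower_bound.values[idx] and enable_bounds in (Bound.LOWER, Bound.ALL):
                anomaly_indexes.append(idx)
        return anomaly_indexes

    # With the dataset from the tests below, only the spike at index 5 escapes the bounds:
    # detect_anomalies([0, 1, 2, 1, 2, 10, 1, 2, 1], alpha=0.1, confidence=2) -> [5]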

analytics/analytics/detectors/detector.py (27 lines changed)

@@ -2,13 +2,16 @@ from abc import ABC, abstractmethod
from pandas import DataFrame
from typing import Optional, Union, List
from analytic_types import ModelCache, TimeSeries
from analytic_types import ModelCache, TimeSeries, AnalyticUnitId
from analytic_types.detector_typing import DetectionResult, ProcessingResult
from analytic_types.segment import Segment
class Detector(ABC):
def __init__(self, analytic_unit_id: AnalyticUnitId):
self.analytic_unit_id = analytic_unit_id
@abstractmethod
def train(self, dataframe: DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
"""
@@ -39,19 +42,39 @@ class Detector(ABC):
result.cache = detection.cache
return result
def get_value_from_cache(self, cache: ModelCache, key: str, required = False):
value = cache.get(key)
if value == None and required:
raise ValueError(f'Missing required "{key}" field in cache for analytic unit {self.analytic_unit_id}')
return value
class ProcessingDetector(Detector):
@abstractmethod
def process_data(self, data: TimeSeries, cache: Optional[ModelCache]) -> ProcessingResult:
'''
Data processing to receive additional time series that represents detector's settings
'''
pass
def concat_processing_results(self, processing_results: List[ProcessingResult]) -> Optional[ProcessingResult]:
'''
Concatenate sequential ProcessingResults that received via
splitting dataset to chunks in analytic worker
'''
if len(processing_results) == 0:
return None
united_result = ProcessingResult()
for result in processing_results:
united_result.data.extend(result.data)
if result.lower_bound is not None:
if united_result.lower_bound is None: united_result.lower_bound = []
united_result.lower_bound.extend(result.lower_bound)
if result.upper_bound is not None:
if united_result.upper_bound is None: united_result.upper_bound = []
united_result.upper_bound.extend(result.upper_bound)
return united_result
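The new get_value_from_cache helper centralizes the cache lookups the detectors previously did by hand: optional keys simply return None, required keys raise a ValueError naming the analytic unit. A standalone sketch of that behaviour (illustrative, detached from the Detector class):

    from typing import Any, Optional

    def get_value_from_cache(cache: dict, key: str, analytic_unit_id: str, required: bool = False) -> Optional[Any]:
        """Return cache[key], or None if absent; raise if the key is required but missing."""
        value = cache.get(key)
        if value is None and required:
            raise ValueError(f'Missing required "{key}" field in cache for analytic unit {analytic_unit_id}')
        return value

    cache = {'alpha': 0.1, 'confidence': 2, 'timeStep': 1}
    assert get_value_from_cache(cache, 'segments', 'test_id') is None                # optional -> None
    assert get_value_from_cache(cache, 'alpha', 'test_id', required=True) == 0.1
    # get_value_from_cache(cache, 'seasonality', 'test_id', required=True)           # -> ValueError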

analytics/analytics/detectors/pattern_detector.py (2 lines changed)

@@ -41,7 +41,7 @@ class PatternDetector(Detector):
DEFAULT_WINDOW_SIZE = 1
def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
self.analytic_unit_id = analytic_unit_id
super().__init__(analytic_unit_id)
self.model = resolve_model_by_pattern(pattern_type)
self.bucket = DataBucket()

analytics/analytics/detectors/threshold_detector.py (32 lines changed)

@@ -5,10 +5,10 @@ import pandas as pd
import numpy as np
from typing import Optional, List
from analytic_types import ModelCache
from analytic_types.detector_typing import DetectionResult
from analytic_types import ModelCache, AnalyticUnitId
from analytic_types.detector_typing import DetectionResult, ProcessingResult
from analytic_types.segment import Segment
from detectors import Detector
from detectors import ProcessingDetector
from time import time
import utils
@@ -16,12 +16,12 @@ import utils
logger = log.getLogger('THRESHOLD_DETECTOR')
class ThresholdDetector(Detector):
class ThresholdDetector(ProcessingDetector):
WINDOW_SIZE = 3
def __init__(self):
pass
def __init__(self, analytic_unit_id: AnalyticUnitId):
super().__init__(analytic_unit_id)
def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache:
time_step = utils.find_interval(dataframe)
@@ -89,3 +89,23 @@ class ThresholdDetector(Detector):
result.cache = detection.cache
result.segments = utils.merge_intersecting_segments(result.segments, time_step)
return result
def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
data = dataframe['value']
value = self.get_value_from_cache(cache, 'value', required = True)
condition = self.get_value_from_cache(cache, 'condition', required = True)
if condition == 'NO_DATA':
return ProcessingResult()
data.values[:] = value
timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
result_series = list(zip(timestamps, data.values.tolist()))
if condition in ['>', '>=', '=']:
return ProcessingResult(upper_bound = result_series)
if condition in ['<', '<=']:
return ProcessingResult(lower_bound = result_series)
raise ValueError(f'{condition} condition not supported')
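The new ThresholdDetector.process_data effectively draws the threshold as a horizontal line over the requested timestamps: '>'-style conditions report it as an upper bound, '<'-style conditions as a lower bound, and 'NO_DATA' yields an empty result. A rough illustration of the series it builds (simplified, hypothetical helper):

    from typing import List, Tuple

    def threshold_series(timestamps: List[int], value: float) -> List[Tuple[int, float]]:
        """Flat (timestamp, threshold) pairs, one per timestamp in the queried range."""
        return [(ts, value) for ts in timestamps]

    ts = [1523889000000, 1523889000001, 1523889000002]
    threshold_series(ts, 5.0)  # [(1523889000000, 5.0), (1523889000001, 5.0), (1523889000002, 5.0)]
    # condition '>' or '>=': ProcessingResult(upper_bound=threshold_series(ts, 5.0))
    # condition '<' or '<=': ProcessingResult(lower_bound=threshold_series(ts, 5.0))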

analytics/analytics/utils/common.py (3 lines changed)

@@ -36,6 +36,9 @@ def exponential_smoothing(series: pd.Series, alpha: float, last_smoothed_value:
series.values[n] = result[n]
else:
result.append(alpha * series[n] + (1 - alpha) * result[n - 1])
assert len(result) == len(series), \
f'len of smoothed data {len(result)} != len of original dataset {len(series)}'
return pd.Series(result, index = series.index)
def find_pattern(data: pd.Series, height: float, length: int, pattern_type: str) -> list:
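The assertion added above guards the standard smoothing recurrence result[n] = alpha * series[n] + (1 - alpha) * result[n - 1], which must produce exactly one output point per input point. For example, with alpha = 0.1 and series [0, 1, 2] (and the first value used as the seed), the result is [0, 0.1*1 + 0.9*0 = 0.1, 0.1*2 + 0.9*0.1 = 0.29]: three inputs, three outputs, so the assert holds, and a length mismatch now fails loudly instead of surfacing later as misaligned bounds.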

analytics/tests/test_detectors.py (92 lines changed)

@@ -2,7 +2,7 @@ import unittest
import pandas as pd
from detectors import pattern_detector, threshold_detector, anomaly_detector
from analytic_types.detector_typing import DetectionResult
from analytic_types.detector_typing import DetectionResult, ProcessingResult
class TestPatternDetector(unittest.TestCase):
@@ -13,7 +13,6 @@ class TestPatternDetector(unittest.TestCase):
cache = { 'windowSize': 10 }
detector = pattern_detector.PatternDetector('GENERAL', 'test_id')
with self.assertRaises(ValueError):
detector.detect(dataframe, cache)
@@ -22,7 +21,7 @@ class TestThresholdDetector(unittest.TestCase):
def test_invalid_cache(self):
detector = threshold_detector.ThresholdDetector()
detector = threshold_detector.ThresholdDetector('test_id')
with self.assertRaises(ValueError):
detector.detect([], None)
@@ -33,7 +32,7 @@ class TestThresholdDetector(unittest.TestCase):
class TestAnomalyDetector(unittest.TestCase):
def test_dataframe(self):
def test_detect(self):
data_val = [0, 1, 2, 1, 2, 10, 1, 2, 1]
data_ind = [1523889000000 + i for i in range(len(data_val))]
data = {'timestamp': data_ind, 'value': data_val}
@@ -45,8 +44,91 @@ class TestAnomalyDetector(unittest.TestCase):
'timeStep': 1
}
detector = anomaly_detector.AnomalyDetector('test_id')
detect_result: DetectionResult = detector.detect(dataframe, cache)
detect_result: DetectionResult = detector.detect(dataframe, cache)
detected_segments = list(map(lambda s: {'from': s.from_timestamp, 'to': s.to_timestamp}, detect_result.segments))
result = [{ 'from': 1523889000005.0, 'to': 1523889000005.0 }]
self.assertEqual(result, detected_segments)
cache = {
'confidence': 2,
'alpha': 0.1,
'timeStep': 1,
'seasonality': 4,
'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [10] }]
}
detect_result: DetectionResult = detector.detect(dataframe, cache)
detected_segments = list(map(lambda s: {'from': s.from_timestamp, 'to': s.to_timestamp}, detect_result.segments))
result = []
self.assertEqual(result, detected_segments)
def test_process_data(self):
data_val = [0, 1, 2, 1, 2, 10, 1, 2, 1]
data_ind = [1523889000000 + i for i in range(len(data_val))]
data = {'timestamp': data_ind, 'value': data_val}
dataframe = pd.DataFrame(data = data)
dataframe['timestamp'] = pd.to_datetime(dataframe['timestamp'], unit='ms')
cache = {
'confidence': 2,
'alpha': 0.1,
'timeStep': 1
}
detector = anomaly_detector.AnomalyDetector('test_id')
detect_result: ProcessingResult = detector.process_data(dataframe, cache)
expected_result = {
'lowerBound': [
(1523889000000, -2.0),
(1523889000001, -1.9),
(1523889000002, -1.71),
(1523889000003, -1.6389999999999998),
(1523889000004, -1.4750999999999999),
(1523889000005, -0.5275899999999998),
(1523889000006, -0.5748309999999996),
(1523889000007, -0.5173478999999996),
(1523889000008, -0.5656131099999995)
],
'upperBound': [
(1523889000000, 2.0),
(1523889000001, 2.1),
(1523889000002, 2.29),
(1523889000003, 2.361),
(1523889000004, 2.5249),
(1523889000005, 3.47241),
(1523889000006, 3.4251690000000004),
(1523889000007, 3.4826521),
(1523889000008, 3.4343868900000007)
]}
self.assertEqual(detect_result.to_json(), expected_result)
cache = {
'confidence': 2,
'alpha': 0.1,
'timeStep': 1,
'seasonality': 5,
'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [1] }]
}
detect_result: ProcessingResult = detector.process_data(dataframe, cache)
expected_result = {
'lowerBound': [
(1523889000000, -2.0),
(1523889000001, -2.9),
(1523889000002, -1.71),
(1523889000003, -1.6389999999999998),
(1523889000004, -1.4750999999999999),
(1523889000005, -0.5275899999999998),
(1523889000006, -1.5748309999999996),
(1523889000007, -0.5173478999999996),
(1523889000008, -0.5656131099999995)
],
'upperBound': [
(1523889000000, 2.0),
(1523889000001, 3.1),
(1523889000002, 2.29),
(1523889000003, 2.361),
(1523889000004, 2.5249),
(1523889000005, 3.47241),
(1523889000006, 4.425169),
(1523889000007, 3.4826521),
(1523889000008, 3.4343868900000007)
]}
self.assertEqual(detect_result.to_json(), expected_result)
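The expected 'lowerBound'/'upperBound' lists in the first process_data case are just the smoothed series shifted down/up by the confidence. A quick way to reproduce them (assuming the smoothing is seeded with the first data point, which the ±2.0 entries at the first timestamp suggest):

    alpha, confidence = 0.1, 2
    values = [0, 1, 2, 1, 2, 10, 1, 2, 1]
    timestamps = [1523889000000 + i for i in range(len(values))]

    # Plain exponential smoothing, seeded with the first value.
    smoothed = [float(values[0])]
    for x in values[1:]:
        smoothed.append(alpha * x + (1 - alpha) * smoothed[-1])

    upper = list(zip(timestamps, (s + confidence for s in smoothed)))
    lower = list(zip(timestamps, (s - confidence for s in smoothed)))
    # upper[:3] -> [(1523889000000, 2.0), (1523889000001, 2.1), (1523889000002, 2.29)]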

server/src/controllers/analytics_controller.ts (22 lines changed)

@@ -30,6 +30,12 @@ type TableTimeSeries = { values: [number, number][], columns: string[] };
type TimeRange = { from: number, to: number };
export type TaskResolver = (taskResult: TaskResult) => void;
type HSRResult = {
hsr: TableTimeSeries,
lowerBound?: TableTimeSeries
upperBound?: TableTimeSeries
}
const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>();
let analyticsService: AnalyticsService = undefined;
@@ -631,10 +637,14 @@ export async function getHSR(
try {
const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
let resultSeries: HSRResult = {
hsr: data
}
if(analyticUnit.detectorType !== AnalyticUnit.DetectorType.ANOMALY) {
return { hsr: data };
if(analyticUnit.detectorType === AnalyticUnit.DetectorType.PATTERN) {
return resultSeries;
}
let cache = await AnalyticUnitCache.findById(analyticUnit.id);
if(
cache === null ||
@@ -662,16 +672,12 @@ export async function getHSR(
throw new Error(`Data processing error: ${result.error}`);
}
let resultSeries = {
hsr: data
}
if(result.payload.lowerBound !== undefined) {
resultSeries['lowerBound'] = { values: result.payload.lowerBound, columns: data.columns };
resultSeries.lowerBound = { values: result.payload.lowerBound, columns: data.columns };
}
if(result.payload.upperBound !== undefined) {
resultSeries['upperBound'] = { values: result.payload.upperBound, columns: data.columns };
resultSeries.upperBound = { values: result.payload.upperBound, columns: data.columns };
}
return resultSeries;
