Browse Source

Threshold line for HSR #727 (#729)

pull/1/head
Evgeny Smyshlyaev 6 years ago committed by rozetko
parent
commit
b1bd20f030
  1. 11
      analytics/analytics/analytic_types/detector_typing.py
  2. 26
      analytics/analytics/detectors/anomaly_detector.py
  3. 16
      analytics/analytics/detectors/detector.py
  4. 26
      analytics/analytics/detectors/threshold_detector.py
  5. 22
      server/src/controllers/analytics_controller.ts

11
analytics/analytics/analytic_types/detector_typing.py

@ -32,17 +32,6 @@ class DetectionResult:
@utils.meta.JSONClass
class ProcessingResult():
def __init__(
self,
data: Optional[TimeSeries] = None
):
if data is None:
data = []
self.data = data
@utils.meta.JSONClass
class AnomalyProcessingResult():
def __init__(
self,
lower_bound: Optional[TimeSeries] = None,

26
analytics/analytics/detectors/anomaly_detector.py

@ -6,7 +6,7 @@ import math
from typing import Optional, Union, List, Tuple
from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.detector_typing import DetectionResult, AnomalyProcessingResult
from analytic_types.detector_typing import DetectionResult, ProcessingResult
from analytic_types.data_bucket import DataBucket
from analytic_types.segment import Segment
from detectors import Detector, ProcessingDetector
@ -173,7 +173,7 @@ class AnomalyDetector(ProcessingDetector):
return result
# TODO: remove duplication with detect()
def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult:
def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
segments = self.get_value_from_cache(cache, 'segments')
alpha = self.get_value_from_cache(cache, 'alpha', required = True)
confidence = self.get_value_from_cache(cache, 'confidence', required = True)
@ -212,11 +212,11 @@ class AnomalyDetector(ProcessingDetector):
upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
if enable_bounds == Bound.ALL:
return AnomalyProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
return ProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
elif enable_bounds == Bound.UPPER:
return AnomalyProcessingResult(upper_bound = upper_bound_timeseries)
return ProcessingResult(upper_bound = upper_bound_timeseries)
elif enable_bounds == Bound.LOWER:
return AnomalyProcessingResult(lower_bound = lower_bound_timeseries)
return ProcessingResult(lower_bound = lower_bound_timeseries)
def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
#data - smoothed data to which seasonality will be added
@ -239,22 +239,6 @@ class AnomalyDetector(ProcessingDetector):
return data[:len_smoothed_data]
def concat_processing_results(self, processing_results: List[AnomalyProcessingResult]) -> Optional[AnomalyProcessingResult]:
if len(processing_results) == 0:
return None
united_result = AnomalyProcessingResult()
for result in processing_results:
if result.lower_bound is not None:
if united_result.lower_bound is None: united_result.lower_bound = []
united_result.lower_bound.extend(result.lower_bound)
if result.upper_bound is not None:
if united_result.upper_bound is None: united_result.upper_bound = []
united_result.upper_bound.extend(result.upper_bound)
return united_result
def get_bounds_for_segment(self, segment: pd.Series) -> Tuple[pd.Series, pd.Series]:
'''
segment is divided by the median to determine its top and bottom parts

16
analytics/analytics/detectors/detector.py

@ -53,14 +53,28 @@ class ProcessingDetector(Detector):
@abstractmethod
def process_data(self, data: TimeSeries, cache: Optional[ModelCache]) -> ProcessingResult:
'''
Data processing to receive additional time series that represents detector's settings
'''
pass
def concat_processing_results(self, processing_results: List[ProcessingResult]) -> Optional[ProcessingResult]:
'''
Concatenate sequential ProcessingResults that received via
splitting dataset to chunks in analytic worker
'''
if len(processing_results) == 0:
return None
united_result = ProcessingResult()
for result in processing_results:
united_result.data.extend(result.data)
if result.lower_bound is not None:
if united_result.lower_bound is None: united_result.lower_bound = []
united_result.lower_bound.extend(result.lower_bound)
if result.upper_bound is not None:
if united_result.upper_bound is None: united_result.upper_bound = []
united_result.upper_bound.extend(result.upper_bound)
return united_result

26
analytics/analytics/detectors/threshold_detector.py

@ -6,9 +6,9 @@ import numpy as np
from typing import Optional, List
from analytic_types import ModelCache, AnalyticUnitId
from analytic_types.detector_typing import DetectionResult
from analytic_types.detector_typing import DetectionResult, ProcessingResult
from analytic_types.segment import Segment
from detectors import Detector
from detectors import ProcessingDetector
from time import time
import utils
@ -16,7 +16,7 @@ import utils
logger = log.getLogger('THRESHOLD_DETECTOR')
class ThresholdDetector(Detector):
class ThresholdDetector(ProcessingDetector):
WINDOW_SIZE = 3
@ -89,3 +89,23 @@ class ThresholdDetector(Detector):
result.cache = detection.cache
result.segments = utils.merge_intersecting_segments(result.segments, time_step)
return result
def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
data = dataframe['value']
value = self.get_value_from_cache(cache, 'value', required = True)
condition = self.get_value_from_cache(cache, 'condition', required = True)
if condition == 'NO_DATA':
return ProcessingResult()
data.values[:] = value
timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
result_series = list(zip(timestamps, data.values.tolist()))
if condition in ['>', '>=', '=']:
return ProcessingResult(upper_bound = result_series)
if condition in ['<', '<=']:
return ProcessingResult(lower_bound = result_series)
raise ValueError(f'{condition} condition not supported')

22
server/src/controllers/analytics_controller.ts

@ -30,6 +30,12 @@ type TableTimeSeries = { values: [number, number][], columns: string[] };
type TimeRange = { from: number, to: number };
export type TaskResolver = (taskResult: TaskResult) => void;
type HSRResult = {
hsr: TableTimeSeries,
lowerBound?: TableTimeSeries
upperBound?: TableTimeSeries
}
const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>();
let analyticsService: AnalyticsService = undefined;
@ -631,10 +637,14 @@ export async function getHSR(
try {
const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
let resultSeries: HSRResult = {
hsr: data
}
if(analyticUnit.detectorType !== AnalyticUnit.DetectorType.ANOMALY) {
return { hsr: data };
if(analyticUnit.detectorType === AnalyticUnit.DetectorType.PATTERN) {
return resultSeries;
}
let cache = await AnalyticUnitCache.findById(analyticUnit.id);
if(
cache === null ||
@ -662,16 +672,12 @@ export async function getHSR(
throw new Error(`Data processing error: ${result.error}`);
}
let resultSeries = {
hsr: data
}
if(result.payload.lowerBound !== undefined) {
resultSeries['lowerBound'] = { values: result.payload.lowerBound, columns: data.columns };
resultSeries.lowerBound = { values: result.payload.lowerBound, columns: data.columns };
}
if(result.payload.upperBound !== undefined) {
resultSeries['upperBound'] = { values: result.payload.upperBound, columns: data.columns };
resultSeries.upperBound = { values: result.payload.upperBound, columns: data.columns };
}
return resultSeries;

Loading…
Cancel
Save