
Anomaly analytic unit: send confidence bounds instead of smoothed data #656 (#657)

rozetko committed 5 years ago (via GitHub)
commit 43f6306744
Changed files:
  1. analytics/analytics/analytic_types/__init__.py (5 lines changed)
  2. analytics/analytics/analytic_types/detector_typing.py (20 lines changed)
  3. analytics/analytics/analytic_unit_worker.py (10 lines changed)
  4. analytics/analytics/detectors/anomaly_detector.py (22 lines changed)
  5. server/src/controllers/analytics_controller.ts (21 lines changed)

analytics/analytics/analytic_types/__init__.py (5 lines changed)

@@ -10,12 +10,15 @@ like PatternDetectionCache, then it should not be here.
 """

 import pandas as pd

-from typing import Union, List
+from typing import Union, List, Tuple

 AnalyticUnitId = str
 ModelCache = dict

+# TODO: explicit timestamp / value
+TimeSeries = [List[Tuple[int, int]]]
+
 """
 Example:
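Note: the new TimeSeries alias describes a series as a list of (timestamp, value) pairs. A minimal sketch of data in that shape (sample numbers are made up; the alias is written here without the extra list wrapper so it can be used directly as an annotation):

from typing import List, Tuple

# Hypothetical illustration: a series of (epoch-ms timestamp, value) pairs.
TimeSeries = List[Tuple[int, int]]

series: TimeSeries = [
    (1552501200000, 95),
    (1552501260000, 97),
    (1552501320000, 96),
]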

analytics/analytics/analytic_types/detector_typing.py (20 lines changed)

@@ -1,4 +1,4 @@
-from analytic_types import ModelCache
+from analytic_types import ModelCache, TimeSeries
 from analytic_types.segment import Segment

 from typing import List, Optional, Tuple
@@ -34,8 +34,24 @@ class ProcessingResult():
     def __init__(
         self,
-        data: Optional[List[Tuple[int, int]]] = None
+        data: Optional[TimeSeries] = None
     ):
         if data is None:
             data = []
         self.data = data
+
+@utils.meta.JSONClass
+class AnomalyProcessingResult():
+
+    def __init__(
+        self,
+        lower_bound: Optional[TimeSeries] = None,
+        upper_bound: Optional[TimeSeries] = None,
+    ):
+        if lower_bound is None:
+            lower_bound = []
+        self.lower_bound = lower_bound
+
+        if upper_bound is None:
+            upper_bound = []
+        self.upper_bound = upper_bound
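A rough usage sketch of the new result type (repeated here without the JSONClass decorator, which handles serialization; the sample bounds are made up):

from typing import List, Optional, Tuple

TimeSeries = List[Tuple[int, int]]

class AnomalyProcessingResult:
    # Same fields as in the diff above; JSON serialization is left out of this sketch.
    def __init__(
        self,
        lower_bound: Optional[TimeSeries] = None,
        upper_bound: Optional[TimeSeries] = None,
    ):
        self.lower_bound = lower_bound if lower_bound is not None else []
        self.upper_bound = upper_bound if upper_bound is not None else []

result = AnomalyProcessingResult(
    lower_bound=[(1552501200000, 93)],
    upper_bound=[(1552501200000, 107)],
)
print(result.lower_bound, result.upper_bound)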

analytics/analytics/analytic_unit_worker.py (10 lines changed)

@@ -9,7 +9,7 @@ import utils
 from utils import get_intersected_chunks, get_chunks, prepare_data

 from analytic_types import ModelCache
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from analytic_types.detector_typing import DetectionResult

 logger = logging.getLogger('AnalyticUnitWorker')
@@ -95,20 +95,22 @@ class AnalyticUnitWorker:
         return detection_result.to_json()

     async def process_data(self, data: list, cache: ModelCache) -> dict:
-        assert isinstance(self._detector, detectors.ProcessingDetector), f'{self.analytic_unit_id} detector is not ProcessingDetector, can`t process data'
+        assert isinstance(self._detector, detectors.ProcessingDetector), \
+            f'{self.analytic_unit_id} detector is not ProcessingDetector, can`t process data'
         assert cache is not None, f'{self.analytic_unit_id} got empty cache for processing data'

-        processed_chunks: List[ProcessingResult] = []
+        processed_chunks = []
         window_size = self._detector.get_window_size(cache)
         for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
             await asyncio.sleep(0)
             chunk_dataframe = prepare_data(chunk)
-            processed: ProcessingResult = self._detector.process_data(chunk_dataframe, cache)
+            processed = self._detector.process_data(chunk_dataframe, cache)
             if processed is not None:
                 processed_chunks.append(processed)

         if len(processed_chunks) == 0:
             raise RuntimeError(f'process_data for {self.analytic_unit_id} got empty processing results')

+        # TODO: maybe we should process all chunks inside of detector?
         result = self._detector.concat_processing_results(processed_chunks)
         return result.to_json()
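The worker loop above reduces to: split the series into chunks, process each chunk, skip empty results, then merge. A standalone sketch of that control flow, with a hypothetical chunking helper standing in for utils.get_chunks and simple sums standing in for the detector calls:

from typing import Iterator, List

def get_chunks_stub(data: List[int], size: int) -> Iterator[List[int]]:
    # Hypothetical stand-in for utils.get_chunks.
    for start in range(0, len(data), size):
        yield data[start:start + size]

processed_chunks = []
for chunk in get_chunks_stub(list(range(10)), 4):
    processed = sum(chunk)  # stand-in for self._detector.process_data(...)
    if processed is not None:
        processed_chunks.append(processed)

if len(processed_chunks) == 0:
    raise RuntimeError('process_data got empty processing results')

# stand-in for self._detector.concat_processing_results(processed_chunks)
result = processed_chunks
print(result)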

analytics/analytics/detectors/anomaly_detector.py (22 lines changed)

@@ -4,7 +4,7 @@ import pandas as pd
 from typing import Optional, Union, List, Tuple

 from analytic_types import AnalyticUnitId, ModelCache
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from analytic_types.detector_typing import DetectionResult, AnomalyProcessingResult
 from analytic_types.data_bucket import DataBucket
 from analytic_types.segment import Segment
 from detectors import Detector, ProcessingDetector
@@ -167,13 +167,15 @@ class AnomalyDetector(ProcessingDetector):
         return result

     # TODO: ModelCache -> ModelState (don't use string literals)
-    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
+    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult:
         segments = cache.get('segments')

         # TODO: exponential_smoothing should return dataframe with related timestamps
         smoothed = utils.exponential_smoothing(dataframe['value'], cache['alpha'], cache.get('lastValue'))

+        # TODO: remove duplication with detect()
+        upper_bound = dataframe['value'] + cache['confidence']
+        lower_bound = dataframe['value'] - cache['confidence']
+
         if segments is not None:
             seasonality = cache.get('seasonality')
             assert seasonality is not None and seasonality > 0, \
@@ -201,8 +203,9 @@
             lower_bound = lower_seasonality_curve - cache['confidence']

         timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
-        smoothed_dataset = list(zip(timestamps, smoothed.values.tolist()))
-        result = ProcessingResult(smoothed_dataset)
+        upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
+        lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
+        result = AnomalyProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
         return result

     def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, addition: bool) -> pd.Series:
@@ -219,3 +222,14 @@ class AnomalyDetector(ProcessingDetector):
         else:
             data = data.add(pd.Series(segment.values * -1, index = segment.index + idx), fill_value = 0)
         return data[:len_smoothed_data]
+
+    def concat_processing_results(self, processing_results: List[AnomalyProcessingResult]) -> Optional[AnomalyProcessingResult]:
+        if len(processing_results) == 0:
+            return None
+
+        united_result = AnomalyProcessingResult()
+        for result in processing_results:
+            united_result.lower_bound.extend(result.lower_bound)
+            united_result.upper_bound.extend(result.upper_bound)
+
+        return united_result
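A minimal pandas sketch of the non-seasonal bound computation used in process_data above (column names follow the diff; the confidence value and samples are made up):

import pandas as pd

dataframe = pd.DataFrame({
    'timestamp': [1552501200000, 1552501260000, 1552501320000],
    'value': [100.0, 102.0, 98.0],
})
confidence = 5.0  # would come from cache['confidence']

upper_bound = dataframe['value'] + confidence
lower_bound = dataframe['value'] - confidence

timestamps = dataframe['timestamp'].tolist()
upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
print(lower_bound_timeseries, upper_bound_timeseries)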

server/src/controllers/analytics_controller.ts (21 lines changed)

@@ -23,6 +23,9 @@ const SECONDS_IN_MINUTE = 60;
 type TaskResult = any;
 type DetectionResult = any;
+// TODO: move TableTimeSeries to grafana-datasource-kit
+// TODO: TableTimeSeries is bad name
+type TableTimeSeries = { values: [number, number][], columns: string[] };
 // TODO: move type definitions somewhere
 type TimeRange = { from: number, to: number };

 export type TaskResolver = (taskResult: TaskResult) => void;
@@ -600,7 +603,15 @@
   return detection;
 }

-export async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: number, to: number) {
+export async function getHSR(
+  analyticUnit: AnalyticUnit.AnalyticUnit,
+  from: number,
+  to: number
+): Promise<{
+  hsr: TableTimeSeries,
+  lowerBound?: TableTimeSeries,
+  upperBound?: TableTimeSeries
+}> {
   try {
     const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
     const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
@@ -619,7 +630,7 @@ async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: number, to: number) {
     }
     cache = cache.data;

-    const analyticUnitType = analyticUnit.type;
     const detector = analyticUnit.detectorType;
     const payload = {
@@ -634,7 +645,11 @@ async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: number, to: number) {
     if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
       throw new Error(`Data processing error: ${result.error}`);
     }
-    return { hsr: data, smoothed: { values: result.payload.data, columns: data.columns } };
+    return {
+      hsr: data,
+      lowerBound: { values: result.payload.lowerBound, columns: data.columns },
+      upperBound: { values: result.payload.upperBound, columns: data.columns }
+    };
   } catch (err) {
     const message = err.message || JSON.stringify(err);
     await AnalyticUnit.setStatus(analyticUnit.id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
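On the server side, getHSR now reads lowerBound and upperBound from the task payload instead of the old smoothed data field. A sketch of the payload shape it expects, assuming the JSONClass serializer camelCases the Python attribute names (sample numbers made up):

# Hypothetical serialized AnomalyProcessingResult as seen by the controller.
payload = {
    'lowerBound': [[1552501200000, 93.0], [1552501260000, 97.0]],
    'upperBound': [[1552501200000, 107.0], [1552501260000, 111.0]],
}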
