
Endpoint for smoothing data #612 (#639)

rozetko, 6 years ago, committed by GitHub
parent commit a56ac6ec4e
 1. analytics/analytics/analytic_types/detector_typing.py (14)
 2. analytics/analytics/analytic_unit_manager.py (6)
 3. analytics/analytics/analytic_unit_worker.py (60)
 4. analytics/analytics/detectors/__init__.py (2)
 5. analytics/analytics/detectors/anomaly_detector.py (62)
 6. analytics/analytics/detectors/detector.py (29)
 7. analytics/analytics/detectors/threshold_detector.py (1)
 8. analytics/analytics/utils/time.py (5)
 9. analytics/tests/test_detectors.py (2)
10. server/src/controllers/analytics_controller.ts (96)
11. server/src/models/analytic_units/anomaly_analytic_unit_model.ts (2)
12. server/src/models/analytic_units/index.ts (3)
13. server/src/models/analytics_task_model.ts (3)
14. server/src/routes/data_router.ts (8)
15. server/src/services/alert_service.ts (1)

analytics/analytics/analytic_types/detector_typing.py (14)

@@ -1,8 +1,9 @@
from analytic_types import ModelCache
from analytic_types.segment import Segment
from typing import List, Optional
from typing import List, Optional, Tuple
import utils.meta
class DetectionResult:
@@ -27,3 +28,14 @@ class DetectionResult:
'segments': list(map(lambda segment: segment.to_json(), self.segments)),
'lastDetectionTime': self.last_detection_time
}
@utils.meta.JSONClass
class ProcessingResult():
def __init__(
self,
data: Optional[List[Tuple[int, int]]] = None
):
if data is None:
data = []
self.data = data

analytics/analytics/analytic_unit_manager.py (6)

@@ -19,7 +19,7 @@ def get_detector_by_type(
elif detector_type == 'threshold':
return detectors.ThresholdDetector()
elif detector_type == 'anomaly':
return detectors.AnomalyDetector()
return detectors.AnomalyDetector(analytic_unit_id)
raise ValueError('Unknown detector type "%s"' % detector_type)
@@ -69,10 +69,14 @@ class AnalyticUnitManager:
return await worker.do_train(payload['segments'], data, payload['cache'])
elif 'threshold' in payload:
return await worker.do_train(payload['threshold'], data, payload['cache'])
elif 'anomaly' in payload:
return await worker.do_train(payload['anomaly'], data, payload['cache'])
else:
raise ValueError('No segments or threshold in LEARN payload')
elif task['type'] == 'DETECT':
return await worker.do_detect(data, payload['cache'])
elif task['type'] == 'PROCESS':
return await worker.process_data(data, payload['cache'])
raise ValueError('Unknown task type "%s"' % task['type'])
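
For orientation: the new PROCESS branch hands the prepared data and payload['cache'] to worker.process_data. Below is a hedged sketch of the payload such a task carries, assembled only from fields visible elsewhere in this diff (getHSR in analytics_controller.ts further down builds { data, analyticUnitType, detector, cache }); the surrounding task envelope is not part of this change and is omitted.

# Sketch of a PROCESS payload; values are illustrative, not taken from real traffic.
payload = {
    'data': [[1523889000000, 1.0], [1523889000001, 2.0]],  # [timestamp_ms, value] rows from queryByMetric
    'analyticUnitType': 'GENERAL',                          # placeholder; any unit type string
    'detector': 'anomaly',
    'cache': {'alpha': 0.1, 'confidence': 2}                # cache produced by a previous LEARN task
}
# handle_analytic_task() then routes it:
#   if task['type'] == 'PROCESS':
#       return await worker.process_data(data, payload['cache'])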

analytics/analytics/analytic_unit_worker.py (60)

@@ -2,14 +2,14 @@ import config
import detectors
import logging
import pandas as pd
from typing import Optional, Union, Generator, List
from typing import Optional, Union, Generator, List, Tuple
import concurrent.futures
import asyncio
import utils
from utils import get_intersected_chunks, get_chunks, prepare_data
from analytic_types import ModelCache
from analytic_types.detector_typing import DetectionResult
from analytic_types.detector_typing import DetectionResult, ProcessingResult
logger = logging.getLogger('AnalyticUnitWorker')
@@ -52,41 +52,63 @@ class AnalyticUnitWorker:
chunk_size = window_size * self.CHUNK_WINDOW_SIZE_FACTOR
chunk_intersection = window_size * self.CHUNK_INTERSECTION_FACTOR
detection_result = DetectionResult()
detections: List[DetectionResult] = []
chunks = []
# XXX: get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size)
if self._detector.is_detection_intersected():
chunks = get_intersected_chunks(data, chunk_intersection, chunk_size)
else:
chunks = get_chunks(data, chunk_size)
for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size):
for chunk in chunks:
await asyncio.sleep(0)
chunk_dataframe = prepare_data(chunk)
detected = self._detector.detect(chunk_dataframe, cache)
self.__append_detection_result(detection_result, detected)
detection_result.segments = self._detector.merge_segments(detection_result.segments)
detected: DetectionResult = self._detector.detect(chunk_dataframe, cache)
detections.append(detected)
if len(detections) == 0:
raise RuntimeError(f'do_detect for {self.analytic_unit_id} got empty detection results')
detection_result = self._detector.concat_detection_results(detections)
return detection_result.to_json()
def cancel(self):
if self._training_future is not None:
self._training_future.cancel()
async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
async def consume_data(self, data: list, cache: Optional[ModelCache]) -> Optional[dict]:
window_size = self._detector.get_window_size(cache)
detection_result = DetectionResult()
detections: List[DetectionResult] = []
for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
await asyncio.sleep(0)
chunk_dataframe = prepare_data(chunk)
detected = self._detector.consume_data(chunk_dataframe, cache)
self.__append_detection_result(detection_result, detected)
if detected is not None:
detections.append(detected)
detection_result.segments = self._detector.merge_segments(detection_result.segments)
if detection_result.last_detection_time is None:
if len(detections) == 0:
return None
else:
detection_result = self._detector.concat_detection_results(detections)
return detection_result.to_json()
# TODO: move result concatenation to Detectors
def __append_detection_result(self, detection_result: DetectionResult, new_chunk: DetectionResult):
if new_chunk is not None:
detection_result.cache = new_chunk.cache
detection_result.last_detection_time = new_chunk.last_detection_time
detection_result.segments.extend(new_chunk.segments)
async def process_data(self, data: list, cache: ModelCache) -> dict:
assert isinstance(self._detector, detectors.ProcessingDetector), f'{self.analytic_unit_id} detector is not ProcessingDetector, can`t process data'
assert cache is not None, f'{self.analytic_unit_id} got empty cache for processing data'
processed_chunks: List[ProcessingResult] = []
window_size = self._detector.get_window_size(cache)
for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
await asyncio.sleep(0)
chunk_dataframe = prepare_data(chunk)
processed: ProcessingResult = self._detector.process_data(chunk_dataframe, cache)
if processed is not None:
processed_chunks.append(processed)
if len(processed_chunks) == 0:
raise RuntimeError(f'process_data for {self.analytic_unit_id} got empty processing results')
result = self._detector.concat_processing_results(processed_chunks)
return result.to_json()
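
The reworked do_detect, consume_data and process_data loops all lean on the chunking helpers imported from utils. Those helpers are untouched by this commit, so the sketch below only illustrates the contract they are assumed to satisfy: fixed-size chunks, optionally overlapping by chunk_intersection points so that segments crossing a chunk border are not lost.

# Hedged sketch of the assumed chunking contract; the real get_chunks /
# get_intersected_chunks live in analytics/utils and may differ in detail.
from typing import Generator, List

def get_chunks(data: List, chunk_size: int) -> Generator[List, None, None]:
    # Non-overlapping, fixed-size chunks; the last chunk may be shorter.
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]

def get_intersected_chunks(data: List, intersection: int, chunk_size: int) -> Generator[List, None, None]:
    # Chunks overlapping by `intersection` points (assumes intersection < chunk_size).
    # Per the XXX note above: get_chunks(data, size) == get_intersected_chunks(data, 0, size).
    step = chunk_size - intersection
    for start in range(0, len(data), step):
        yield data[start:start + chunk_size]
        if start + chunk_size >= len(data):
            break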

analytics/analytics/detectors/__init__.py (2)

@@ -1,4 +1,4 @@
from detectors.detector import Detector
from detectors.detector import Detector, ProcessingDetector
from detectors.pattern_detector import PatternDetector
from detectors.threshold_detector import ThresholdDetector
from detectors.anomaly_detector import AnomalyDetector

analytics/analytics/detectors/anomaly_detector.py (62)

@@ -1,12 +1,13 @@
import logging
import numpy as np
import pandas as pd
from typing import Optional, Union, List, Tuple
from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.detector_typing import DetectionResult
from analytic_types.detector_typing import DetectionResult, ProcessingResult
from analytic_types.data_bucket import DataBucket
from analytic_types.segment import Segment
from detectors import Detector
from detectors import Detector, ProcessingDetector
import utils
MAX_DEPENDENCY_LEVEL = 100
@@ -14,9 +15,10 @@ MIN_DEPENDENCY_FACTOR = 0.1
logger = logging.getLogger('ANOMALY_DETECTOR')
class AnomalyDetector(Detector):
class AnomalyDetector(ProcessingDetector):
def __init__(self, *args, **kwargs):
def __init__(self, analytic_unit_id: AnalyticUnitId):
self.analytic_unit_id = analytic_unit_id
self.bucket = DataBucket()
def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
@@ -27,15 +29,16 @@ class AnomalyDetector(Detector):
}
}
# TODO: ModelCache -> ModelState
def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
data = dataframe['value']
last_values = None
last_value = None
if cache is not None:
last_values = cache.get('last_values')
last_value = cache.get('last_value')
smothed_data = utils.exponential_smoothing(data, cache['alpha'])
upper_bound = smothed_data + cache['confidence']
lower_bound = smothed_data - cache['confidence']
smoothed_data = utils.exponential_smoothing(data, cache['alpha'], last_value)
upper_bound = smoothed_data + cache['confidence']
lower_bound = smoothed_data - cache['confidence']
anomaly_indexes = []
for idx, val in enumerate(data.values):
@@ -48,13 +51,34 @@ class AnomalyDetector(Detector):
utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[0]]),
utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[1]]),
) for segment in segments]
last_dataframe_time = dataframe.iloc[-1]['timestamp']
last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
# TODO: ['lastValue'] -> .last_value
cache['lastValue'] = smoothed_data.values[-1]
return DetectionResult(cache, segments, last_detection_time)
def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
self.detect(data, cache)
if cache is None:
msg = f'consume_data got invalid cache {cache} for task {self.analytic_unit_id}'
logging.debug(msg)
raise ValueError(msg)
data_without_nan = data.dropna()
if len(data_without_nan) == 0:
return None
self.bucket.receive_data(data_without_nan)
if len(self.bucket.data) >= self.get_window_size(cache):
return self.detect(self.bucket, cache)
return None
def is_detection_intersected(self) -> bool:
return False
def get_window_size(self, cache: Optional[ModelCache]) -> int:
'''
@@ -69,6 +93,24 @@ class AnomalyDetector(Detector):
break
return level
def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
result = DetectionResult()
for detection in detections:
result.segments.extend(detection.segments)
result.last_detection_time = detection.last_detection_time
result.cache = detection.cache
result.segments = utils.merge_intersecting_segments(result.segments)
return result
# TODO: ModelCache -> ModelState
def process_data(self, data: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
# TODO: exponential_smoothing should return dataframe with related timestamps
smoothed = utils.exponential_smoothing(data['value'], cache['alpha'], cache.get('lastValue'))
timestamps = utils.convert_series_to_timestamp_list(data.timestamp)
smoothed_dataset = list(zip(timestamps, smoothed.values.tolist()))
result = ProcessingResult(smoothed_dataset)
return result
def merge_segments(self, segments: List[Segment]) -> List[Segment]:
segments = utils.merge_intersecting_segments(segments)
return segments
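
utils.exponential_smoothing is also outside this diff. Judging by its new last_value argument and the lastValue field written to the cache, it is assumed to behave like plain single exponential smoothing seeded with the previous smoothed value; a minimal sketch under that assumption:

# Assumed behaviour of utils.exponential_smoothing; the real implementation may differ.
import pandas as pd
from typing import Optional

def exponential_smoothing(series: pd.Series, alpha: float, last_value: Optional[float] = None) -> pd.Series:
    smoothed_values = []
    for value in series.values:
        if last_value is None:
            smoothed = value  # no history yet: start from the raw value
        else:
            smoothed = alpha * value + (1 - alpha) * last_value
        smoothed_values.append(smoothed)
        last_value = smoothed
    return pd.Series(smoothed_values, index=series.index)

# detect() then derives upper/lower bounds as smoothed +/- cache['confidence'];
# process_data() returns the smoothed series itself as [timestamp, value] pairs.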

analytics/analytics/detectors/detector.py (29)

@@ -3,7 +3,7 @@ from pandas import DataFrame
from typing import Optional, Union, List
from analytic_types import ModelCache
from analytic_types.detector_typing import DetectionResult
from analytic_types.detector_typing import DetectionResult, ProcessingResult
from analytic_types.segment import Segment
@@ -28,5 +28,32 @@ class Detector(ABC):
def get_window_size(self, cache: Optional[ModelCache]) -> int:
pass
def is_detection_intersected(self) -> bool:
return True
def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
result = DetectionResult()
for detection in detections:
result.segments.extend(detection.segments)
result.last_detection_time = detection.last_detection_time
result.cache = detection.cache
return result
def merge_segments(self, segments: List[Segment]) -> List[Segment]:
return segments
class ProcessingDetector(Detector):
@abstractmethod
def process_data(self, data, cache: Optional[ModelCache]) -> ProcessingResult:
pass
def concat_processing_results(self, processing_results: List[ProcessingResult]) -> Optional[ProcessingResult]:
if len(processing_results) == 0:
return None
united_result = ProcessingResult()
for result in processing_results:
united_result.data.extend(result.data)
return united_result

analytics/analytics/detectors/threshold_detector.py (1)

@@ -44,7 +44,6 @@ class ThresholdDetector(Detector):
current_value = row['value']
current_timestamp = convert_pd_timestamp_to_ms(row['timestamp'])
segment = Segment(current_timestamp, current_timestamp)
# TODO: merge segments
if pd.isnull(current_value):
if condition == 'NO_DATA':

analytics/analytics/utils/time.py (5)

@@ -1,4 +1,5 @@
import pandas as pd
from typing import List
def convert_sec_to_ms(sec) -> int:
return int(sec) * 1000
@@ -6,3 +7,7 @@ def convert_sec_to_ms(sec) -> int:
def convert_pd_timestamp_to_ms(timestamp: pd.Timestamp) -> int:
# TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6
return int(timestamp.value) / 1000000
def convert_series_to_timestamp_list(series: pd.Series) -> List[int]:
timestamps = map(lambda value: convert_pd_timestamp_to_ms(value), series)
return list(timestamps)

analytics/tests/test_detectors.py (2)

@@ -42,7 +42,7 @@ class TestAnomalyDetector(unittest.TestCase):
'confidence': 2,
'alpha': 0.1,
}
detector = anomaly_detector.AnomalyDetector()
detector = anomaly_detector.AnomalyDetector('test_id')
detect_result = detector.detect(dataframe, cache)
result = [{ 'from': 1523889000005.0, 'to': 1523889000005.0 }]
self.assertEqual(result, detect_result.to_json()['segments'])

server/src/controllers/analytics_controller.ts (96)

@@ -17,6 +17,7 @@ import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafan
import * as _ from 'lodash';
import { WebhookType } from '../services/notification_service';
import { AnomalyAnalyticUnit } from '../models/analytic_units/anomaly_analytic_unit_model';
const SECONDS_IN_MINUTE = 60;
@@ -105,7 +106,7 @@ export function terminate() {
alertService.stopAlerting();
}
async function runTask(task: AnalyticsTask): Promise<TaskResult> {
export async function runTask(task: AnalyticsTask): Promise<TaskResult> {
return new Promise<TaskResult>((resolver: TaskResolver) => {
taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult()
analyticsService.sendTask(task); // we dont wait for result here
@@ -126,7 +127,10 @@ async function getQueryRange(
return getQueryRangeForLearningBySegments(segments);
}
if(detectorType === AnalyticUnit.DetectorType.THRESHOLD) {
if(
detectorType === AnalyticUnit.DetectorType.THRESHOLD ||
detectorType === AnalyticUnit.DetectorType.ANOMALY
) {
const now = Date.now();
return {
from: now - 5 * SECONDS_IN_MINUTE * 1000,
@@ -215,23 +219,34 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
const detector = analyticUnit.detectorType;
let taskPayload: any = { detector, analyticUnitType, cache: oldCache };
if(detector === AnalyticUnit.DetectorType.PATTERN) {
let segments = await Segment.findMany(id, { labeled: true });
if(segments.length === 0) {
throw new Error('Need at least 1 labeled segment');
}
let segmentObjs = segments.map(s => s.toObject());
let deletedSegments = await Segment.findMany(id, { deleted: true });
let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
taskPayload.segments = segmentObjs;
} else if(detector === AnalyticUnit.DetectorType.THRESHOLD) {
taskPayload.threshold = {
value: (analyticUnit as ThresholdAnalyticUnit).value,
condition: (analyticUnit as ThresholdAnalyticUnit).condition
};
switch(detector) {
case AnalyticUnit.DetectorType.PATTERN:
let segments = await Segment.findMany(id, { labeled: true });
if(segments.length === 0) {
throw new Error('Need at least 1 labeled segment');
}
let segmentObjs = segments.map(s => s.toObject());
let deletedSegments = await Segment.findMany(id, { deleted: true });
let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
taskPayload.segments = segmentObjs;
break;
case AnalyticUnit.DetectorType.THRESHOLD:
taskPayload.threshold = {
value: (analyticUnit as ThresholdAnalyticUnit).value,
condition: (analyticUnit as ThresholdAnalyticUnit).condition
};
break;
case AnalyticUnit.DetectorType.ANOMALY:
taskPayload.anomaly = {
alpha: (analyticUnit as AnomalyAnalyticUnit).alpha,
confidence: (analyticUnit as AnomalyAnalyticUnit).confidence
};
break;
default:
throw new Error(`Unknown type of detector: ${detector}`);
}
let range: TimeRange;
@@ -558,3 +573,46 @@ async function runDetectionOnExtendedSpan(
await Detection.insertSpan(detection);
return detection;
}
export async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: number, to: number) {
try {
const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
if(analyticUnit.detectorType !== AnalyticUnit.DetectorType.ANOMALY) {
return data;
} else {
let cache = await AnalyticUnitCache.findById(analyticUnit.id);
if(
cache === null ||
cache.data.alpha !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).alpha ||
cache.data.confidence !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).confidence
) {
await runLearning(analyticUnit.id, from, to);
cache = await AnalyticUnitCache.findById(analyticUnit.id);
}
cache = cache.data;
const analyticUnitType = analyticUnit.type;
const detector = analyticUnit.detectorType;
const payload = {
data: data.values,
analyticUnitType,
detector,
cache
};
const processingTask = new AnalyticsTask(analyticUnit.id, AnalyticsTaskType.PROCESS, payload);
const result = await runTask(processingTask);
if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
throw new Error(`Data processing error: ${result.error}`);
}
return { values: result.payload.data, columns: data.columns }
}
} catch (err) {
const message = err.message || JSON.stringify(err);
await AnalyticUnit.setStatus(analyticUnit.id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
throw new Error(message);
}
}

server/src/models/analytic_units/anomaly_analytic_unit_model.ts (2)

@@ -35,7 +35,7 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
error,
labeledColor,
deletedColor,
DetectorType.THRESHOLD,
DetectorType.ANOMALY,
visible
);
}

server/src/models/analytic_units/index.ts (3)

@@ -3,6 +3,7 @@ import { AnalyticUnitId, AnalyticUnitStatus, DetectorType, ANALYTIC_UNIT_TYPES }
import { AnalyticUnit } from './analytic_unit_model';
import { PatternAnalyticUnit } from './pattern_analytic_unit_model';
import { ThresholdAnalyticUnit } from './threshold_analytic_unit_model';
import { AnomalyAnalyticUnit } from './anomaly_analytic_unit_model';
import {
findById,
findMany,
@@ -17,7 +18,7 @@ import {
export {
AnalyticUnit, PatternAnalyticUnit, ThresholdAnalyticUnit,
AnalyticUnit, PatternAnalyticUnit, ThresholdAnalyticUnit, AnomalyAnalyticUnit,
AnalyticUnitId, AnalyticUnitStatus, DetectorType, ANALYTIC_UNIT_TYPES,
createAnalyticUnitFromObject,
findById, findMany,

server/src/models/analytics_task_model.ts (3)

@@ -10,7 +10,8 @@ export enum AnalyticsTaskType {
LEARN = 'LEARN',
DETECT = 'DETECT',
CANCEL = 'CANCEL',
PUSH = 'PUSH'
PUSH = 'PUSH',
PROCESS = 'PROCESS'
};
export class AnalyticsTask {

server/src/routes/data_router.ts (8)

@@ -1,8 +1,5 @@
import * as AnalyticUnit from '../models/analytic_units';
import { HASTIC_API_KEY } from '../config';
import { getGrafanaUrl } from '../utils/grafana';
import { queryByMetric } from 'grafana-datasource-kit';
import * as AnalyticsController from '../controllers/analytics_controller';
import * as Router from 'koa-router';
@@ -46,8 +43,7 @@ async function query(ctx: Router.IRouterContext) {
throw new Error(`can't find analytic unit ${analyticUnitId}`);
}
const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
const results = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
const results = await AnalyticsController.getHSR(analyticUnit, from, to);
ctx.response.body = { results };
}

server/src/services/alert_service.ts (1)

@@ -134,6 +134,7 @@ export class AlertService {
alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert;
alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert;
alertsType[AnalyticUnit.DetectorType.ANOMALY] = Alert;
this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit);
}
