diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py
index b1ba0be..d4478d7 100644
--- a/analytics/analytics/analytic_unit_manager.py
+++ b/analytics/analytics/analytic_unit_manager.py
@@ -57,7 +57,7 @@ class AnalyticUnitManager:
 
         payload = task['payload']
         worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType'])
-        data = payload['data']
+        data = payload.get('data')
         if task['type'] == 'PUSH':
             # TODO: do it a better way
             res = await worker.consume_data(data, payload['cache'])
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 32f0617..863ac8d 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -12,6 +12,7 @@ import utils
 
 MAX_DEPENDENCY_LEVEL = 100
 MIN_DEPENDENCY_FACTOR = 0.1
+BASIC_ALPHA = 0.5
 logger = logging.getLogger('ANOMALY_DETECTOR')
 
 
@@ -22,24 +23,83 @@ class AnomalyDetector(ProcessingDetector):
         self.bucket = DataBucket()
 
     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
+        segments = payload.get('segments')
+        prepared_segments = []
+
+        new_cache = {
+            'confidence': payload['confidence'],
+            'alpha': payload['alpha']
+        }
+
+        if segments is not None:
+            seasonality = payload.get('seasonality')
+            assert seasonality is not None and seasonality > 0, \
+                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+
+            for segment in segments:
+                segment_len = (int(segment['to']) - int(segment['from']))
+                assert segment_len <= seasonality, \
+                    f'seasonality {seasonality} must be greater than segment length {segment_len}'
+
+                from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'], unit='ms'))
+                to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'], unit='ms'))
+                segment_data = dataframe[from_index : to_index]
+                prepared_segments.append({'from': segment['from'], 'data': segment_data.value.tolist()})
+
+            data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
+            data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
+            time_step = data_second_time - data_start_time
+            new_cache['seasonality'] = seasonality
+            new_cache['segments'] = prepared_segments
+            new_cache['timeStep'] = time_step
+
         return {
-            'cache': {
-                'confidence': payload['confidence'],
-                'alpha': payload['alpha']
-            }
+            'cache': new_cache
         }
 
     # TODO: ModelCache -> ModelState
     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
         data = dataframe['value']
+        segments = cache.get('segments')
+        last_value = None
 
         if cache is not None:
             last_value = cache.get('last_value')
 
         smoothed_data = utils.exponential_smoothing(data, cache['alpha'], last_value)
+
+        # TODO: use class for cache to avoid using string literals
         upper_bound = smoothed_data + cache['confidence']
         lower_bound = smoothed_data - cache['confidence']
 
+        if segments is not None:
+
+            seasonality = cache.get('seasonality')
+            assert seasonality is not None and seasonality > 0, \
+                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+
+            data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
+            data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
+            time_step = data_second_time - data_start_time
+
+            for segment in segments:
+                seasonality_offset = (abs(segment['from'] - data_start_time) % seasonality) // time_step
+                seasonality_index = seasonality // time_step
+                #TODO: upper and lower bounds for segment_data
+                segment_data = utils.exponential_smoothing(pd.Series(segment['data']), BASIC_ALPHA)
+                upper_seasonality_curve = self.add_season_to_data(
+                    smoothed_data, segment_data, seasonality_offset, seasonality_index, True
+                )
+                lower_seasonality_curve = self.add_season_to_data(
+                    smoothed_data, segment_data, seasonality_offset, seasonality_index, False
+                )
+                assert len(smoothed_data) == len(upper_seasonality_curve), \
+                    f'len smoothed {len(smoothed_data)} != len seasonality {len(upper_seasonality_curve)}'
+
+                # TODO: use class for cache to avoid using string literals
+                upper_bound = upper_seasonality_curve + cache['confidence']
+                lower_bound = lower_seasonality_curve - cache['confidence']
+
         anomaly_indexes = []
         for idx, val in enumerate(data.values):
             if val > upper_bound.values[idx] or val < lower_bound.values[idx]:
@@ -91,7 +151,11 @@ class AnomalyDetector(ProcessingDetector):
         for level in range(1, MAX_DEPENDENCY_LEVEL):
             if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR:
                 break
-        return level
+
+        seasonality = 0
+        if cache.get('segments') is not None and cache['seasonality'] > 0:
+            seasonality = cache['seasonality'] // cache['timeStep']
+        return max(level, seasonality)
 
     def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
@@ -102,15 +166,56 @@ class AnomalyDetector(ProcessingDetector):
         result.segments = utils.merge_intersecting_segments(result.segments)
         return result
 
-    # TODO: ModelCache -> ModelState
-    def process_data(self, data: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
+    # TODO: ModelCache -> ModelState (don't use string literals)
+    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
+        segments = cache.get('segments')
+
         # TODO: exponential_smoothing should return dataframe with related timestamps
-        smoothed = utils.exponential_smoothing(data['value'], cache['alpha'], cache.get('lastValue'))
-        timestamps = utils.convert_series_to_timestamp_list(data.timestamp)
+        smoothed = utils.exponential_smoothing(dataframe['value'], cache['alpha'], cache.get('lastValue'))
+
+        # TODO: remove duplication with detect()
+        if segments is not None:
+            seasonality = cache.get('seasonality')
+            assert seasonality is not None and seasonality > 0, \
+                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+
+            data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
+            time_step = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1]) - utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
+
+            for segment in segments:
+                seasonality_offset = (abs(segment['from'] - data_start_time) % seasonality) // time_step
+                seasonality_index = seasonality // time_step
+                segment_data = utils.exponential_smoothing(pd.Series(segment['data']), BASIC_ALPHA)
+                upper_seasonality_curve = self.add_season_to_data(
+                    smoothed, segment_data, seasonality_offset, seasonality_index, True
+                )
+                lower_seasonality_curve = self.add_season_to_data(
+                    smoothed, segment_data, seasonality_offset, seasonality_index, False
+                )
+                assert len(smoothed) == len(upper_seasonality_curve), \
+                    f'len smoothed {len(smoothed)} != len seasonality {len(upper_seasonality_curve)}'
+                smoothed = upper_seasonality_curve
+
+                # TODO: support multiple segments
+                upper_bound = upper_seasonality_curve + cache['confidence']
+                lower_bound = lower_seasonality_curve - cache['confidence']
+
+        timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
         smoothed_dataset = list(zip(timestamps, smoothed.values.tolist()))
         result = ProcessingResult(smoothed_dataset)
         return result
 
-    def merge_segments(self, segments: List[Segment]) -> List[Segment]:
-        segments = utils.merge_intersecting_segments(segments)
-        return segments
+    def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, addition: bool) -> pd.Series:
+        #data - smoothed data to which seasonality will be added
+        #if addition == True -> segment is added
+        #if addition == False -> segment is subtracted
+        len_smoothed_data = len(data)
+        for idx, _ in enumerate(data):
+            if idx - offset < 0:
+                continue
+            if (idx - offset) % seasonality == 0:
+                if addition:
+                    data = data.add(pd.Series(segment.values, index = segment.index + idx), fill_value = 0)
+                else:
+                    data = data.add(pd.Series(segment.values * -1, index = segment.index + idx), fill_value = 0)
+        return data[:len_smoothed_data]
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index e8a2344..3a146b8 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -39,8 +39,6 @@ class Detector(ABC):
             result.cache = detection.cache
         return result
 
-    def merge_segments(self, segments: List[Segment]) -> List[Segment]:
-        return segments
 
 
 class ProcessingDetector(Detector):
diff --git a/analytics/analytics/utils/time.py b/analytics/analytics/utils/time.py
index 14710b0..39b69d6 100644
--- a/analytics/analytics/utils/time.py
+++ b/analytics/analytics/utils/time.py
@@ -6,7 +6,7 @@ def convert_sec_to_ms(sec) -> int:
 
 def convert_pd_timestamp_to_ms(timestamp: pd.Timestamp) -> int:
     # TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6
-    return int(timestamp.value) / 1000000
+    return int(timestamp.value) // 1000000
 
 def convert_series_to_timestamp_list(series: pd.Series) -> List[int]:
     timestamps = map(lambda value: convert_pd_timestamp_to_ms(value), series)
diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index ec53a3d..6329f4c 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -117,9 +117,11 @@ async function getQueryRange(
   analyticUnitId: AnalyticUnit.AnalyticUnitId,
   detectorType: AnalyticUnit.DetectorType
 ): Promise<TimeRange> {
-  if(detectorType === AnalyticUnit.DetectorType.PATTERN) {
-    // TODO: find labeled OR deleted segments to generate timerange
-    const segments = await Segment.findMany(analyticUnitId, { labeled: true });
+  if(
+    detectorType === AnalyticUnit.DetectorType.PATTERN ||
+    detectorType === AnalyticUnit.DetectorType.ANOMALY
+  ) {
+    const segments = await Segment.findMany(analyticUnitId, { $or: { labeled: true, deleted: true } });
     if(segments.length === 0) {
       throw new Error('Need at least 1 labeled segment');
     }
@@ -127,10 +129,7 @@ async function getQueryRange(
     return getQueryRangeForLearningBySegments(segments);
   }
 
-  if(
-    detectorType === AnalyticUnit.DetectorType.THRESHOLD ||
-    detectorType === AnalyticUnit.DetectorType.ANOMALY
-  ) {
+  if(detectorType === AnalyticUnit.DetectorType.THRESHOLD) {
     const now = Date.now();
     return {
       from: now - 5 * SECONDS_IN_MINUTE * 1000,
@@ -199,7 +198,7 @@ function getQueryRangeForLearningBySegments(segments: Segment.Segment[]) {
 }
 
 export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number, to?: number) {
-  console.log('learning started...');
+  console.log(`LEARNING started for ${id}`);
   try {
     let analyticUnit = await AnalyticUnit.findById(id);
 
@@ -235,6 +234,7 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
       let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
       segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
       taskPayload.segments = segmentObjs;
+      taskPayload.data = await getPayloadData(analyticUnit, from, to);
       break;
     case AnalyticUnit.DetectorType.THRESHOLD:
       taskPayload.threshold = {
@@ -247,24 +247,30 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
        alpha: (analyticUnit as AnomalyAnalyticUnit).alpha,
        confidence: (analyticUnit as AnomalyAnalyticUnit).confidence
      };
+
+      const seasonality = (analyticUnit as AnomalyAnalyticUnit).seasonality;
+      if(seasonality > 0) {
+        let segments = await Segment.findMany(id, { deleted: true });
+        if(segments.length === 0) {
+          console.log('Need at least 1 labeled segment, ignore seasonality');
+          break;
+        }
+        taskPayload.anomaly.seasonality = seasonality;
+
+        let segmentObjs = segments.map(s => s.toObject());
+        taskPayload.anomaly.segments = segmentObjs;
+        taskPayload.data = await getPayloadData(analyticUnit, from, to);
+      }
       break;
     default:
       throw new Error(`Unknown type of detector: ${detector}`);
   }
 
-  let range: TimeRange;
-  if(from !== undefined && to !== undefined) {
-    range = { from, to };
-  } else {
-    range = await getQueryRange(id, detector);
-  }
-  taskPayload.data = await query(analyticUnit, range);
-
   let task = new AnalyticsTask(
     id, AnalyticsTaskType.LEARN, taskPayload
   );
   AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
-  console.log(`run task, id:${id}`);
+  console.log(`run ${task.type} task, id:${id}`);
   let result = await runTask(task);
   if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
     throw new Error(result.error);
@@ -502,11 +508,6 @@ export async function runLearningWithDetection(
 ): Promise<void> {
   // TODO: move setting status somehow "inside" learning
   await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING);
-  const foundSegments = await Segment.findMany(id, { labeled: false, deleted: false });
-  if(foundSegments !== null) {
-    await Segment.removeSegments(foundSegments.map(segment => segment.id));
-  }
-  await Detection.clearSpans(id);
   runLearning(id, from, to)
     .then(() => runDetect(id, from, to))
     .catch(err => console.error(err));
@@ -559,6 +560,20 @@ export async function getDetectionSpans(
   return _.concat(readySpans, alreadyRunningSpans, newRunningSpans.filter(span => span !== null));
 }
 
+async function getPayloadData(
+  analyticUnit: AnalyticUnit.AnalyticUnit,
+  from: number,
+  to: number
+) {
+  let range: TimeRange;
+  if(from !== undefined && to !== undefined) {
+    range = { from, to };
+  } else {
+    range = await getQueryRange(analyticUnit.id, analyticUnit.detectorType);
+  }
+  return await query(analyticUnit, range);
+}
+
 async function runDetectionOnExtendedSpan(
   analyticUnitId: AnalyticUnit.AnalyticUnitId,
   from: number,
diff --git a/server/src/models/analytic_units/anomaly_analytic_unit_model.ts b/server/src/models/analytic_units/anomaly_analytic_unit_model.ts
index e6ad720..da38566 100644
--- a/server/src/models/analytic_units/anomaly_analytic_unit_model.ts
+++ b/server/src/models/analytic_units/anomaly_analytic_unit_model.ts
@@ -3,7 +3,10 @@ import { AnalyticUnitId, AnalyticUnitStatus, DetectorType } from './types';
 
 import { Metric } from 'grafana-datasource-kit';
 
-
+type SeasonalityPeriod = {
+  unit: string,
+  value: number
+}
 export class AnomalyAnalyticUnit extends AnalyticUnit {
 
   public learningAfterUpdateRequired = true;
@@ -15,6 +18,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
     type: string,
     public alpha: number,
     public confidence: number,
+    public seasonality: number, //seasonality in ms
+    private seasonalityPeriod: SeasonalityPeriod,
     metric?: Metric,
     alert?: boolean,
     id?: AnalyticUnitId,
@@ -48,7 +53,9 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
     return {
       ...baseObject,
       alpha: this.alpha,
-      confidence: this.confidence
+      confidence: this.confidence,
+      seasonality: this.seasonality,
+      seasonalityPeriod: this.seasonalityPeriod
     };
   }
 
@@ -57,7 +64,9 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
     return {
       ...baseObject,
       alpha: this.alpha,
-      confidence: this.confidence
+      confidence: this.confidence,
+      seasonality: this.seasonality,
+      seasonalityPeriod: this.seasonalityPeriod
     };
   }
 
@@ -75,6 +84,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
       obj.type,
       obj.alpha,
       obj.confidence,
+      obj.seasonality,
+      obj.seasonalityPeriod,
       metric,
       obj.alert,
       obj._id,
diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts
index e3cd652..7d58329 100644
--- a/server/src/models/segment_model.ts
+++ b/server/src/models/segment_model.ts
@@ -68,6 +68,7 @@ export class Segment {
 }
 
 export type FindManyQuery = {
+  $or?: any,
   timeFromGTE?: number,
   timeToLTE?: number,
   intexGT?: number,
diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts
index 21a612a..4c3e32e 100644
--- a/server/src/services/analytics_service.ts
+++ b/server/src/services/analytics_service.ts
@@ -212,9 +212,9 @@ export class AnalyticsService {
       try {
         response = JSON.parse(text);
       } catch (e) {
-        console.error("Can`t parse response from analytics as json:");
+        console.error('Can`t parse response from analytics as json:');
         console.error(text);
-        throw new Error('Unexpected response');
+        throw new Error('Can`t parse response from analytics as json, see log');
       }
       this._onMessage(AnalyticsMessage.fromObject(response));
     }
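
The seasonality bookkeeping in the patch is done in milliseconds and only converted to point counts right before the overlay is applied. A minimal sketch of that arithmetic follows; all concrete values (seasonality, time step, timestamps) are made up for illustration and are not taken from the patch.

# Sketch of the offset/index arithmetic used in detect() / process_data();
# every concrete value below is illustrative only.
seasonality = 24 * 60 * 60 * 1000     # payload['seasonality'] in ms (one day)
time_step = 60 * 1000                 # ms between consecutive dataframe points
data_start_time = 1_562_000_000_000   # dataframe['timestamp'][0] converted to ms
segment_from = 1_561_950_000_000      # segment['from'] stored by train(), in ms

# distance from the window start to the labeled segment, wrapped by the season
# length and expressed in points
seasonality_offset = (abs(segment_from - data_start_time) % seasonality) // time_step
# length of one season, in points
seasonality_index = seasonality // time_step
print(seasonality_offset, seasonality_index)  # -> 833 1440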
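
Below is a self-contained sketch of how add_season_to_data() turns that offset/index pair and a labeled segment into seasonal upper/lower bounds. exponential_smoothing here is a simplified stand-in for utils.exponential_smoothing, and the series, offsets and confidence are toy values, not part of the patch.

import pandas as pd

BASIC_ALPHA = 0.5

def exponential_smoothing(series: pd.Series, alpha: float) -> pd.Series:
    # simplified stand-in for utils.exponential_smoothing
    return series.ewm(alpha=alpha, adjust=False).mean()

def add_season_to_data(data: pd.Series, segment: pd.Series, offset: int, seasonality: int, addition: bool) -> pd.Series:
    # same algorithm as the patched method: add (or subtract) the smoothed segment
    # every `seasonality` points, starting `offset` points into the data
    len_smoothed_data = len(data)
    for idx, _ in enumerate(data):
        if idx - offset < 0:
            continue
        if (idx - offset) % seasonality == 0:
            sign = 1 if addition else -1
            data = data.add(pd.Series(segment.values * sign, index=segment.index + idx), fill_value=0)
    return data[:len_smoothed_data]

# toy data: a flat smoothed series and a labeled 3-point seasonal bump
smoothed_data = pd.Series([1.0] * 20)
segment_data = exponential_smoothing(pd.Series([5.0, 6.0, 5.0]), BASIC_ALPHA)

seasonality_offset = 2   # would come from the arithmetic sketched above
seasonality_index = 5
confidence = 2.0

upper_bound = add_season_to_data(smoothed_data, segment_data, seasonality_offset, seasonality_index, True) + confidence
lower_bound = add_season_to_data(smoothed_data, segment_data, seasonality_offset, seasonality_index, False) - confidence

# values outside the widened band would be reported as anomalies
print(upper_bound.round(2).tolist())
print(lower_bound.round(2).tolist())

Because Series.add() aligns on the shifted segment index with fill_value=0, points outside the seasonal windows keep the plain smoothed value, so only the expected seasonal bumps widen the band.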