From c8f4340f7cf9d43231535c91b6f23b081789321d Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Thu, 1 Aug 2019 12:34:06 +0300 Subject: [PATCH] Error: skip detection: data length less than window_size (#723) * fix * fix * fix * Update server/src/controllers/analytics_controller.ts * fix * fix * fix according review * Update server/spec/analytic_controller.jest.ts Co-Authored-By: Alexey Velikiy * fix * Update analytics/analytics/detectors/pattern_detector.py Co-Authored-By: Alexey Velikiy --- .../analytics/detectors/pattern_detector.py | 11 ++- analytics/analytics/models/model.py | 2 + server/spec/analytic_controller.jest.ts | 90 +++++++++++++++++++ .../src/controllers/analytics_controller.ts | 12 +++ .../src/models/analytic_unit_cache_model.ts | 8 +- 5 files changed, 118 insertions(+), 5 deletions(-) create mode 100644 server/spec/analytic_controller.jest.ts diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index 936e6bc..ad3769d 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -48,8 +48,13 @@ class PatternDetector(Detector): def train(self, dataframe: pd.DataFrame, segments: List[dict], cache: Optional[ModelCache]) -> ModelCache: # TODO: pass only part of dataframe that has segments - self.model.state = self.model.get_state(cache) - new_cache = self.model.fit(dataframe, segments, self.analytic_unit_id) + self.model.state: models.ModelState = self.model.get_state(cache) + new_cache: models.ModelState = self.model.fit(dataframe, segments, self.analytic_unit_id) + + # time step is optional + if len(dataframe) > 1: + new_cache.time_step = utils.find_interval(dataframe) + new_cache = new_cache.to_json() if len(new_cache) == 0: logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}, analytic unit: {}'.format(dataframe, segments, cache, self.analytic_unit_id)) @@ -75,7 +80,7 @@ class PatternDetector(Detector): raise ValueError(message) if len(dataframe) < window_size * 2: - message = f'{self.analytic_unit_id} skip detection: data length: {len(dataframe)} less than window_size: {window_size}' + message = f'{self.analytic_unit_id} skip detection: dataset length {len(dataframe)} points less than minimal length {window_size * 2} points' logger.error(message) raise ValueError(message) diff --git a/analytics/analytics/models/model.py b/analytics/analytics/models/model.py index e945243..8525230 100644 --- a/analytics/analytics/models/model.py +++ b/analytics/analytics/models/model.py @@ -68,6 +68,7 @@ class ModelState(): def __init__( self, + time_step: int = 0, pattern_center: List[int] = None, pattern_model: List[float] = None, convolve_max: float = 0, @@ -76,6 +77,7 @@ class ModelState(): conv_del_min: float = 0, conv_del_max: float = 0 ): + self.time_step = time_step self.pattern_center = pattern_center if pattern_center is not None else [] self.pattern_model = pattern_model if pattern_model is not None else [] self.convolve_max = convolve_max diff --git a/server/spec/analytic_controller.jest.ts b/server/spec/analytic_controller.jest.ts new file mode 100644 index 0000000..c62ffca --- /dev/null +++ b/server/spec/analytic_controller.jest.ts @@ -0,0 +1,90 @@ +import { queryByMetric } from 'grafana-datasource-kit'; + +jest.mock('grafana-datasource-kit', () => ( + { + ...(jest.requireActual('grafana-datasource-kit')), + queryByMetric: jest.fn((metric, url, from, to, apiKey) => {}) + } +)); + +import { saveAnalyticUnitFromObject, runDetect } from '../src/controllers/analytics_controller'; +import * as AnalyticUnit from '../src/models/analytic_units'; +import * as AnalyticUnitCache from '../src/models/analytic_unit_cache_model'; + +import { HASTIC_API_KEY } from '../src/config'; + + +describe('Check detection range', function() { + const analyticUnitObj = { + _id: 'test', + name: "test", + grafanaUrl: "http://127.0.0.1:3000", + panelId: "ZLc0KfNZk/2", + type: "GENERAL", + metric: { + datasource: { + url: "api/datasources/proxy/5/query", + method: "GET", + data: null, + params: { + db:"dbname", + q: "SELECT mean(\"value\") FROM \"autogen\".\"tcpconns_value\" WHERE time >= now() - 6h GROUP BY time(20s) fill(null)", + epoch: "ms" + }, + type: "influxdb" + }, + targets: [ + { + groupBy: [ + { + params: ["$__interval"], + type: "time" + }, + { + params: ["null"], + type: "fill" + } + ], + measurement: "tcpconns_value", + orderByTime: "ASC", + policy: "autogen", + refId: "A", + resultFormat: "time_series", + select: [[{"params":["value"],"type":"field"},{"params":[],"type":"mean"}]],"tags":[] + } + ] + }, + alert: false, + labeledColor: "#FF99FF", + deletedColor: "#00f0ff", + detectorType: "pattern", + visible: true, + collapsed: false, + createdAt: {"$$date":1564476040880}, + updatedAt: {"$$date":1564476040880} + } + + const WINDOW_SIZE = 10; + const TIME_STEP = 1000; + + async function addTestUnitToDB(): Promise { + const analyticUnitId = await saveAnalyticUnitFromObject(analyticUnitObj); + await AnalyticUnit.update(analyticUnitId, {lastDetectionTime: 1000}); + await AnalyticUnitCache.create(analyticUnitId); + await AnalyticUnitCache.setData(analyticUnitId, { + windowSize: WINDOW_SIZE, + timeStep: TIME_STEP + }); + return analyticUnitId; + }; + + it('check range >= 2 * window size * timeStep', async () => { + const from = 1500000000000; + const to = 1500000000001; + const expectedFrom = to - WINDOW_SIZE * TIME_STEP * 2; + + const id = await addTestUnitToDB(); + await runDetect(id, from, to); + expect(queryByMetric).toBeCalledWith(analyticUnitObj.metric, undefined, expectedFrom, to, HASTIC_API_KEY); + }); +}); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index e683169..d565984 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -350,6 +350,11 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId, from?: number, } else { range = await getQueryRange(id, detector); } + + if(range.to - range.from < intersection) { + range.from = range.to - intersection; + } + const data = await query(unit, range); let task = new AnalyticsTask( @@ -596,6 +601,13 @@ async function getPayloadData( } else { range = await getQueryRange(analyticUnit.id, analyticUnit.detectorType); } + + const cache = await AnalyticUnitCache.findById(analyticUnit.id); + const intersection = cache.getIntersection(); + if(range.to - range.from < intersection) { + range.from = range.to - intersection; + } + return await query(analyticUnit, range); } diff --git a/server/src/models/analytic_unit_cache_model.ts b/server/src/models/analytic_unit_cache_model.ts index c3c4896..771d8e1 100644 --- a/server/src/models/analytic_unit_cache_model.ts +++ b/server/src/models/analytic_unit_cache_model.ts @@ -33,9 +33,13 @@ export class AnalyticUnitCache { } public getIntersection(): number { - if(_.has(this.data, 'windowSize')) { + if(this.data.windowSize !== undefined) { //TODO: return one window size after resolving https://github.com/hastic/hastic-server/issues/508 - return this.data.windowSize * 2 * MILLISECONDS_IN_INDEX; + if(this.data.timeStep !== undefined) { + return this.data.windowSize * 2 * this.data.timeStep; + } else { + return this.data.windowSize * 2 * MILLISECONDS_IN_INDEX; + } } // TODO: default window size return 3 * MILLISECONDS_IN_INDEX;