From d300362b49fc6d4333960cfa73232345f1eb397a Mon Sep 17 00:00:00 2001 From: rozetko Date: Sat, 5 Jan 2019 14:51:39 +0300 Subject: [PATCH] Fix threshold detector (#331) --- analytics/analytics/analytic_unit_manager.py | 4 +++- analytics/analytics/detectors/threshold_detector.py | 9 ++++++--- server/src/controllers/analytics_controller.ts | 2 +- server/src/models/threshold_model.ts | 3 ++- server/src/routes/threshold_router.ts | 5 +++-- server/src/services/data_puller.ts | 4 +++- 6 files changed, 18 insertions(+), 9 deletions(-) diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index 2111159..46d8494 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -74,7 +74,9 @@ class AnalyticUnitManager: worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType']) data = prepare_data(payload['data']) if task['type'] == 'PUSH': - return await worker.recieve_data(data, payload['cache']) + # TODO: do it a better way + res = await worker.recieve_data(data, payload['cache']) + return res.update({ 'analyticUnitId': analytic_unit_id }) elif task['type'] == 'LEARN': if 'segments' in payload: return await worker.do_train(payload['segments'], data, payload['cache']) diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index b2c1716..38c2750 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -27,12 +27,15 @@ class ThresholdDetector(Detector): value = cache['value'] condition = cache['condition'] - last_entry = dataframe.iloc[-1] + dataframe_without_nans = dataframe.dropna() + if len(dataframe_without_nans) == 0: + return dict() + last_entry = dataframe_without_nans.iloc[-1] last_value = last_entry['value'] # TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6 last_time = last_entry['timestamp'].value / 1000000 - segment = (last_time, last_time) + segment = ({ 'from': last_time, 'to': last_time }) segments = [] if condition == '>': if last_value > value: @@ -57,4 +60,4 @@ class ThresholdDetector(Detector): } def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: - return self.detect(self.bucket.data, cache) + return self.detect(data, cache) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index a8e95e5..a9d4733 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -101,7 +101,7 @@ async function query(analyticUnit: AnalyticUnit.AnalyticUnit, detector: Analytic } else if(detector === AnalyticUnit.DetectorType.THRESHOLD) { const now = Date.now(); range = { - from: now - 5 * SECONDS_IN_MINUTE, + from: now - 5 * SECONDS_IN_MINUTE * 1000, to: now }; } diff --git a/server/src/models/threshold_model.ts b/server/src/models/threshold_model.ts index 611ff15..1374c7f 100644 --- a/server/src/models/threshold_model.ts +++ b/server/src/models/threshold_model.ts @@ -59,7 +59,8 @@ export async function findOne(id: AnalyticUnitId): Promise { } export async function updateThreshold(id: AnalyticUnitId, value: number, condition: Condition) { - if(findOne(id) === null) { + const threshold = await db.findOne(id); + if(threshold === null) { return db.insertOne({ id, value, condition }); } return db.updateOne({ id }, { value, condition }); diff --git a/server/src/routes/threshold_router.ts b/server/src/routes/threshold_router.ts index aeeec65..1f3107b 100644 --- a/server/src/routes/threshold_router.ts +++ b/server/src/routes/threshold_router.ts @@ -9,8 +9,9 @@ import * as _ from 'lodash'; async function getThresholds(ctx: Router.IRouterContext) { try { - const ids: AnalyticUnitId = ctx.request.query.ids; - if(ids === undefined || _.isEmpty(ids)) { + const ids: AnalyticUnitId[] = ctx.request.query.ids.split(','); + + if(ids === undefined) { throw new Error('analyticUnitIds (ids) are missing'); } diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts index 043c3ec..24fe746 100644 --- a/server/src/services/data_puller.ts +++ b/server/src/services/data_puller.ts @@ -94,11 +94,13 @@ export class DataPuller { if(cache !== null) { cache = cache.data } + const detector = AnalyticUnit.getDetectorByType(analyticUnit.type); let payload = { data: payloadValues, from: time, to: now, - pattern: analyticUnit.type, + analyticUnitType: analyticUnit.type, + detector, cache }; this._unitTimes[analyticUnit.id] = now;