From 7c559046c1afcbb2b885780a033e6b8930604fd4 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sat, 25 Aug 2018 23:06:21 +0300 Subject: [PATCH] grafana influx quering --- analytics/analytic_unit_worker.py | 15 +++--- .../general_detector/general_detector.py | 4 -- analytics/detectors/pattern_detector.py | 12 ++--- server/package.json | 2 + .../src/controllers/analytics_controller.ts | 50 +++++++++++++------ server/src/models/grafana_metric_model.ts | 9 ++-- server/src/services/grafana_service.ts | 40 ++++++--------- 7 files changed, 68 insertions(+), 64 deletions(-) diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py index a2a3501..6d38b6c 100644 --- a/analytics/analytic_unit_worker.py +++ b/analytics/analytic_unit_worker.py @@ -44,12 +44,13 @@ class AnalyticUnitWorker(object): async def do_learn(self, analytic_unit_id, payload): pattern = payload['pattern'] segments = payload['segments'] + data = payload['data'] # [time, value][] - model = self.get_detector(analytic_unit_id, pattern) - model.synchronize_data() - last_prediction_time = await model.learn(segments) + detector = self.get_detector(analytic_unit_id, pattern) + detector.synchronize_data() + last_prediction_time = await detector.learn(segments) # TODO: we should not do predict before labeling in all models, not just in drops - + if pattern == 'DROP' and len(segments) == 0: # TODO: move result to a class which renders to json for messaging to analytics result = { @@ -68,9 +69,9 @@ class AnalyticUnitWorker(object): pattern = payload['pattern'] last_prediction_time = payload['lastPredictionTime'] - model = self.get_detector(analytic_unit_id, pattern) - model.synchronize_data() - segments, last_prediction_time = await model.predict(last_prediction_time) + detector = self.get_detector(analytic_unit_id, pattern) + detector.synchronize_data() + segments, last_prediction_time = await detector.predict(last_prediction_time) return { 'task': 'PREDICT', 'status': 'SUCCESS', diff 
--git a/analytics/detectors/general_detector/general_detector.py b/analytics/detectors/general_detector/general_detector.py index 18d95e9..cca4652 100644 --- a/analytics/detectors/general_detector/general_detector.py +++ b/analytics/detectors/general_detector/general_detector.py @@ -84,10 +84,6 @@ class GeneralDetector: logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name) return predicted_anomalies, last_prediction_time - def synchronize_data(self): - self.data_prov.synchronize() - self.preprocessor.set_data_provider(self.data_prov) - self.preprocessor.synchronize() def create_algorithm(self): return SupervisedAlgorithm() diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index 57e39b8..affae3e 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -37,19 +37,16 @@ class PatternDetector: self.model = None self.__load_model(pattern_type) - async def learn(self, segments): + async def learn(self, segments, data): self.model = resolve_model_by_pattern(self.pattern_type) window_size = 200 - dataframe = self.data_prov.get_dataframe() - - segments = self.data_prov.transform_anomalies(segments) # TODO: pass only part of dataframe that has segments - self.model.fit(dataframe, segments) + self.model.fit(dataframe, segments, data) self.__save_model() return 0 - async def predict(self, last_prediction_time): + async def predict(self, last_prediction_time, data): if self.model is None: return [], last_prediction_time @@ -78,9 +75,6 @@ class PatternDetector: return segments, last_prediction_time # return predicted_anomalies, last_prediction_time - def synchronize_data(self): - self.data_prov.synchronize() - def __save_model(self): pass # TODO: use data_service to save anything diff --git a/server/package.json b/server/package.json index 64d80f2..fae059a 100644 --- a/server/package.json +++ b/server/package.json @@ -24,6 +24,7 @@ "@types/koa": "^2.0.45", 
"@types/koa-bodyparser": "^4.2.0", "@types/koa-router": "^7.0.28", + "@types/lodash": "^4.14.116", "@types/nedb": "^1.8.0", "axios": "^0.18.0", "babel-core": "^6.26.3", @@ -37,6 +38,7 @@ "koa": "^2.5.1", "koa-bodyparser": "^4.2.1", "koa-router": "^7.4.0", + "lodash": "^4.17.10", "nedb": "^1.8.0", "node-loader": "^0.6.0", "nodemon": "^1.17.5", diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 355ecc9..d72967c 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -5,6 +5,8 @@ import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService } from '../services/analytics_service'; import { queryByMetric } from '../services/grafana_service'; +import * as _ from 'lodash'; + type TaskResult = any; export type TaskResolver = (taskResult: TaskResult) => void; @@ -60,43 +62,59 @@ export function terminate() { } async function runTask(task: AnalyticsTask): Promise { - // task.metric = { - // datasource: anomaly.metric.datasource, - // targets: anomaly.metric.targets.map(getTarget) - // }; - return new Promise((resolver: TaskResolver) => { taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult() analyticsService.sendTask(task); // we dont wait for result here }); } +/** + * Finds range for selecting subset for learning + * @param segments labeled segments + */ +function getQueryRangeForLearningBySegments(segments: Segment.Segment[]) { + if(segments.length < 2) { + throw new Error('Need at least 2 labeled segments'); + } + + let from = _.minBy(segments, s => s.from).from; + let to = _.maxBy(segments, s => s.to).to; + let diff = to - from; + from -= Math.round(diff * 0.1); + to += Math.round(diff * 0.1); + + return { from, to }; +} + export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { - + let previousLastPredictionTime: number = undefined; try { - let segments = await Segment.findMany(id, { 
labeled: true }); let analyticUnit = await AnalyticUnit.findById(id); + if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) { + throw new Error('Can`t start learning when it`s already started [' + id + ']'); + } + + let segments = await Segment.findMany(id, { labeled: true }); + if(segments.length < 2) { + throw new Error('Need at least 2 labeled segments'); + } let segmentObjs = segments.map(s => s.toObject()); - let data = await queryByMetric(analyticUnit.metric, analyticUnit.panelUrl); - + + let { from, to } = getQueryRangeForLearningBySegments(segments); + let data = await queryByMetric(analyticUnit.metric, analyticUnit.panelUrl, from, to); if(data.length === 0) { throw new Error('Empty data to learn on'); } - if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) { - throw new Error('Can`t starn learning when it`s already started [' + id + ']'); - } - - AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); - let pattern = analyticUnit.type; let task = new AnalyticsTask( - id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs } + id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data } ); + AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); let result = await runTask(task); let { lastPredictionTime, segments: predictedSegments } = await processLearningResult(result); previousLastPredictionTime = analyticUnit.lastPredictionTime; diff --git a/server/src/models/grafana_metric_model.ts b/server/src/models/grafana_metric_model.ts index 7f8c7f7..9991233 100644 --- a/server/src/models/grafana_metric_model.ts +++ b/server/src/models/grafana_metric_model.ts @@ -62,9 +62,11 @@ export class MetricQuery { private static INFLUX_QUERY_TIME_REGEX = /time >[^A-Z]+/; private _queryParts: string[]; + private _type: string; constructor(metric: GrafanaMetric) { - if (metric.datasource.type !== 'influxdb') { + this._type = metric.datasource.type; + if (this._type !== 'influxdb') { throw new
Error(`Queries of type "${metric.datasource.type}" are not supported yet.`); } var queryStr = metric.datasource.params.q; @@ -79,7 +81,8 @@ export class MetricQuery { } } - getQuery(limit: number, offset: number): string { - return `${this._queryParts[0]} TRUE ${this._queryParts[1]} LIMIT ${limit} OFFSET ${offset}`; + getQuery(from: number, to: number, limit: number, offset: number): string { + let timeClause = `time >= ${from}ms AND time <= ${to}ms`; + return `${this._queryParts[0]} ${timeClause} ${this._queryParts[1]} LIMIT ${limit} OFFSET ${offset}`; } } diff --git a/server/src/services/grafana_service.ts b/server/src/services/grafana_service.ts index 1b37785..8fac5f7 100644 --- a/server/src/services/grafana_service.ts +++ b/server/src/services/grafana_service.ts @@ -11,47 +11,37 @@ const CHUNK_SIZE = 50000; * @param metric to query to Grafana * @returns [time, value][] array */ -export async function queryByMetric(metric: GrafanaMetric, panelUrl: string): Promise<[number, number][]> { +export async function queryByMetric( + metric: GrafanaMetric, panelUrl: string, from: number, to: number +): Promise<[number, number][]> { + let datasource = metric.datasource; let origin = new URL(panelUrl).origin; let url = `${origin}/${datasource.url}`; let params = datasource.params - let records = await getRecordsCount(url, params); - - let limit = Math.min(records, CHUNK_SIZE); - let offset = 0; - let data = []; - while (offset <= records) { - let paramsClone = Object.assign({}, params); - paramsClone.q = metric.metricQuery.getQuery(limit, offset); - let chunk = await queryGrafana(url, paramsClone); + + let chunkParams = Object.assign({}, params); + while(true) { + chunkParams.q = metric.metricQuery.getQuery(from, to, CHUNK_SIZE, data.length); + var chunk = await queryGrafana(url, chunkParams); data = data.concat(chunk); - offset += CHUNK_SIZE; + if(chunk.length < CHUNK_SIZE) { + // because if we get less than we could, then there is nothing more + break; + } } return data;
} -async function getRecordsCount(url: string, params: any) { - let paramsClone = Object.assign({}, params); - let query = paramsClone.q; - - let field = query.match(/"(\w+)"\)*\sFROM/)[1]; - let measurement = query.match(/FROM\s"(\w+)"/)[1]; - paramsClone.q = `SELECT COUNT(${field}) FROM ${measurement}`; - let result = await queryGrafana(url, paramsClone); - return result[0][1]; -} - async function queryGrafana(url: string, params: any) { let headers = { Authorization: `Bearer ${HASTIC_API_KEY}` }; - let res; try { - res = await axios.get(url, { params, headers }); + var res = await axios.get(url, { params, headers }); } catch (e) { if(e.response.status === 401) { throw new Error('Unauthorized. Check the $HASTIC_API_KEY.'); @@ -69,5 +59,5 @@ async function queryGrafana(url: string, params: any) { return []; } - return results.series[0].values; + return results.series[0].values as [number, number][]; }