Browse Source

grafana influx quering

pull/1/head
Coin de Gamma 7 years ago
parent
commit
7c559046c1
  1. 15
      analytics/analytic_unit_worker.py
  2. 4
      analytics/detectors/general_detector/general_detector.py
  3. 12
      analytics/detectors/pattern_detector.py
  4. 2
      server/package.json
  5. 50
      server/src/controllers/analytics_controller.ts
  6. 9
      server/src/models/grafana_metric_model.ts
  7. 40
      server/src/services/grafana_service.ts

15
analytics/analytic_unit_worker.py

@ -44,12 +44,13 @@ class AnalyticUnitWorker(object):
async def do_learn(self, analytic_unit_id, payload):
pattern = payload['pattern']
segments = payload['segments']
data = payload['data'] # [time, value][]
model = self.get_detector(analytic_unit_id, pattern)
model.synchronize_data()
last_prediction_time = await model.learn(segments)
detector = self.get_detector(analytic_unit_id, pattern)
detector.synchronize_data()
last_prediction_time = await detector.learn(segments)
# TODO: we should not do predict before labeling in all models, not just in drops
if pattern == 'DROP' and len(segments) == 0:
# TODO: move result to a class which renders to json for messaging to analytics
result = {
@ -68,9 +69,9 @@ class AnalyticUnitWorker(object):
pattern = payload['pattern']
last_prediction_time = payload['lastPredictionTime']
model = self.get_detector(analytic_unit_id, pattern)
model.synchronize_data()
segments, last_prediction_time = await model.predict(last_prediction_time)
detector = self.get_detector(analytic_unit_id, pattern)
detector.synchronize_data()
segments, last_prediction_time = await detector.predict(last_prediction_time)
return {
'task': 'PREDICT',
'status': 'SUCCESS',

4
analytics/detectors/general_detector/general_detector.py

@ -84,10 +84,6 @@ class GeneralDetector:
logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name)
return predicted_anomalies, last_prediction_time
def synchronize_data(self):
self.data_prov.synchronize()
self.preprocessor.set_data_provider(self.data_prov)
self.preprocessor.synchronize()
def create_algorithm(self):
return SupervisedAlgorithm()

12
analytics/detectors/pattern_detector.py

@ -37,19 +37,16 @@ class PatternDetector:
self.model = None
self.__load_model(pattern_type)
async def learn(self, segments):
async def learn(self, segments, data):
self.model = resolve_model_by_pattern(self.pattern_type)
window_size = 200
dataframe = self.data_prov.get_dataframe()
segments = self.data_prov.transform_anomalies(segments)
# TODO: pass only part of dataframe that has segments
self.model.fit(dataframe, segments)
self.model.fit(dataframe, segments, data)
self.__save_model()
return 0
async def predict(self, last_prediction_time):
async def predict(self, last_prediction_time, data):
if self.model is None:
return [], last_prediction_time
@ -78,9 +75,6 @@ class PatternDetector:
return segments, last_prediction_time
# return predicted_anomalies, last_prediction_time
def synchronize_data(self):
self.data_prov.synchronize()
def __save_model(self):
pass
# TODO: use data_service to save anything

2
server/package.json

@ -24,6 +24,7 @@
"@types/koa": "^2.0.45",
"@types/koa-bodyparser": "^4.2.0",
"@types/koa-router": "^7.0.28",
"@types/lodash": "^4.14.116",
"@types/nedb": "^1.8.0",
"axios": "^0.18.0",
"babel-core": "^6.26.3",
@ -37,6 +38,7 @@
"koa": "^2.5.1",
"koa-bodyparser": "^4.2.1",
"koa-router": "^7.4.0",
"lodash": "^4.17.10",
"nedb": "^1.8.0",
"node-loader": "^0.6.0",
"nodemon": "^1.17.5",

50
server/src/controllers/analytics_controller.ts

@ -5,6 +5,8 @@ import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from '../services/analytics_service';
import { queryByMetric } from '../services/grafana_service';
import * as _ from 'lodash';
type TaskResult = any;
export type TaskResolver = (taskResult: TaskResult) => void;
@ -60,43 +62,59 @@ export function terminate() {
}
async function runTask(task: AnalyticsTask): Promise<TaskResult> {
// task.metric = {
// datasource: anomaly.metric.datasource,
// targets: anomaly.metric.targets.map(getTarget)
// };
return new Promise<TaskResult>((resolver: TaskResolver) => {
taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult()
analyticsService.sendTask(task); // we dont wait for result here
});
}
/**
* Finds range for selecting subset for learning
* @param segments labeled segments
*/
function getQueryRangeForLearningBySegments(segments: Segment.Segment[]) {
if(segments.length < 2) {
throw new Error('Need at least 2 labeled segments');
}
let from = _.minBy(segments, s => s.from).from;
let to = _.maxBy(segments, s => s.to).to;
let diff = to - from;
from -= Math.round(diff * 0.1);
to += Math.round(diff * 0.1);
return { from, to };
}
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let previousLastPredictionTime: number = undefined;
try {
let segments = await Segment.findMany(id, { labeled: true });
let analyticUnit = await AnalyticUnit.findById(id);
if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) {
throw new Error('Can`t starn learning when it`s already started [' + id + ']');
}
let segments = await Segment.findMany(id, { labeled: true });
if(segments.length < 2) {
throw new Error('Need at least 2 labeled segments');
}
let segmentObjs = segments.map(s => s.toObject());
let data = await queryByMetric(analyticUnit.metric, analyticUnit.panelUrl);
let { from, to } = getQueryRangeForLearningBySegments(segments);
let data = await queryByMetric(analyticUnit.metric, analyticUnit.panelUrl, from, to);
if(data.length === 0) {
throw new Error('Empty data to learn on');
}
if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) {
throw new Error('Can`t starn learning when it`s already started [' + id + ']');
}
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
let pattern = analyticUnit.type;
let task = new AnalyticsTask(
id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs }
id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs, data }
);
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
let result = await runTask(task);
let { lastPredictionTime, segments: predictedSegments } = await processLearningResult(result);
previousLastPredictionTime = analyticUnit.lastPredictionTime;

9
server/src/models/grafana_metric_model.ts

@ -62,9 +62,11 @@ export class MetricQuery {
private static INFLUX_QUERY_TIME_REGEX = /time >[^A-Z]+/;
private _queryParts: string[];
private _type: string;
constructor(metric: GrafanaMetric) {
if (metric.datasource.type !== 'influxdb') {
this._type = metric.datasource.type;
if (this._type !== 'influxdb') {
throw new Error(`Queries of type "${metric.datasource.type}" are not supported yet.`);
}
var queryStr = metric.datasource.params.q;
@ -79,7 +81,8 @@ export class MetricQuery {
}
}
getQuery(limit: number, offset: number): string {
return `${this._queryParts[0]} TRUE ${this._queryParts[1]} LIMIT ${limit} OFFSET ${offset}`;
getQuery(from: number, to: number, limit: number, offset: number): string {
let timeClause = `time >= ${from}ms AND time <= ${to}ms`;
return `${this._queryParts[0]} ${timeClause} ${this._queryParts[1]} LIMIT ${limit} OFFSET ${offset}`;
}
}

40
server/src/services/grafana_service.ts

@ -11,47 +11,37 @@ const CHUNK_SIZE = 50000;
* @param metric to query to Grafana
* @returns [time, value][] array
*/
export async function queryByMetric(metric: GrafanaMetric, panelUrl: string): Promise<[number, number][]> {
export async function queryByMetric(
metric: GrafanaMetric, panelUrl: string, from: number, to: number
): Promise<[number, number][]> {
let datasource = metric.datasource;
let origin = new URL(panelUrl).origin;
let url = `${origin}/${datasource.url}`;
let params = datasource.params
let records = await getRecordsCount(url, params);
let limit = Math.min(records, CHUNK_SIZE);
let offset = 0;
let data = [];
while (offset <= records) {
let paramsClone = Object.assign({}, params);
paramsClone.q = metric.metricQuery.getQuery(limit, offset);
let chunk = await queryGrafana(url, paramsClone);
let chunkParams = Object.assign({}, params);
while(true) {
chunkParams.q = metric.metricQuery.getQuery(from, to, CHUNK_SIZE, data.length);
var chunk = await queryGrafana(url, chunkParams);
data = data.concat(chunk);
offset += CHUNK_SIZE;
if(chunk.length < CHUNK_SIZE) {
// because if we get less that we could, then there is nothing more
break;
}
}
return data;
}
async function getRecordsCount(url: string, params: any) {
let paramsClone = Object.assign({}, params);
let query = paramsClone.q;
let field = query.match(/"(\w+)"\)*\sFROM/)[1];
let measurement = query.match(/FROM\s"(\w+)"/)[1];
paramsClone.q = `SELECT COUNT(${field}) FROM ${measurement}`;
let result = await queryGrafana(url, paramsClone);
return result[0][1];
}
async function queryGrafana(url: string, params: any) {
let headers = { Authorization: `Bearer ${HASTIC_API_KEY}` };
let res;
try {
res = await axios.get(url, { params, headers });
var res = await axios.get(url, { params, headers });
} catch (e) {
if(e.response.status === 401) {
throw new Error('Unauthorized. Check the $HASTIC_API_KEY.');
@ -69,5 +59,5 @@ async function queryGrafana(url: string, params: any) {
return [];
}
return results.series[0].values;
return results.series[0].values as [number, number][];
}

Loading…
Cancel
Save