Browse Source

Pull / push data from Grafana to analytics #267 (#290)

* add data puller

* Do not send response on PUSH event

* Stub analytics PUSH message handling
pull/1/head
Evgeny Smyshlyaev 6 years ago committed by rozetko
parent
commit
d4f6783652
  1. 3
      analytics/analytics/analytic_unit_manager.py
  2. 10
      analytics/bin/server
  3. 15
      server/src/controllers/analytics_controller.ts
  4. 3
      server/src/models/analytics_task_model.ts
  5. 93
      server/src/services/data_puller.ts

3
analytics/analytics/analytic_unit_manager.py

@ -53,6 +53,9 @@ class AnalyticUnitManager:
"""
returns payload or None
"""
if task['type'] == 'PUSH':
# TODO: implement PUSH message handling
return
analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
if task['type'] == 'CANCEL':

10
analytics/bin/server

@ -50,14 +50,16 @@ async def handle_task(task: object):
'status': "IN_PROGRESS"
}
message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload)
await server_service.send_message(message)
if not task['type'] == 'PUSH':
message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload)
await server_service.send_message(message)
res = await analytic_unit_manager.handle_analytic_task(task)
res['_id'] = task['_id']
message = services.server_service.ServerMessage('TASK_RESULT', res)
await server_service.send_message(message)
if not task['type'] == 'PUSH':
message = services.server_service.ServerMessage('TASK_RESULT', res)
await server_service.send_message(message)
except Exception as e:
error_text = traceback.format_exc()

15
server/src/controllers/analytics_controller.ts

@ -6,6 +6,7 @@ import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from '../services/analytics_service';
import { sendWebhook } from '../services/notification_service';
import { HASTIC_API_KEY } from '../config'
import { DataPuller } from '../services/data_puller';
import { queryByMetric } from 'grafana-datasource-kit';
@ -20,6 +21,7 @@ export type TaskResolver = (taskResult: TaskResult) => void;
const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>();
let analyticsService: AnalyticsService = undefined;
let dataPuller: DataPuller;
function onTaskResult(taskResult: TaskResult) {
@ -70,6 +72,8 @@ async function onMessage(message: AnalyticsMessage) {
export function init() {
analyticsService = new AnalyticsService(onMessage);
dataPuller = new DataPuller(analyticsService);
dataPuller.runPuller();
}
export function terminate() {
@ -224,6 +228,11 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) {
export async function remove(id: AnalyticUnit.AnalyticUnitId) {
let task = new AnalyticsTask(id, AnalyticsTaskType.CANCEL);
await runTask(task);
if(dataPuller !== undefined) {
dataPuller.deleteUnit(id);
}
await AnalyticUnit.remove(id);
}
@ -277,6 +286,12 @@ export async function createAnalyticUnitFromObject(obj: any): Promise<AnalyticUn
}
let unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.AnalyticUnit.fromObject(obj);
let id = await AnalyticUnit.create(unit);
unit.id = id;
if(dataPuller !== undefined) {
dataPuller.addUnit(unit);
}
return id;
}

3
server/src/models/analytics_task_model.ts

@ -9,7 +9,8 @@ export type AnalyticsTaskId = string;
export enum AnalyticsTaskType {
LEARN = 'LEARN',
DETECT = 'DETECT',
CANCEL = 'CANCEL'
CANCEL = 'CANCEL',
PUSH = 'PUSH'
};
export class AnalyticsTask {

93
server/src/services/data_puller.ts

@ -0,0 +1,93 @@
import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from './analytics_service';
import { HASTIC_API_KEY } from '../config';
import { queryByMetric } from 'grafana-datasource-kit';
import * as _ from 'lodash';
declare type UnitTime = {
unit: AnalyticUnit.AnalyticUnit,
time: number
};
export class DataPuller {
private PULL_PERIOD_MS: number = 5000;
private _interval: number = 1000;
private _timer: any = null;
private _unitTimes: { [id: string]: UnitTime } = {};
constructor(private analyticsService: AnalyticsService){};
public addUnit(unit: AnalyticUnit.AnalyticUnit) {
let time = unit.lastDetectionTime || Date.now();
let unitTime: UnitTime = {unit, time };
this._unitTimes[unit.id] = unitTime;
}
public deleteUnit(id: AnalyticUnit.AnalyticUnitId) {
delete this._unitTimes[id];
}
private pullData(unit: AnalyticUnit.AnalyticUnit, from: number, to: number) {
if(!unit) {
throw Error(`puller: can't pull undefined unit`);
}
return queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY);
}
private pushData(unit: AnalyticUnit.AnalyticUnit, data: any) {
if(unit === undefined || data === undefined) {
throw Error(`can't push unit: ${unit} data: ${data}`);
}
let task = new AnalyticsTask(unit.id, AnalyticsTaskType.PUSH, data);
this.analyticsService.sendTask(task);
}
//TODO: group analyticUnits by panelID and send same dataset for group
public runPuller() {
this._timer = setTimeout(this.puller.bind(this), this._interval);
console.log('Data puller runned');
}
public stopPuller() {
if(this._timer) {
clearTimeout(this._timer);
this._timer = null;
this._interval = 0;
console.log('Data puller stopped');
}
console.log('Data puller already stopped');
}
private async puller() {
if(_.isEmpty(this._unitTimes)) {
this._interval = this.PULL_PERIOD_MS;
this._timer = setTimeout(this.puller.bind(this), this._interval);
return;
}
let now = Date.now();
_.forOwn(this._unitTimes, async value => {
if(!value.unit.alert) {
return;
}
let data = await this.pullData(value.unit, value.time, now);
if(data.values.length === 0) {
return;
}
let payload = { data, from: value.time, to: now};
value.time = now;
this.pushData(value.unit, payload);
});
this._timer = setTimeout(this.puller.bind(this), this._interval);
}
}
Loading…
Cancel
Save