diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index f3525e1..f3f43f0 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -53,6 +53,9 @@ class AnalyticUnitManager: """ returns payload or None """ + if task['type'] == 'PUSH': + # TODO: implement PUSH message handling + return analytic_unit_id: AnalyticUnitId = task['analyticUnitId'] if task['type'] == 'CANCEL': diff --git a/analytics/bin/server b/analytics/bin/server old mode 100644 new mode 100755 index 929436e..da87cd7 --- a/analytics/bin/server +++ b/analytics/bin/server @@ -50,14 +50,16 @@ async def handle_task(task: object): 'status': "IN_PROGRESS" } - message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload) - await server_service.send_message(message) + if not task['type'] == 'PUSH': + message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload) + await server_service.send_message(message) res = await analytic_unit_manager.handle_analytic_task(task) res['_id'] = task['_id'] - message = services.server_service.ServerMessage('TASK_RESULT', res) - await server_service.send_message(message) + if not task['type'] == 'PUSH': + message = services.server_service.ServerMessage('TASK_RESULT', res) + await server_service.send_message(message) except Exception as e: error_text = traceback.format_exc() diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index ac3f890..6caff6f 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -6,6 +6,7 @@ import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService } from '../services/analytics_service'; import { sendWebhook } from '../services/notification_service'; import { HASTIC_API_KEY } from '../config' +import { DataPuller } from '../services/data_puller'; import { queryByMetric } from 'grafana-datasource-kit'; @@ -20,6 +21,7 @@ export type TaskResolver = (taskResult: TaskResult) => void; const taskResolvers = new Map(); let analyticsService: AnalyticsService = undefined; +let dataPuller: DataPuller; function onTaskResult(taskResult: TaskResult) { @@ -70,6 +72,8 @@ async function onMessage(message: AnalyticsMessage) { export function init() { analyticsService = new AnalyticsService(onMessage); + dataPuller = new DataPuller(analyticsService); + dataPuller.runPuller(); } export function terminate() { @@ -224,6 +228,11 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) { export async function remove(id: AnalyticUnit.AnalyticUnitId) { let task = new AnalyticsTask(id, AnalyticsTaskType.CANCEL); await runTask(task); + + if(dataPuller !== undefined) { + dataPuller.deleteUnit(id); + } + await AnalyticUnit.remove(id); } @@ -277,6 +286,12 @@ export async function createAnalyticUnitFromObject(obj: any): Promise { + if(!value.unit.alert) { + return; + } + let data = await this.pullData(value.unit, value.time, now); + if(data.values.length === 0) { + return; + } + + let payload = { data, from: value.time, to: now}; + value.time = now; + this.pushData(value.unit, payload); + }); + + this._timer = setTimeout(this.puller.bind(this), this._interval); + } + +}