From 2bbaf18040ce3f58542079edbf704cd604eb0e86 Mon Sep 17 00:00:00 2001 From: rozetko Date: Tue, 23 Apr 2019 23:09:57 +0300 Subject: [PATCH] Detection status waiter #264 (#265) --- .../controllers/analytic_controller.ts | 107 ++++++++++++++---- src/panel/graph_panel/graph_ctrl.ts | 41 ++++--- .../graph_panel/services/analytic_service.ts | 70 ++++++++---- 3 files changed, 156 insertions(+), 62 deletions(-) diff --git a/src/panel/graph_panel/controllers/analytic_controller.ts b/src/panel/graph_panel/controllers/analytic_controller.ts index b0a75fd..bcca5c7 100644 --- a/src/panel/graph_panel/controllers/analytic_controller.ts +++ b/src/panel/graph_panel/controllers/analytic_controller.ts @@ -14,7 +14,7 @@ import { SegmentsSet } from '../models/segment_set'; import { SegmentArray } from '../models/segment_array'; import { HasticServerInfo, HasticServerInfoUnknown } from '../models/hastic_server_info'; import { Threshold, Condition } from '../models/threshold'; -import { DetectionStatus, DETECTION_STATUS_TEXT } from '../models/detection'; +import { DetectionStatus, DETECTION_STATUS_TEXT, DetectionSpan } from '../models/detection'; import text from '../partials/help_section.html'; import { @@ -45,6 +45,7 @@ export class AnalyticController { private _tempIdCounted: number = -1; private _graphLocked: boolean = false; private _statusRunners: Set = new Set(); + private _detectionRunners: Set = new Set(); private _serverInfo: HasticServerInfo; private _currentMetric: MetricExpanded; private _currentDatasource: DatasourceRequest; @@ -225,6 +226,21 @@ export class AnalyticController { this.analyticUnits.forEach(a => this._runStatusWaiter(a)); } + fetchAnalyticUnitsDetections(from: number | null, to: number | null) { + if(from === null || to === null) { + return; + } + this.analyticUnits.forEach(analyticUnit => { + if(analyticUnit.status === 'READY') { + this._runDetectionsWaiter(analyticUnit, from, to); + } + }); + } + + stopAnalyticUnitsDetectionsFetching() { + this.analyticUnits.forEach(analyticUnit => this._detectionRunners.delete(analyticUnit.id)); + } + async fetchAnalyticUnitsDetectionSpans(from: number, to: number): Promise { if(!_.isNumber(+from)) { throw new Error('from isn`t number'); @@ -395,8 +411,10 @@ export class AnalyticController { this.analyticUnits .filter(analyticUnit => analyticUnit.inspect) .forEach(analyticUnit => { - const statuses = analyticUnit.detectionSpans.map(span => span.status); - detectionStatuses = _.concat(detectionStatuses, statuses); + if(analyticUnit.detectionSpans !== undefined) { + const statuses = analyticUnit.detectionSpans.map(span => span.status); + detectionStatuses = _.concat(detectionStatuses, statuses); + } }); detectionStatuses = _.uniq(detectionStatuses); @@ -441,6 +459,8 @@ export class AnalyticController { if(!silent) { await this._analyticService.removeAnalyticUnit(id); } + this._statusRunners.delete(id); + this._detectionRunners.delete(id); this._analyticUnitsSet.removeItem(id); } @@ -513,6 +533,62 @@ export class AnalyticController { } private async _runStatusWaiter(analyticUnit: AnalyticUnit) { + const statusGenerator = this._analyticService.getStatusGenerator( + analyticUnit.id, 1000 + ); + + return this._runWaiter<{ status: string, errorMessage?: string }>( + analyticUnit, + this._statusRunners, + statusGenerator, + (data) => { + const status = data.status; + const error = data.errorMessage; + if(analyticUnit.status !== status) { + analyticUnit.status = status; + if(error !== undefined) { + analyticUnit.error = error; + } + this._emitter.emit('analytic-unit-status-change', analyticUnit); + } + if(!analyticUnit.isActiveStatus) { + return true; + } + return false; + } + ); + } + + // TODO: range type with "from" and "to" fields + private async _runDetectionsWaiter(analyticUnit: AnalyticUnit, from: number, to: number) { + const detectionsGenerator = this._analyticService.getDetectionsGenerator(analyticUnit.id, from, to, 1000); + + return this._runWaiter( + analyticUnit, + this._detectionRunners, + detectionsGenerator, + (data) => { + if(!_.isEqual(data, analyticUnit.detectionSpans)) { + this._emitter.emit('analytic-unit-status-change', analyticUnit); + } + analyticUnit.detectionSpans = data; + let isFinished = true; + for (let detection of data) { + if(detection.status === DetectionStatus.RUNNING) { + isFinished = false; + } + } + return isFinished; + } + ); + } + + private async _runWaiter( + analyticUnit: AnalyticUnit, + runners: Set, + generator: AsyncIterableIterator, + iteration: (data: T) => boolean + ) { if(this._analyticService === undefined) { return; } @@ -524,35 +600,26 @@ export class AnalyticController { throw new Error('analyticUnit.id is undefined'); } - if(this._statusRunners.has(analyticUnit.id)) { + if(runners.has(analyticUnit.id)) { return; } - this._statusRunners.add(analyticUnit.id); - - var statusGenerator = this._analyticService.getStatusGenerator( - analyticUnit.id, 1000 - ); + runners.add(analyticUnit.id); - for await (const data of statusGenerator) { + for await (const data of generator) { if(data === undefined) { break; } - let status = data.status; - let error = data.errorMessage; - if(analyticUnit.status !== status) { - analyticUnit.status = status; - if(error !== undefined) { - analyticUnit.error = error; - } - this._emitter.emit('analytic-unit-status-change', analyticUnit); + if(!runners.has(analyticUnit.id)) { + break; } - if(!analyticUnit.isActiveStatus) { + const shouldBreak = iteration(data); + if(shouldBreak) { break; } } - this._statusRunners.delete(analyticUnit.id); + runners.delete(analyticUnit.id); } public getNewTempSegmentId(): SegmentId { diff --git a/src/panel/graph_panel/graph_ctrl.ts b/src/panel/graph_panel/graph_ctrl.ts index d125d53..557fcce 100644 --- a/src/panel/graph_panel/graph_ctrl.ts +++ b/src/panel/graph_panel/graph_ctrl.ts @@ -58,6 +58,11 @@ class GraphCtrl extends MetricsPanelCtrl { private _grafanaUrl: string; private _panelId: string; + private _dataTimerange: { + from: number, + to: number + }; + panelDefaults = { // datasource name, null = default datasource datasource: null, @@ -392,46 +397,40 @@ class GraphCtrl extends MetricsPanelCtrl { } if(this.analyticsController !== undefined) { + this.analyticsController.stopAnalyticUnitsDetectionsFetching(); const from = +this.range.from; const to = +this.range.to; const loadTasks = [ // this.annotationsPromise, - this.analyticsController.fetchAnalyticUnitsSegments(from, to), - // TODO: run detection status waiter if detection state !== 'READY' - this.analyticsController.fetchAnalyticUnitsDetectionSpans(from, to) + this.analyticsController.fetchAnalyticUnitsSegments(from, to) ]; await Promise.all(loadTasks); this.loading = false; // this.annotations = results[0].annotations; this.render(this.seriesList); + this.analyticsController.fetchAnalyticUnitsDetections( + this._dataTimerange.from, + this._dataTimerange.to + ); } } onRender(data) { - if (!this.seriesList) { + if(!this.seriesList) { return; } - for (let series of this.seriesList) { - if (series.prediction) { - var overrideItem = _.find( - this.panel.seriesOverrides, - el => el.alias === series.alias - ) - if (overrideItem !== undefined) { - this.addSeriesOverride({ - alias: series.alias, - linewidth: 0, - legend: false, - // if pointradius === 0 -> point still shows, that's why pointradius === -1 - pointradius: -1, - fill: 3 - }); - } + for(let series of this.seriesList) { + const from = _.find(series.datapoints, datapoint => datapoint[0] !== null); + const to = _.findLast(series.datapoints, datapoint => datapoint[0] !== null); + + if(from !== undefined && to !== undefined) { + this._dataTimerange = { from: from[1], to: to[1] }; + } else { + this._dataTimerange = { from: null, to: null } } - series.applySeriesOverrides(this.panel.seriesOverrides); if (series.unit) { this.panel.yaxes[series.yaxis - 1].format = series.unit; diff --git a/src/panel/graph_panel/services/analytic_service.ts b/src/panel/graph_panel/services/analytic_service.ts index d73428d..250c5bc 100644 --- a/src/panel/graph_panel/services/analytic_service.ts +++ b/src/panel/graph_panel/services/analytic_service.ts @@ -167,31 +167,39 @@ export class AnalyticService { return segments.map(s => new AnalyticSegment(s.labeled, s.id, s.from, s.to, s.deleted)); } - async * getStatusGenerator(id: AnalyticUnitId, duration: number): - AsyncIterableIterator<{ status: string, errorMessage?: string }> { - - if(id === undefined) { - throw new Error('id is undefined'); - } - let statusCheck = async () => { - try { - return await this.get('/analyticUnits/status', { id }); - } catch(error) { - if(error.status === 404) { - return { status: '404' }; + getStatusGenerator( + id: AnalyticUnitId, + duration: number + ): AsyncIterableIterator<{ status: string, errorMessage?: string }> { + return getGenerator<{ status: string, errorMessage?: string }>( + id, + duration, + async (id) => { + try { + return this.get('/analyticUnits/status', { id }); + } catch(error) { + if(error.status === 404) { + return { status: '404' }; + } + throw error; } - throw error; } - } - - let timeout = async () => new Promise( - resolve => setTimeout(resolve, duration) ); + } - while(true) { - yield await statusCheck(); - await timeout(); - } + getDetectionsGenerator( + id: AnalyticUnitId, + from: number, + to: number, + duration: number + ): AsyncIterableIterator { + return getGenerator( + id, + duration, + this.getDetectionSpans.bind(this), + from, + to + ); } async getServerInfo(): Promise { @@ -322,3 +330,23 @@ export class AnalyticService { return this._isUp; } } + +async function *getGenerator( + id: AnalyticUnitId, + duration: number, + func: (...args: any[]) => Promise, + ...args +): AsyncIterableIterator { + if(id === undefined) { + throw new Error('id is undefined'); + } + + let timeout = async () => new Promise( + resolve => setTimeout(resolve, duration) + ); + + while(true) { + yield await func(id, ...args); + await timeout(); + } +}