Browse Source

Detection status waiter #264 (#265)

master
rozetko 5 years ago committed by GitHub
parent
commit
2bbaf18040
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 107
      src/panel/graph_panel/controllers/analytic_controller.ts
  2. 41
      src/panel/graph_panel/graph_ctrl.ts
  3. 70
      src/panel/graph_panel/services/analytic_service.ts

107
src/panel/graph_panel/controllers/analytic_controller.ts

@ -14,7 +14,7 @@ import { SegmentsSet } from '../models/segment_set';
import { SegmentArray } from '../models/segment_array';
import { HasticServerInfo, HasticServerInfoUnknown } from '../models/hastic_server_info';
import { Threshold, Condition } from '../models/threshold';
import { DetectionStatus, DETECTION_STATUS_TEXT } from '../models/detection';
import { DetectionStatus, DETECTION_STATUS_TEXT, DetectionSpan } from '../models/detection';
import text from '../partials/help_section.html';
import {
@ -45,6 +45,7 @@ export class AnalyticController {
private _tempIdCounted: number = -1;
private _graphLocked: boolean = false;
private _statusRunners: Set<AnalyticUnitId> = new Set<AnalyticUnitId>();
private _detectionRunners: Set<AnalyticUnitId> = new Set<AnalyticUnitId>();
private _serverInfo: HasticServerInfo;
private _currentMetric: MetricExpanded;
private _currentDatasource: DatasourceRequest;
@ -225,6 +226,21 @@ export class AnalyticController {
this.analyticUnits.forEach(a => this._runStatusWaiter(a));
}
fetchAnalyticUnitsDetections(from: number | null, to: number | null) {
if(from === null || to === null) {
return;
}
this.analyticUnits.forEach(analyticUnit => {
if(analyticUnit.status === 'READY') {
this._runDetectionsWaiter(analyticUnit, from, to);
}
});
}
stopAnalyticUnitsDetectionsFetching() {
this.analyticUnits.forEach(analyticUnit => this._detectionRunners.delete(analyticUnit.id));
}
async fetchAnalyticUnitsDetectionSpans(from: number, to: number): Promise<void[]> {
if(!_.isNumber(+from)) {
throw new Error('from isn`t number');
@ -395,8 +411,10 @@ export class AnalyticController {
this.analyticUnits
.filter(analyticUnit => analyticUnit.inspect)
.forEach(analyticUnit => {
const statuses = analyticUnit.detectionSpans.map(span => span.status);
detectionStatuses = _.concat(detectionStatuses, statuses);
if(analyticUnit.detectionSpans !== undefined) {
const statuses = analyticUnit.detectionSpans.map(span => span.status);
detectionStatuses = _.concat(detectionStatuses, statuses);
}
});
detectionStatuses = _.uniq(detectionStatuses);
@ -441,6 +459,8 @@ export class AnalyticController {
if(!silent) {
await this._analyticService.removeAnalyticUnit(id);
}
this._statusRunners.delete(id);
this._detectionRunners.delete(id);
this._analyticUnitsSet.removeItem(id);
}
@ -513,6 +533,62 @@ export class AnalyticController {
}
private async _runStatusWaiter(analyticUnit: AnalyticUnit) {
const statusGenerator = this._analyticService.getStatusGenerator(
analyticUnit.id, 1000
);
return this._runWaiter<{ status: string, errorMessage?: string }>(
analyticUnit,
this._statusRunners,
statusGenerator,
(data) => {
const status = data.status;
const error = data.errorMessage;
if(analyticUnit.status !== status) {
analyticUnit.status = status;
if(error !== undefined) {
analyticUnit.error = error;
}
this._emitter.emit('analytic-unit-status-change', analyticUnit);
}
if(!analyticUnit.isActiveStatus) {
return true;
}
return false;
}
);
}
// TODO: range type with "from" and "to" fields
private async _runDetectionsWaiter(analyticUnit: AnalyticUnit, from: number, to: number) {
const detectionsGenerator = this._analyticService.getDetectionsGenerator(analyticUnit.id, from, to, 1000);
return this._runWaiter<DetectionSpan[]>(
analyticUnit,
this._detectionRunners,
detectionsGenerator,
(data) => {
if(!_.isEqual(data, analyticUnit.detectionSpans)) {
this._emitter.emit('analytic-unit-status-change', analyticUnit);
}
analyticUnit.detectionSpans = data;
let isFinished = true;
for (let detection of data) {
if(detection.status === DetectionStatus.RUNNING) {
isFinished = false;
}
}
return isFinished;
}
);
}
private async _runWaiter<T>(
analyticUnit: AnalyticUnit,
runners: Set<AnalyticUnitId>,
generator: AsyncIterableIterator<T>,
iteration: (data: T) => boolean
) {
if(this._analyticService === undefined) {
return;
}
@ -524,35 +600,26 @@ export class AnalyticController {
throw new Error('analyticUnit.id is undefined');
}
if(this._statusRunners.has(analyticUnit.id)) {
if(runners.has(analyticUnit.id)) {
return;
}
this._statusRunners.add(analyticUnit.id);
var statusGenerator = this._analyticService.getStatusGenerator(
analyticUnit.id, 1000
);
runners.add(analyticUnit.id);
for await (const data of statusGenerator) {
for await (const data of generator) {
if(data === undefined) {
break;
}
let status = data.status;
let error = data.errorMessage;
if(analyticUnit.status !== status) {
analyticUnit.status = status;
if(error !== undefined) {
analyticUnit.error = error;
}
this._emitter.emit('analytic-unit-status-change', analyticUnit);
if(!runners.has(analyticUnit.id)) {
break;
}
if(!analyticUnit.isActiveStatus) {
const shouldBreak = iteration(data);
if(shouldBreak) {
break;
}
}
this._statusRunners.delete(analyticUnit.id);
runners.delete(analyticUnit.id);
}
public getNewTempSegmentId(): SegmentId {

41
src/panel/graph_panel/graph_ctrl.ts

@ -58,6 +58,11 @@ class GraphCtrl extends MetricsPanelCtrl {
private _grafanaUrl: string;
private _panelId: string;
private _dataTimerange: {
from: number,
to: number
};
panelDefaults = {
// datasource name, null = default datasource
datasource: null,
@ -392,46 +397,40 @@ class GraphCtrl extends MetricsPanelCtrl {
}
if(this.analyticsController !== undefined) {
this.analyticsController.stopAnalyticUnitsDetectionsFetching();
const from = +this.range.from;
const to = +this.range.to;
const loadTasks = [
// this.annotationsPromise,
this.analyticsController.fetchAnalyticUnitsSegments(from, to),
// TODO: run detection status waiter if detection state !== 'READY'
this.analyticsController.fetchAnalyticUnitsDetectionSpans(from, to)
this.analyticsController.fetchAnalyticUnitsSegments(from, to)
];
await Promise.all(loadTasks);
this.loading = false;
// this.annotations = results[0].annotations;
this.render(this.seriesList);
this.analyticsController.fetchAnalyticUnitsDetections(
this._dataTimerange.from,
this._dataTimerange.to
);
}
}
onRender(data) {
if (!this.seriesList) {
if(!this.seriesList) {
return;
}
for (let series of this.seriesList) {
if (series.prediction) {
var overrideItem = _.find(
this.panel.seriesOverrides,
el => el.alias === series.alias
)
if (overrideItem !== undefined) {
this.addSeriesOverride({
alias: series.alias,
linewidth: 0,
legend: false,
// if pointradius === 0 -> point still shows, that's why pointradius === -1
pointradius: -1,
fill: 3
});
}
for(let series of this.seriesList) {
const from = _.find(series.datapoints, datapoint => datapoint[0] !== null);
const to = _.findLast(series.datapoints, datapoint => datapoint[0] !== null);
if(from !== undefined && to !== undefined) {
this._dataTimerange = { from: from[1], to: to[1] };
} else {
this._dataTimerange = { from: null, to: null }
}
series.applySeriesOverrides(this.panel.seriesOverrides);
if (series.unit) {
this.panel.yaxes[series.yaxis - 1].format = series.unit;

70
src/panel/graph_panel/services/analytic_service.ts

@ -167,31 +167,39 @@ export class AnalyticService {
return segments.map(s => new AnalyticSegment(s.labeled, s.id, s.from, s.to, s.deleted));
}
async * getStatusGenerator(id: AnalyticUnitId, duration: number):
AsyncIterableIterator<{ status: string, errorMessage?: string }> {
if(id === undefined) {
throw new Error('id is undefined');
}
let statusCheck = async () => {
try {
return await this.get('/analyticUnits/status', { id });
} catch(error) {
if(error.status === 404) {
return { status: '404' };
getStatusGenerator(
id: AnalyticUnitId,
duration: number
): AsyncIterableIterator<{ status: string, errorMessage?: string }> {
return getGenerator<{ status: string, errorMessage?: string }>(
id,
duration,
async (id) => {
try {
return this.get('/analyticUnits/status', { id });
} catch(error) {
if(error.status === 404) {
return { status: '404' };
}
throw error;
}
throw error;
}
}
let timeout = async () => new Promise(
resolve => setTimeout(resolve, duration)
);
}
while(true) {
yield await statusCheck();
await timeout();
}
getDetectionsGenerator(
id: AnalyticUnitId,
from: number,
to: number,
duration: number
): AsyncIterableIterator<DetectionSpan[]> {
return getGenerator<DetectionSpan[]>(
id,
duration,
this.getDetectionSpans.bind(this),
from,
to
);
}
async getServerInfo(): Promise<HasticServerInfo> {
@ -322,3 +330,23 @@ export class AnalyticService {
return this._isUp;
}
}
async function *getGenerator<T>(
id: AnalyticUnitId,
duration: number,
func: (...args: any[]) => Promise<T>,
...args
): AsyncIterableIterator<T> {
if(id === undefined) {
throw new Error('id is undefined');
}
let timeout = async () => new Promise(
resolve => setTimeout(resolve, duration)
);
while(true) {
yield await func(id, ...args);
await timeout();
}
}

Loading…
Cancel
Save