|
|
@ -20,18 +20,20 @@ export class DataPuller { |
|
|
|
constructor(private analyticsService: AnalyticsService) {}; |
|
|
|
constructor(private analyticsService: AnalyticsService) {}; |
|
|
|
|
|
|
|
|
|
|
|
public addUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { |
|
|
|
public addUnit(analyticUnit: AnalyticUnit.AnalyticUnit) { |
|
|
|
|
|
|
|
console.log(`start pulling analytic unit ${analyticUnit.id}`); |
|
|
|
this._runAnalyticUnitPuller(analyticUnit); |
|
|
|
this._runAnalyticUnitPuller(analyticUnit); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public deleteUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) { |
|
|
|
public deleteUnit(analyticUnitId: AnalyticUnit.AnalyticUnitId) { |
|
|
|
if(_.has(this._unitTimes, analyticUnitId)) { |
|
|
|
if(_.has(this._unitTimes, analyticUnitId)) { |
|
|
|
delete this._unitTimes[analyticUnitId]; |
|
|
|
delete this._unitTimes[analyticUnitId]; |
|
|
|
|
|
|
|
console.log(`analytic unit ${analyticUnitId} deleted from data puller`); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private async pullData(unit: AnalyticUnit.AnalyticUnit, from: number, to: number): Promise<MetricDataChunk> { |
|
|
|
private async pullData(unit: AnalyticUnit.AnalyticUnit, from: number, to: number): Promise<MetricDataChunk> { |
|
|
|
if(unit === undefined) { |
|
|
|
if(unit === undefined) { |
|
|
|
throw Error(`puller: can't pull undefined unit`); |
|
|
|
throw Error(`data puller: can't pull undefined unit`); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY); |
|
|
|
return queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY); |
|
|
@ -39,16 +41,23 @@ export class DataPuller { |
|
|
|
|
|
|
|
|
|
|
|
private pushData(unit: AnalyticUnit.AnalyticUnit, data: any) { |
|
|
|
private pushData(unit: AnalyticUnit.AnalyticUnit, data: any) { |
|
|
|
if(unit === undefined || data === undefined) { |
|
|
|
if(unit === undefined || data === undefined) { |
|
|
|
throw Error(`can't push unit: ${unit} data: ${data}`); |
|
|
|
throw Error(`data puller can't push unit: ${unit} data: ${data}`); |
|
|
|
} |
|
|
|
} |
|
|
|
let task = new AnalyticsTask(unit.id, AnalyticsTaskType.PUSH, data); |
|
|
|
let task = new AnalyticsTask(unit.id, AnalyticsTaskType.PUSH, data); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
this.analyticsService.sendTask(task); |
|
|
|
this.analyticsService.sendTask(task); |
|
|
|
|
|
|
|
} catch(e) { |
|
|
|
|
|
|
|
console.log(`data puller got error while push data ${e.message}`); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//TODO: group analyticUnits by panelID and send same dataset for group
|
|
|
|
//TODO: group analyticUnits by panelID and send same dataset for group
|
|
|
|
public async runPuller() { |
|
|
|
public async runPuller() { |
|
|
|
const analyticUnits = await AnalyticUnit.findMany({ alert: true }); |
|
|
|
const analyticUnits = await AnalyticUnit.findMany({ alert: true }); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
console.log(`starting data puller with ${JSON.stringify(analyticUnits.map(u => u.id))} analytic units`); |
|
|
|
|
|
|
|
|
|
|
|
_.each(analyticUnits, analyticUnit => { |
|
|
|
_.each(analyticUnits, analyticUnit => { |
|
|
|
this._runAnalyticUnitPuller(analyticUnit); |
|
|
|
this._runAnalyticUnitPuller(analyticUnit); |
|
|
|
}); |
|
|
|
}); |
|
|
@ -58,6 +67,7 @@ export class DataPuller { |
|
|
|
|
|
|
|
|
|
|
|
public stopPuller() { |
|
|
|
public stopPuller() { |
|
|
|
this._unitTimes = {}; |
|
|
|
this._unitTimes = {}; |
|
|
|
|
|
|
|
console.log('Data puller stopped'); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) { |
|
|
|
private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) { |
|
|
@ -99,6 +109,13 @@ export class DataPuller { |
|
|
|
async * getDataGenerator(analyticUnit: AnalyticUnit.AnalyticUnit, duration: number): |
|
|
|
async * getDataGenerator(analyticUnit: AnalyticUnit.AnalyticUnit, duration: number): |
|
|
|
AsyncIterableIterator<MetricDataChunk> { |
|
|
|
AsyncIterableIterator<MetricDataChunk> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(!this.analyticsService.ready) { |
|
|
|
|
|
|
|
return { |
|
|
|
|
|
|
|
columns: [], |
|
|
|
|
|
|
|
values: [] |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const getData = async () => { |
|
|
|
const getData = async () => { |
|
|
|
try { |
|
|
|
try { |
|
|
|
const time = this._unitTimes[analyticUnit.id] |
|
|
|
const time = this._unitTimes[analyticUnit.id] |
|
|
|