From 053e1a4e9db28effcb0f0ba82c090bceab8d1b42 Mon Sep 17 00:00:00 2001 From: rozetko Date: Fri, 7 Dec 2018 00:54:00 +0300 Subject: [PATCH] Update Data Puller state #294 (#295) * Get analytic unit from db each puller tick * Use async iterator to pull and push data * Fix webpack config for proper babel-polyfill usage * Add MetricDataChunk type * Add pattern field to payload --- server/build/webpack.614.prod.conf.js | 2 +- server/build/webpack.base.conf.js | 2 +- .../src/controllers/analytics_controller.ts | 15 ++- server/src/routes/analytic_units_router.ts | 2 +- server/src/services/data_puller.ts | 105 ++++++++++-------- 5 files changed, 74 insertions(+), 52 deletions(-) diff --git a/server/build/webpack.614.prod.conf.js b/server/build/webpack.614.prod.conf.js index 4c50954..7f3c49a 100644 --- a/server/build/webpack.614.prod.conf.js +++ b/server/build/webpack.614.prod.conf.js @@ -7,7 +7,7 @@ module.exports = { __dirname: false, __filename: false, }, - entry: [ 'babel-polyfill', './dist/server-dev.js' ], + entry: [ './dist/server-dev.js' ], output: { filename: "server.js", path: path.join(__dirname, '../dist') diff --git a/server/build/webpack.base.conf.js b/server/build/webpack.base.conf.js index 3462909..9b96c3e 100644 --- a/server/build/webpack.base.conf.js +++ b/server/build/webpack.base.conf.js @@ -14,7 +14,7 @@ module.exports = { __dirname: false, __filename: false, }, - entry: [ './src/index.ts' ], + entry: [ 'babel-polyfill', './src/index.ts' ], output: { filename: "server-dev.js", path: resolve('dist') diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 6caff6f..2398ffa 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -286,13 +286,20 @@ export async function createAnalyticUnitFromObject(obj: any): Promise { + if(unit === undefined) { throw Error(`puller: can't pull undefined unit`); } return queryByMetric(unit.metric, unit.panelUrl, from, to, HASTIC_API_KEY); @@ -48,46 +44,65 @@ export class DataPuller { } //TODO: group analyticUnits by panelID and send same dataset for group - public runPuller() { - this._timer = setTimeout(this.puller.bind(this), this._interval); + public async runPuller() { + const analyticUnits = await AnalyticUnit.findMany({ alert: true }); + + _.each(analyticUnits, analyticUnit => { + this._runAnalyticUnitPuller(analyticUnit); + }); + console.log('Data puller runned'); } public stopPuller() { - if(this._timer) { - clearTimeout(this._timer); - this._timer = null; - this._interval = 0; - console.log('Data puller stopped'); - } - console.log('Data puller already stopped'); + this._unitTimes = {}; } - private async puller() { - - if(_.isEmpty(this._unitTimes)) { - this._interval = this.PULL_PERIOD_MS; - this._timer = setTimeout(this.puller.bind(this), this._interval); - return; - } + private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) { + const time = analyticUnit.lastDetectionTime || Date.now(); + this._unitTimes[analyticUnit.id] = time; - let now = Date.now(); + const dataGenerator = this.getDataGenerator( + analyticUnit, PULL_PERIOD_MS + ); - _.forOwn(this._unitTimes, async value => { - if(!value.unit.alert) { - return; + for await (const data of dataGenerator) { + if(!_.has(this._unitTimes, analyticUnit.id)) { + break; } - let data = await this.pullData(value.unit, value.time, now); + if(data.values.length === 0) { - return; + continue; } - let payload = { data, from: value.time, to: now}; - value.time = now; - this.pushData(value.unit, payload); - }); - - this._timer = setTimeout(this.puller.bind(this), this._interval); + const now = Date.now(); + let payload = { data, from: time, to: now, pattern: analyticUnit.type }; + this._unitTimes[analyticUnit.id] = now; + this.pushData(analyticUnit, payload); + } + } + + async * getDataGenerator(analyticUnit: AnalyticUnit.AnalyticUnit, duration: number): + AsyncIterableIterator { + + const getData = async () => { + try { + const time = this._unitTimes[analyticUnit.id] + const now = Date.now(); + return await this.pullData(analyticUnit, time, now); + } catch(err) { + throw new Error(`Error while pulling data: ${err.message}`); + } + } + + const timeout = async () => new Promise( + resolve => setTimeout(resolve, duration) + ); + + while(true) { + yield await getData(); + await timeout(); + } } }