Support multiple queries #6

Merged
rozetko merged 7 commits from multiple-queries into master 2 years ago
Files changed:
  1. package.json (1)
  2. src/models/target.ts (8)
  3. src/routes/tasks.ts (13)
  4. src/services/exporter.ts (121)
  5. src/types.ts (3)
  6. src/utils.ts (6)
  7. yarn.lock (9)

package.json (1)

@@ -17,6 +17,7 @@
     "fast-csv": "^2.5.0",
     "lodash": "^4.17.21",
     "moment": "^2.29.4",
+    "moment-timezone": "^0.5.40",
     "nodemon": "^2.0.20",
     "ts-loader": "^9.4.2",
     "typescript": "^4.9.4",

src/models/target.ts (8)

@@ -1,8 +0,0 @@
-import { DataSourceSettings, PanelModel } from '../types';
-
-export class Target {
-  constructor(
-    public panel: PanelModel,
-    public datasource: DataSourceSettings,
-  ) {}
-}

src/routes/tasks.ts (13)

@@ -14,6 +14,7 @@ type TRequest = {
   body: {
     task: ExportTask,
     url: string,
+    timeZoneName: string,
   },
 };
@@ -22,7 +23,7 @@ async function getTasks(req, res) {
   fs.readdir(EXPORTED_PATH, (err, items) => {
     if(err) {
       console.error(err);
-      res.status(500).send('Something went wrong');
+      res.status(500).send(err.message);
     } else {
       for(let item of items) {
         let file = path.parse(item);
@@ -31,8 +32,12 @@ async function getTasks(req, res) {
         }
         // TODO: read async
         let data = fs.readFileSync(path.join(EXPORTED_PATH, item), 'utf8');
-        let status = JSON.parse(data);
-        resp.push(status);
+        try {
+          let status = JSON.parse(data);
+          resp.push(status);
+        } catch(e) {
+          console.log(`Cannot read file /exporter/${item}. if this error doesn't repeat, maybe the file is being updated at the moment`);
+        }
       }
       res.status(200).send(resp);
@@ -63,7 +68,7 @@ async function addTask(req: TRequest, res) {
   }
 
   const exporter = exporterFactory.getExporter();
-  exporter.export(task, datasourceUrl);
+  exporter.export(task, datasourceUrl, body.timeZoneName);
 
   res.status(200).send(`Export process started`);
 }
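
Review note: the route now forwards a time zone to the exporter, so clients have to include it in the request. A minimal sketch of a body matching the updated TRequest type (all values below are made up for illustration):

  const body = {
    task: {
      // ExportTask: at least `queries` and `timeRange` are used by the exporter
      queries: [ /* DashboardQuery objects, see src/types.ts */ ],
      timeRange: { from: 1672531200000, to: 1672617600000 }, // assumed to be ms timestamps, as used in exporter.ts
    },
    url: 'http://grafana.example.com/d/abc123', // hypothetical URL; the real shape depends on the client
    timeZoneName: 'Europe/Moscow',              // new field, passed through to exporter.export()
  };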

src/services/exporter.ts (121)

@@ -1,7 +1,6 @@
-import { Target } from '../models/target';
 import { URL } from 'url';
 import { apiKeys } from '../config';
-import { promisify } from '../utils';
+import { promisify, toIsoString } from '../utils';
 import { DashboardQuery, ExportProgress, ExportStatus, ExportTask } from '../types';
 import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit';
@@ -27,40 +26,36 @@ const DEFAULT_PROGRESS = {
 export class Exporter {
   private _task: ExportTask;
 
-  public async export(task: ExportTask, datasourceUrl: string) {
-    this._task = _.cloneDeep(task);
-    this._task.id = uuidv4();
-    this._task.progress = _.cloneDeep(DEFAULT_PROGRESS);
-
-    const targets = task.queries.map((query: DashboardQuery) => new Target(
-      query.panel,
-      query.datasource,
-    ));
-
-    this._validateTargets(datasourceUrl, targets);
-
-    await this._updateProgress();
-
-    const queryConfigs = targets.map(
-      target =>
-        new QueryConfig(
-          QueryType.GRAFANA,
-          {
-            ...target.datasource,
-            url: datasourceUrl
-          },
-          target.panel.targets
-        )
-    );
+  public async export(task: ExportTask, datasourceUrl: string, timeZoneName: string) {
+    try {
+      this._task = _.cloneDeep(task);
+      this._task.id = uuidv4();
+      this._task.progress = _.cloneDeep(DEFAULT_PROGRESS);
+
+      this._validateQueries(task.queries);
+      this._validateDatasourceUrl(datasourceUrl);
+
+      await this._updateProgress();
+
+      const queryConfigs = task.queries.map(
+        query =>
+          new QueryConfig(
+            QueryType.GRAFANA,
+            {
+              ...query.datasource,
+              url: datasourceUrl,
            },
+            [query.target]
+          )
+      );
 
-    let from = +this._task.timeRange.from;
-    let to = +this._task.timeRange.to;
+      let from = +this._task.timeRange.from;
+      let to = +this._task.timeRange.to;
 
-    const days = Math.ceil((to - from) / MS_IN_DAY);
+      const days = Math.ceil((to - from) / MS_IN_DAY);
 
-    console.log(`Total days: ${days}`);
+      console.log(`Total days: ${days}`);
 
-    const stream = this._initCsvStream();
+      const stream = this._initCsvStream();
 
-    try {
       for(let day = 0; day < days; day++) {
         to = from + MS_IN_DAY;
@@ -75,21 +70,40 @@ export class Exporter {
           const datasourceMetrics = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);
-          columns = datasourceMetrics.columns;
-          values = datasourceMetrics.values;
+          if(_.isEmpty(columns)) {
+            columns = datasourceMetrics.columns;
+          } else {
+            columns = _.concat(columns, datasourceMetrics.columns.slice(1));
+          }
+
+          if(_.isEmpty(values)) {
+            values = datasourceMetrics.values;
+          } else {
+            if(values.length !== datasourceMetrics.values.length) {
+              throw new Error(`All queries should return rows of the same lengths`);
+            }
+            for(const rowIdx in values) {
+              if(datasourceMetrics.values[rowIdx][0] !== values[rowIdx][0]) {
+                throw new Error('Queries should return the same timestamps');
+              }
+              values[rowIdx] = _.concat(values[rowIdx], datasourceMetrics.values[rowIdx].slice(1));
+            }
+          }
         }
 
+        values = values.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]);
+
         if(columns.length > 0) {
-          this._writeCsv(stream, { columns, values, });
+          this._writeCsv(stream, { columns, values });
         }
         await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days });
 
         from += MS_IN_DAY;
       }
+      stream.end();
     } catch (e) {
       await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message });
     }
-    stream.end();
   }
 
   private _initCsvStream() {
@@ -127,15 +141,22 @@ export class Exporter {
     }
   }
 
-  private _validateTargets(datasourceUrl: string, targets: Target[]) {
-    if(!targets || !Array.isArray(targets)) {
-      throw new Error('Incorrect targets format');
+  private _validateQueries(queries: DashboardQuery[]) {
+    if(!_.isArray(queries)) {
+      throw new Error('`queries` field is required and should be an array');
     }
-    if(targets.length > 1) {
-      throw new Error(`Multiple queries are not supported yet`);
+    for(const query of queries) {
+      if(_.isEmpty(query.datasource)) {
+        throw new Error('all queries should have a `datasource` field');
+      }
+      if(_.isEmpty(query.target)) {
+        throw new Error('all queries should have a `target` field');
+      }
     }
+  }
 
+  private _validateDatasourceUrl(datasourceUrl: string) {
     const host = new URL(datasourceUrl).origin;
     const apiKey = apiKeys[host];
@@ -146,18 +167,12 @@ export class Exporter {
   private _writeCsv(stream, series) {
     for(let row of series.values) {
-      const isEmpty = _.every(
-        _.slice(row, 1),
-        val => val === null
-      );
-      if(!isEmpty) {
-        let csvRow = {};
-        for(let col in series.columns) {
-          csvRow[series.columns[col]] = row[col];
-        }
-        stream.write(csvRow);
-        this._task.progress.exportedRowsCount++;
+      let csvRow = {};
+      for(let col in series.columns) {
+        csvRow[series.columns[col]] = row[col];
       }
+      stream.write(csvRow);
+      this._task.progress.exportedRowsCount++;
     }
   }
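
Review note: the new merge logic above concatenates the non-timestamp columns of each query and joins values row by row, which only works when every query returns the same number of rows with identical timestamps. A small standalone sketch of that behavior (types and sample data are illustrative, not taken from the repo):

  import * as _ from 'lodash';

  type SeriesTable = { columns: string[], values: (number | null)[][] };

  // Merge result tables from several queries into one table keyed by the shared timestamp column.
  function mergeQueryResults(results: SeriesTable[]): SeriesTable {
    let columns: string[] = [];
    let values: (number | null)[][] = [];
    for(const res of results) {
      if(_.isEmpty(columns)) {
        // First query: take its columns and rows as-is.
        columns = res.columns;
        values = res.values;
        continue;
      }
      // Every further query contributes its columns except the leading timestamp column.
      columns = _.concat(columns, res.columns.slice(1));
      if(values.length !== res.values.length) {
        throw new Error('All queries should return rows of the same lengths');
      }
      values = values.map((row, rowIdx) => {
        if(res.values[rowIdx][0] !== row[0]) {
          throw new Error('Queries should return the same timestamps');
        }
        return _.concat(row, res.values[rowIdx].slice(1));
      });
    }
    return { columns, values };
  }

  // Two queries over the same two timestamps collapse into one three-column table:
  const merged = mergeQueryResults([
    { columns: ['time', 'cpu'], values: [[1000, 0.5], [2000, 0.7]] },
    { columns: ['time', 'mem'], values: [[1000, 42], [2000, 43]] },
  ]);
  // merged.columns -> ['time', 'cpu', 'mem']
  // merged.values  -> [[1000, 0.5, 42], [2000, 0.7, 43]]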

src/types.ts (3)

@@ -105,8 +105,9 @@ export type ExportTask = {
   id?: string;
 };
 
-export type DashboardQuery = DataQuery & {
+export type DashboardQuery = {
   selected: boolean;
+  target: DataQuery;
   panel: PanelModel;
   datasource: DataSourceSettings;
 };
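
For context, after this change each selected query carries its own DataQuery under `target` instead of extending DataQuery. An illustrative literal (field contents are assumptions; the exact DataQuery / PanelModel / DataSourceSettings shapes live elsewhere in the project):

  const query = {           // roughly a DashboardQuery
    selected: true,
    target: { refId: 'A' },             // DataQuery: assumed minimal shape
    panel: { id: 2 },                   // PanelModel: assumed minimal shape
    datasource: { type: 'prometheus' }, // DataSourceSettings: assumed minimal shape
  };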

src/utils.ts (6)

@@ -1,3 +1,5 @@
+import * as moment from 'moment-timezone';
+
 export async function promisify(method: (...params: any[]) => Promise<any> | void, ...params: any[]) {
   return new Promise((resolve, reject) => {
     method(...params, (err, result) => {
@@ -9,3 +11,7 @@ export async function promisify(method: (...params: any[]) => Promise<any> | vo
     })
   });
 }
+
+export function toIsoString(msTimestamp: number, timeZone: string): string {
+  return moment.tz(msTimestamp, timeZone).format('YYYY-MM-DD HH:mm:ssZ').replace(/:00$/, '');
+}
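
For reference, a quick sketch of what the new helper returns (timestamp and zone chosen arbitrarily):

  // 1672531200000 = 2023-01-01T00:00:00Z; Europe/Moscow is UTC+3 on that date
  toIsoString(1672531200000, 'Europe/Moscow'); // '2023-01-01 03:00:00+03'
  // The trailing .replace(/:00$/, '') trims the minutes of the UTC offset ("+03:00" -> "+03"), not the seconds of the time.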

yarn.lock (9)

@@ -1046,7 +1046,14 @@ minimatch@^3.1.2:
   dependencies:
     brace-expansion "^1.1.7"
 
-moment@^2.22.2, moment@^2.29.4:
+moment-timezone@^0.5.40:
+  version "0.5.40"
+  resolved "https://registry.yarnpkg.com/moment-timezone/-/moment-timezone-0.5.40.tgz#c148f5149fd91dd3e29bf481abc8830ecba16b89"
+  integrity sha512-tWfmNkRYmBkPJz5mr9GVDn9vRlVZOTe6yqY92rFxiOdWXbjaR0+9LwQnZGGuNR63X456NqmEkbskte8tWL5ePg==
+  dependencies:
+    moment ">= 2.9.0"
+
+"moment@>= 2.9.0", moment@^2.22.2, moment@^2.29.4:
   version "2.29.4"
   resolved "https://registry.yarnpkg.com/moment/-/moment-2.29.4.tgz#3dbe052889fe7c1b2ed966fcb3a77328964ef108"
   integrity sha512-5LC9SOxjSc2HF6vO2CyuTDNivEdoz2IvyJJGj6X8DJ0eFyfszE0QiEd+iXmBvUP3WHxSjFH/vIsA0EN00cgr8w==
