Support multiple queries #6

Merged
rozetko merged 7 commits from multiple-queries into master 2 years ago
  1. 1
      package.json
  2. 8
      src/models/target.ts
  3. 13
      src/routes/tasks.ts
  4. 121
      src/services/exporter.ts
  5. 3
      src/types.ts
  6. 6
      src/utils.ts
  7. 9
      yarn.lock

1
package.json

@ -17,6 +17,7 @@
"fast-csv": "^2.5.0",
"lodash": "^4.17.21",
"moment": "^2.29.4",
"moment-timezone": "^0.5.40",
"nodemon": "^2.0.20",
"ts-loader": "^9.4.2",
"typescript": "^4.9.4",

8
src/models/target.ts

@ -1,8 +0,0 @@
import { DataSourceSettings, PanelModel } from '../types';
export class Target {
constructor(
public panel: PanelModel,
public datasource: DataSourceSettings,
) {}
}

13
src/routes/tasks.ts

@ -14,6 +14,7 @@ type TRequest = {
body: {
task: ExportTask,
url: string,
timeZoneName: string,
},
};
@ -22,7 +23,7 @@ async function getTasks(req, res) {
fs.readdir(EXPORTED_PATH, (err, items) => {
if(err) {
console.error(err);
res.status(500).send('Something went wrong');
res.status(500).send(err.message);
} else {
for(let item of items) {
let file = path.parse(item);
@ -31,8 +32,12 @@ async function getTasks(req, res) {
}
// TODO: read async
let data = fs.readFileSync(path.join(EXPORTED_PATH, item), 'utf8');
let status = JSON.parse(data);
resp.push(status);
try {
let status = JSON.parse(data);
resp.push(status);
} catch(e) {
console.log(`Cannot read file /exporter/${item}. if this error doesn't repeat, maybe the file is being updated at the moment`);
}
}
res.status(200).send(resp);
@ -63,7 +68,7 @@ async function addTask(req: TRequest, res) {
}
const exporter = exporterFactory.getExporter();
exporter.export(task, datasourceUrl);
exporter.export(task, datasourceUrl, body.timeZoneName);
res.status(200).send(`Export process started`);
}

121
src/services/exporter.ts

@ -1,7 +1,6 @@
import { Target } from '../models/target';
import { URL } from 'url';
import { apiKeys } from '../config';
import { promisify } from '../utils';
import { promisify, toIsoString } from '../utils';
import { DashboardQuery, ExportProgress, ExportStatus, ExportTask } from '../types';
import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit';
@ -27,40 +26,36 @@ const DEFAULT_PROGRESS = {
export class Exporter {
private _task: ExportTask;
public async export(task: ExportTask, datasourceUrl: string) {
this._task = _.cloneDeep(task);
this._task.id = uuidv4();
this._task.progress = _.cloneDeep(DEFAULT_PROGRESS);
const targets = task.queries.map((query: DashboardQuery) => new Target(
query.panel,
query.datasource,
));
this._validateTargets(datasourceUrl, targets);
await this._updateProgress();
const queryConfigs = targets.map(
target =>
new QueryConfig(
QueryType.GRAFANA,
{
...target.datasource,
url: datasourceUrl
},
target.panel.targets
)
);
public async export(task: ExportTask, datasourceUrl: string, timeZoneName: string) {
try {
this._task = _.cloneDeep(task);
this._task.id = uuidv4();
this._task.progress = _.cloneDeep(DEFAULT_PROGRESS);
this._validateQueries(task.queries);
this._validateDatasourceUrl(datasourceUrl);
await this._updateProgress();
const queryConfigs = task.queries.map(
query =>
new QueryConfig(
QueryType.GRAFANA,
{
...query.datasource,
url: datasourceUrl,
},
[query.target]
)
);
let from = +this._task.timeRange.from;
let to = +this._task.timeRange.to;
const days = Math.ceil((to - from) / MS_IN_DAY);
let from = +this._task.timeRange.from;
let to = +this._task.timeRange.to;
const days = Math.ceil((to - from) / MS_IN_DAY);
console.log(`Total days: ${days}`);
console.log(`Total days: ${days}`);
const stream = this._initCsvStream();
try {
const stream = this._initCsvStream();
for(let day = 0; day < days; day++) {
to = from + MS_IN_DAY;
@ -75,21 +70,40 @@ export class Exporter {
const datasourceMetrics = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);
columns = datasourceMetrics.columns;
values = datasourceMetrics.values;
if(_.isEmpty(columns)) {
columns = datasourceMetrics.columns;
} else {
columns = _.concat(columns, datasourceMetrics.columns.slice(1));
}
if(_.isEmpty(values)) {
values = datasourceMetrics.values;
} else {
if(values.length !== datasourceMetrics.values.length) {
throw new Error(`All queries should return rows of the same lengths`);
}
for(const rowIdx in values) {
if(datasourceMetrics.values[rowIdx][0] !== values[rowIdx][0]) {
throw new Error('Queries should return the same timestamps');
}
values[rowIdx] = _.concat(values[rowIdx], datasourceMetrics.values[rowIdx].slice(1));
}
}
}
values = values.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]);
if(columns.length > 0) {
this._writeCsv(stream, { columns, values, });
this._writeCsv(stream, { columns, values });
}
await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days });
from += MS_IN_DAY;
}
stream.end();
} catch (e) {
await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message });
}
stream.end();
}
private _initCsvStream() {
@ -127,15 +141,22 @@ export class Exporter {
}
}
private _validateTargets(datasourceUrl: string, targets: Target[]) {
if(!targets || !Array.isArray(targets)) {
throw new Error('Incorrect targets format');
private _validateQueries(queries: DashboardQuery[]) {
if(!_.isArray(queries)) {
throw new Error('`queries` field is required and should be an array');
}
if(targets.length > 1) {
throw new Error(`Multiple queries are not supported yet`);
for(const query of queries) {
if(_.isEmpty(query.datasource)) {
throw new Error('all queries should have a `datasource` field');
}
if(_.isEmpty(query.target)) {
throw new Error('all queries should have a `target` field');
}
}
}
private _validateDatasourceUrl(datasourceUrl: string) {
const host = new URL(datasourceUrl).origin;
const apiKey = apiKeys[host];
@ -146,18 +167,12 @@ export class Exporter {
private _writeCsv(stream, series) {
for(let row of series.values) {
const isEmpty = _.every(
_.slice(row, 1),
val => val === null
);
if(!isEmpty) {
let csvRow = {};
for(let col in series.columns) {
csvRow[series.columns[col]] = row[col];
}
stream.write(csvRow);
this._task.progress.exportedRowsCount++;
let csvRow = {};
for(let col in series.columns) {
csvRow[series.columns[col]] = row[col];
}
stream.write(csvRow);
this._task.progress.exportedRowsCount++;
}
}

3
src/types.ts

@ -105,8 +105,9 @@ export type ExportTask = {
id?: string;
};
export type DashboardQuery = DataQuery & {
export type DashboardQuery = {
selected: boolean;
target: DataQuery;
panel: PanelModel;
datasource: DataSourceSettings;
};

6
src/utils.ts

@ -1,3 +1,5 @@
import * as moment from 'moment-timezone';
export async function promisify(method: (...params: any[]) => Promise<any> | void, ...params: any[]) {
return new Promise((resolve, reject) => {
method(...params, (err, result) => {
@ -9,3 +11,7 @@ export async function promisify(method: (...params: any[]) => Promise<any> | voi
})
});
}
export function toIsoString(msTimestamp: number, timeZone: string): string {
return moment.tz(msTimestamp, timeZone).format('YYYY-MM-DD HH:mm:ssZ').replace(/:00$/, '');
}

9
yarn.lock

@ -1046,7 +1046,14 @@ minimatch@^3.1.2:
dependencies:
brace-expansion "^1.1.7"
moment@^2.22.2, moment@^2.29.4:
moment-timezone@^0.5.40:
version "0.5.40"
resolved "https://registry.yarnpkg.com/moment-timezone/-/moment-timezone-0.5.40.tgz#c148f5149fd91dd3e29bf481abc8830ecba16b89"
integrity sha512-tWfmNkRYmBkPJz5mr9GVDn9vRlVZOTe6yqY92rFxiOdWXbjaR0+9LwQnZGGuNR63X456NqmEkbskte8tWL5ePg==
dependencies:
moment ">= 2.9.0"
"moment@>= 2.9.0", moment@^2.22.2, moment@^2.29.4:
version "2.29.4"
resolved "https://registry.yarnpkg.com/moment/-/moment-2.29.4.tgz#3dbe052889fe7c1b2ed966fcb3a77328964ef108"
integrity sha512-5LC9SOxjSc2HF6vO2CyuTDNivEdoz2IvyJJGj6X8DJ0eFyfszE0QiEd+iXmBvUP3WHxSjFH/vIsA0EN00cgr8w==

Loading…
Cancel
Save