Error handling #5

Merged
rozetko merged 4 commits from error-handling into master 2 years ago
  1. 1
      package.json
  2. 8
      src/models/target.ts
  3. 60
      src/routes/tasks.ts
  4. 179
      src/services/exporter.ts
  5. 38
      src/types.ts
  6. 4
      src/types/export-status.ts
  7. 8
      src/types/target.ts
  8. 5
      yarn.lock

1
package.json

@ -20,6 +20,7 @@
"nodemon": "^2.0.20", "nodemon": "^2.0.20",
"ts-loader": "^9.4.2", "ts-loader": "^9.4.2",
"typescript": "^4.9.4", "typescript": "^4.9.4",
"uuid": "^9.0.0",
"webpack": "^5.75.0", "webpack": "^5.75.0",
"webpack-cli": "^5.0.1" "webpack-cli": "^5.0.1"
} }

8
src/models/target.ts

@ -0,0 +1,8 @@
import { DataSourceSettings, PanelModel } from '../types';
export class Target {
constructor(
public panel: PanelModel,
public datasource: DataSourceSettings,
) {}
}

60
src/routes/tasks.ts

@ -1,26 +1,24 @@
import { Target } from '../types/target';
import { exporterFactory } from '../services/exporter.factory'; import { exporterFactory } from '../services/exporter.factory';
import { EXPORTED_PATH } from '../config'; import { EXPORTED_PATH } from '../config';
import { Task, TaskTableRowConfig } from '../types'; import { ExportTask } from '../types';
import * as express from 'express'; import * as express from 'express';
import * as _ from 'lodash';
import * as path from 'path'; import * as path from 'path';
import * as fs from 'fs'; import * as fs from 'fs';
type TRequest = { type TRequest = {
body: { body: {
from: string, task: ExportTask,
to: string,
username: string,
tasks: Task[],
url: string, url: string,
}, },
}; };
async function getTasks(req, res) { async function getTasks(req, res) {
const resp: TaskTableRowConfig[] = []; const resp: ExportTask[] = [];
fs.readdir(EXPORTED_PATH, (err, items) => { fs.readdir(EXPORTED_PATH, (err, items) => {
if(err) { if(err) {
console.error(err); console.error(err);
@ -34,15 +32,7 @@ async function getTasks(req, res) {
// TODO: read async // TODO: read async
let data = fs.readFileSync(path.join(EXPORTED_PATH, item), 'utf8'); let data = fs.readFileSync(path.join(EXPORTED_PATH, item), 'utf8');
let status = JSON.parse(data); let status = JSON.parse(data);
resp.push({ resp.push(status);
timestamp: status.time,
username: status.username,
datasourceRef: status.datasourceRef,
rowsCount: status.exportedRows,
progress: status.progress,
status: status.status,
filename: file.name,
});
} }
res.status(200).send(resp); res.status(200).send(resp);
@ -51,38 +41,36 @@ async function getTasks(req, res) {
} }
async function addTask(req: TRequest, res) { async function addTask(req: TRequest, res) {
const body = req.body; const body = req.body;
const clientUrl = body.url; const clientUrl = body.url;
const from = parseInt(body.from); if (_.isEmpty(clientUrl)) {
const to = parseInt(body.to); res.status(400).send('"url" field is required');
const username = body.username; }
const tasks = body.tasks; const task = body.task;
if (_.isEmpty(task)) {
res.status(400).send('"task" field is required');
}
const datasourceUrl = `${new URL(clientUrl).origin}/api/ds/query`; const datasourceUrl = `${new URL(clientUrl).origin}/api/ds/query`;
if(isNaN(from) || isNaN(to)) { const from = +task.timeRange.from;
const to = +task.timeRange.to;
if (isNaN(from) || isNaN(to)) {
res.status(400).send('Range error: please fill both "from" and "to" fields'); res.status(400).send('Range error: please fill both "from" and "to" fields');
} else if(from >= to) { } else if (from >= to) {
res.status(400).send('Range error: "from" should be less than "to"'); res.status(400).send('Range error: "from" should be less than "to"');
} else {
const names = tasks.map(item => item.datasource.name).join(', ');
const targets = tasks.map((task: Task) => new Target(
task.panel,
task.datasource,
));
const exporter = exporterFactory.getExporter();
exporter.export(targets, datasourceUrl, username, from, to);
res.status(200).send(`Exporting ${names} data from ${new Date(from).toLocaleString()} to ${new Date(to).toLocaleString()}`);
} }
const exporter = exporterFactory.getExporter();
exporter.export(task, datasourceUrl);
res.status(200).send(`Export process started`);
} }
async function deleteTask(req, res) { async function deleteTask(req, res) {
let filename = req.body.filename; let taskId = req.body.taskId;
let csvFilePath = path.join(EXPORTED_PATH, `${filename}.csv`); let csvFilePath = path.join(EXPORTED_PATH, `${taskId}.csv`);
let jsonFilePath = path.join(EXPORTED_PATH, `${filename}.json`); let jsonFilePath = path.join(EXPORTED_PATH, `${taskId}.json`);
if(fs.existsSync(csvFilePath)) { if(fs.existsSync(csvFilePath)) {
fs.unlink(csvFilePath, err => console.error(err)); fs.unlink(csvFilePath, err => console.error(err));

179
src/services/exporter.ts

@ -1,14 +1,15 @@
import { Target } from '../types/target'; import { Target } from '../models/target';
import { URL } from 'url'; import { URL } from 'url';
import { apiKeys } from '../config'; import { apiKeys } from '../config';
import { promisify } from '../utils'; import { promisify } from '../utils';
import { ExportStatus } from '../types/export-status'; import { DashboardQuery, ExportProgress, ExportStatus, ExportTask } from '../types';
import { DataSourceRef } from '../types';
import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit'; import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit';
// TODO: export QueryType directly from @corpglory/tsdb-kit // TODO: export QueryType directly from @corpglory/tsdb-kit
import { QueryType } from '@corpglory/tsdb-kit/lib/connectors'; import { QueryType } from '@corpglory/tsdb-kit/lib/connectors';
import { v4 as uuidv4 } from 'uuid';
import * as moment from 'moment'; import * as moment from 'moment';
import * as csv from 'fast-csv'; import * as csv from 'fast-csv';
import * as fs from 'fs'; import * as fs from 'fs';
@ -17,60 +18,32 @@ import * as _ from 'lodash';
const MS_IN_DAY = 24 * 60 * 60 * 1000; const MS_IN_DAY = 24 * 60 * 60 * 1000;
const DEFAULT_PROGRESS = {
exportedRowsCount: 0,
progress: 0,
status: ExportStatus.EXPORTING,
};
export class Exporter { export class Exporter {
private exportedRows = 0; private _task: ExportTask;
private createdTimestamp: number;
private username: string;
private datasourceRef: DataSourceRef;
private initCsvStream() { public async export(task: ExportTask, datasourceUrl: string) {
const csvStream = csv.createWriteStream({ headers: true }) this._task = _.cloneDeep(task);
.on('error', error => console.error(error)); this._task.id = uuidv4();
this._task.progress = _.cloneDeep(DEFAULT_PROGRESS);
const writableStream = fs.createWriteStream(this.getFilePath('csv')); const targets = task.queries.map((query: DashboardQuery) => new Target(
query.panel,
query.datasource,
));
csvStream.pipe(writableStream); this._validateTargets(datasourceUrl, targets);
writableStream.on('finish', async () => {
console.log(`Everything is written to ${this.getFilename('csv')}`);
await this.updateStatus(ExportStatus.FINISHED, 1);
})
return csvStream; await this._updateProgress();
}
public async updateStatus(status: string, progress: number) { const queryConfigs = targets.map(
try { target =>
let time = moment().valueOf(); new QueryConfig(
let data = {
time,
username: this.username,
exportedRows: this.exportedRows,
progress: progress,
status,
datasourceRef: this.datasourceRef,
};
await promisify(fs.writeFile, this.getFilePath('json'), JSON.stringify(data), 'utf8')
} catch(err) {
console.error(err);
throw new Error('Can`t write file');
}
}
// TODO: rename `data` to `targets` or `queries`
public async export(data: Target[], datasourceUrl: string, username: string, from: number, to: number) {
this.username = username;
this.validateTargets(datasourceUrl, data);
const targets = data.map(target => {
console.log('target', {
...target.datasource,
url: datasourceUrl
});
return {
...target,
metric: new QueryConfig(
QueryType.GRAFANA, QueryType.GRAFANA,
{ {
...target.datasource, ...target.datasource,
@ -78,52 +51,83 @@ export class Exporter {
}, },
target.panel.targets target.panel.targets
) )
} );
});
const datasource = data[0].datasource;
this.datasourceRef = { uid: datasource.uid, type: datasource.type };
await this.updateStatus(ExportStatus.EXPORTING, 0); let from = +this._task.timeRange.from;
let to = +this._task.timeRange.to;
const stream = this.initCsvStream();
const days = Math.ceil((to - from) / MS_IN_DAY); const days = Math.ceil((to - from) / MS_IN_DAY);
console.log(`Total days: ${days}`); console.log(`Total days: ${days}`);
for(let day = 0; day < days; day++) { const stream = this._initCsvStream();
to = from + MS_IN_DAY; try {
for(let day = 0; day < days; day++) {
to = from + MS_IN_DAY;
console.log(`${day} day: ${from}ms -> ${to}ms`);
console.log(`${day} day: ${from}ms -> ${to}ms`); let columns = [];
let values = [];
let columns = []; for(const queryConfig of queryConfigs) {
let values = []; const host = new URL(datasourceUrl).origin;
const apiKey = apiKeys[host];
for(const [index, target] of targets.entries()) { const datasourceMetrics = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);
const host = new URL(datasourceUrl).origin;
const apiKey = apiKeys[host];
const datasourceMetrics = await queryByConfig(target.metric, datasourceUrl, from, to, apiKey); columns = datasourceMetrics.columns;
values = datasourceMetrics.values;
}
if(columns.length > 0) {
this._writeCsv(stream, { columns, values, });
}
await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days });
columns = datasourceMetrics.columns; from += MS_IN_DAY;
values = datasourceMetrics.values;
} }
} catch (e) {
await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message });
}
stream.end();
}
if(columns.length > 0) { private _initCsvStream() {
console.log('values', values); const csvStream = csv.createWriteStream({ headers: true })
this.writeCsv(stream, { .on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message }));
columns,
values, const writableStream = fs.createWriteStream(this._getFilePath('csv'));
});
csvStream.pipe(writableStream);
writableStream.on('finish', async () => {
if(this._task.progress.status !== ExportStatus.ERROR) {
console.log(`Everything is written to ${this._getFilename('csv')}`);
await this._updateProgress({ status: ExportStatus.FINISHED, progress: 1 });
} else {
console.log(`${this._getFilename('csv')} export is finished with error`);
} }
await this.updateStatus(ExportStatus.EXPORTING, (day + 1) / days); });
from += MS_IN_DAY; return csvStream;
}
private async _updateProgress(progress?: Partial<ExportProgress>) {
try {
let time = moment().valueOf();
const data = {
...this._task,
progress: _.assign(this._task.progress, progress, { time }),
};
await promisify(fs.writeFile, this._getFilePath('json'), JSON.stringify(data), 'utf8');
} catch(err) {
console.error(err);
throw new Error('Can`t write file');
} }
stream.end();
} }
private validateTargets(datasourceUrl: string, targets: Target[]) { private _validateTargets(datasourceUrl: string, targets: Target[]) {
if(!targets || !Array.isArray(targets)) { if(!targets || !Array.isArray(targets)) {
throw new Error('Incorrect targets format'); throw new Error('Incorrect targets format');
} }
@ -140,7 +144,7 @@ export class Exporter {
} }
} }
private writeCsv(stream, series) { private _writeCsv(stream, series) {
for(let row of series.values) { for(let row of series.values) {
const isEmpty = _.every( const isEmpty = _.every(
_.slice(row, 1), _.slice(row, 1),
@ -152,20 +156,17 @@ export class Exporter {
csvRow[series.columns[col]] = row[col]; csvRow[series.columns[col]] = row[col];
} }
stream.write(csvRow); stream.write(csvRow);
this.exportedRows++; this._task.progress.exportedRowsCount++;
} }
} }
} }
private getFilename(extension) { private _getFilename(extension: string): string {
if(this.createdTimestamp === undefined) { return `${this._task.id}.${extension}`;
this.createdTimestamp = moment().valueOf();
}
return `${this.createdTimestamp}.${this.datasourceRef.uid}.${extension}`;
} }
private getFilePath(extension) { private _getFilePath(extension: string): string {
let filename = this.getFilename(extension); let filename = this._getFilename(extension);
return path.join(__dirname, `../exported/${filename}`); return path.join(__dirname, `../exported/${filename}`);
} }
} }

38
src/types/index.ts → src/types.ts

@ -80,19 +80,33 @@ export interface PanelModel {
alert?: any; alert?: any;
} }
// TODO: rename to query export enum ExportStatus {
export type Task = Omit<DataQuery, 'datasource'> & { EXPORTING = 'exporting',
selected: boolean; FINISHED = 'finished',
panel: PanelModel; ERROR = 'error',
datasource: DataSourceSettings; }
export type ExportProgress = {
time: number;
exportedRowsCount: number;
progress: number;
status: ExportStatus;
errorMessage?: string;
}; };
export type TaskTableRowConfig = { export type ExportTask = {
timestamp: number;
username: string; username: string;
datasourceRef: DataSourceRef; queries: DashboardQuery[];
rowsCount: number; timeRange: {
progress: number; from: number;
status: string; to: number;
filename?: string; };
progress?: ExportProgress;
id?: string;
};
export type DashboardQuery = DataQuery & {
selected: boolean;
panel: PanelModel;
datasource: DataSourceSettings;
}; };

4
src/types/export-status.ts

@ -1,4 +0,0 @@
export enum ExportStatus {
EXPORTING = 'exporting',
FINISHED = 'finished',
}

8
src/types/target.ts

@ -1,8 +0,0 @@
import { DataSourceSettings, PanelModel } from '.';
export class Target {
constructor(
public panel: PanelModel,
public datasource: DataSourceSettings,
) {}
}

5
yarn.lock

@ -1555,6 +1555,11 @@ utils-merge@1.0.1:
resolved "https://registry.yarnpkg.com/utils-merge/-/utils-merge-1.0.1.tgz#9f95710f50a267947b2ccc124741c1028427e713" resolved "https://registry.yarnpkg.com/utils-merge/-/utils-merge-1.0.1.tgz#9f95710f50a267947b2ccc124741c1028427e713"
integrity sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA== integrity sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==
uuid@^9.0.0:
version "9.0.0"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-9.0.0.tgz#592f550650024a38ceb0c562f2f6aa435761efb5"
integrity sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==
vary@~1.1.2: vary@~1.1.2:
version "1.1.2" version "1.1.2"
resolved "https://registry.yarnpkg.com/vary/-/vary-1.1.2.tgz#2299f02c6ded30d4a5961b0b9f74524a18f634fc" resolved "https://registry.yarnpkg.com/vary/-/vary-1.1.2.tgz#2299f02c6ded30d4a5961b0b9f74524a18f634fc"

Loading…
Cancel
Save