You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

179 lines
5.4 KiB

// TODO: import promisify from 'util'
import { getApiKey } from './api_keys';
import { promisify, toIsoString } from '../utils';
import { DashboardQuery, ExportProgress, ExportStatus, ExportTask } from '../types';
import { CSV_PATH } from '../config';
import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit';
// TODO: export QueryType directly from @corpglory/tsdb-kit
import { QueryType } from '@corpglory/tsdb-kit/lib/connectors';
import { v4 as uuidv4 } from 'uuid';
import * as moment from 'moment';
import * as csv from 'fast-csv';
import * as fs from 'fs';
import * as path from 'path';
import { URL } from 'url';
import * as _ from 'lodash';
const MS_IN_DAY = 24 * 60 * 60 * 1000;
const DEFAULT_PROGRESS = {
exportedRowsCount: 0,
progress: 0,
status: ExportStatus.EXPORTING,
};
export class Exporter {
private _task: ExportTask;
public async export(task: ExportTask, datasourceUrl: string, timeZoneName: string) {
try {
this._task = _.cloneDeep(task);
this._task.id = uuidv4();
this._task.progress = _.cloneDeep(DEFAULT_PROGRESS);
this._validateQueries(task.queries);
await this._updateProgress();
const queryConfigs = task.queries.map(
query =>
new QueryConfig(
QueryType.GRAFANA,
{
...query.datasource,
url: datasourceUrl,
},
[query.target]
)
);
let from = +this._task.timeRange.from;
let to = +this._task.timeRange.to;
const days = Math.ceil((to - from) / MS_IN_DAY);
console.log(`Total days: ${days}`);
const stream = this._initCsvStream();
for(let day = 0; day < days; day++) {
to = from + MS_IN_DAY;
console.log(`${day} day: ${from}ms -> ${to}ms`);
let columns = [];
let values = [];
for(const queryConfig of queryConfigs) {
const host = new URL(datasourceUrl).origin;
const apiKey = getApiKey(host);
const datasourceMetrics = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);
if(_.isEmpty(columns)) {
columns = datasourceMetrics.columns;
} else {
columns = _.concat(columns, datasourceMetrics.columns.slice(1));
}
if(_.isEmpty(values)) {
values = datasourceMetrics.values;
} else {
if(values.length !== datasourceMetrics.values.length) {
throw new Error(`All queries should return rows of the same lengths`);
}
for(const rowIdx in values) {
if(datasourceMetrics.values[rowIdx][0] !== values[rowIdx][0]) {
throw new Error('Queries should return the same timestamps');
}
values[rowIdx] = _.concat(values[rowIdx], datasourceMetrics.values[rowIdx].slice(1));
}
}
}
values = values.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]);
if(columns.length > 0) {
this._writeCsv(stream, { columns, values });
}
await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days });
from += MS_IN_DAY;
}
stream.end();
} catch (e) {
await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message });
}
}
private _initCsvStream() {
const csvStream = csv.createWriteStream({ headers: true })
.on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message }));
const writableStream = fs.createWriteStream(this._getFilePath('csv'));
csvStream.pipe(writableStream);
writableStream.on('finish', async () => {
if(this._task.progress.status !== ExportStatus.ERROR) {
console.log(`Everything is written to ${this._getFilename('csv')}`);
await this._updateProgress({ status: ExportStatus.FINISHED, progress: 1 });
} else {
console.log(`${this._getFilename('csv')} export is finished with error`);
}
});
return csvStream;
}
private async _updateProgress(progress?: Partial<ExportProgress>) {
try {
let time = moment().valueOf();
const data = {
...this._task,
progress: _.assign(this._task.progress, progress, { time }),
};
await promisify(fs.writeFile, this._getFilePath('json'), JSON.stringify(data), 'utf8');
} catch(err) {
console.error(err);
throw new Error('Can`t write file');
}
}
private _validateQueries(queries: DashboardQuery[]) {
if(!_.isArray(queries)) {
throw new Error('`queries` field is required and should be an array');
}
for(const query of queries) {
if(_.isEmpty(query.datasource)) {
throw new Error('all queries should have a `datasource` field');
}
if(_.isEmpty(query.target)) {
throw new Error('all queries should have a `target` field');
}
}
}
private _writeCsv(stream, series) {
for(let row of series.values) {
let csvRow = {};
for(let col in series.columns) {
csvRow[series.columns[col]] = row[col];
}
stream.write(csvRow);
this._task.progress.exportedRowsCount++;
}
}
private _getFilename(extension: string): string {
return `${this._task.id}.${extension}`;
}
private _getFilePath(extension: string): string {
let filename = this._getFilename(extension);
return path.join(CSV_PATH, filename);
}
}