import { getApiKey } from './api_keys';
import { toDateString, toIsoString } from '../utils';
import { DashboardQuery, ExportProgress, ExportStatus, ExportTask } from '../types';
import { CSV_PATH } from '../config';

import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit';
// TODO: export QueryType directly from @corpglory/tsdb-kit
import { DataTable, QueryType } from '@corpglory/tsdb-kit/lib/connectors';

import { v4 as uuidv4 } from 'uuid';

import * as moment from 'moment';
import * as csv from 'fast-csv';
import * as fs from 'fs';
import * as path from 'path';
import { URL } from 'url';
import { promisify } from 'util';
import * as _ from 'lodash';

/** Length of one export chunk: the time range is queried one day at a time. */
const MS_IN_DAY = 24 * 60 * 60 * 1000;

/** Progress state assigned to every task when an export starts. */
const DEFAULT_PROGRESS = {
  exportedRowsCount: 0,
  progress: 0,
  status: ExportStatus.EXPORTING,
};

/** CSV header used for the timestamp column when no query result names one. */
const DEFAULT_TIME_COLUMN = 'time';

/**
 * Exports the results of a task's dashboard queries into a CSV file under `CSV_PATH`
 * and mirrors the task state (including progress) into a `<task id>.json` file
 * in the same directory.
 */
export class Exporter {
  private _task: ExportTask;

  /**
   * Runs the export: the task's time range is split into one-day chunks, every query
   * is executed for each chunk, and the merged rows are appended to the CSV stream.
   *
   * Failures are not rethrown; they are recorded in the task progress as
   * `ExportStatus.ERROR` together with the error message. The promise can still
   * reject if recording that error state itself fails.
   *
   * @param task - export task; it is deep-cloned, so the caller's object is not mutated
   * @param datasourceUrl - url that overrides the `url` of every query datasource
   * @param timeZoneName - time zone used to build the date part of the file name
   * @param apiKey - key forwarded to `queryByConfig`
   */
  public async export(task: ExportTask, datasourceUrl: string, timeZoneName: string, apiKey: string) {
    // Declared outside `try` so the failure path can close the file.
    let stream: { end(): void } | undefined;

    try {
      this._task = _.cloneDeep(task);
      this._task.id = uuidv4();
      this._task.progress = _.cloneDeep(DEFAULT_PROGRESS);

      this._validateQueries(task.queries);

      const panelName = _.snakeCase(task.queries[0].panel.title);
      const dateFrom = toDateString(task.timeRange.from, timeZoneName);
      const dateTo = toDateString(task.timeRange.to, timeZoneName);
      this._task.filename = `${panelName}_${task.dashboardUid}_${dateFrom}_${dateTo}`;

      await this._updateProgress();

      const queryConfigs = task.queries.map(
        query =>
          new QueryConfig(
            QueryType.GRAFANA,
            {
              ...query.datasource,
              url: datasourceUrl,
            },
            [query.target]
          )
      );

      const rangeEnd = +this._task.timeRange.to;
      let from = +this._task.timeRange.from;
      const days = Math.ceil((rangeEnd - from) / MS_IN_DAY);

      console.log(`Total days: ${days}`);

      stream = this._initCsvStream();

      console.log("queryConfigs", queryConfigs);
      for (let day = 0; day < days; day++) {
        // Clamp the last chunk so nothing past the requested range end is queried.
        const to = Math.min(from + MS_IN_DAY, rangeEnd);

        console.log(`${day} day: ${from}ms -> ${to}ms`);

        let timeColumn: string | undefined;
        let columns: string[] = [];
        let rowsDict = {};

        for (const queryConfig of queryConfigs) {
          const datasourceMetrics: DataTable = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);

          // The first column of a result is treated as its time field (see _updateColumns);
          // the first name seen is reused as the header of the shared timestamp column.
          if (timeColumn === undefined && datasourceMetrics.columns.length > 0) {
            timeColumn = datasourceMetrics.columns[0];
          }

          columns = this._updateColumns(columns, datasourceMetrics.columns);
          rowsDict = this._updateRows(rowsDict, datasourceMetrics.values, datasourceMetrics.columns);
        }

        const rows = this._getRowsFromDict(rowsDict, columns);
        // const formattedRows = rows.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]);

        if (columns.length > 0) {
          // Every row starts with its timestamp, so the header list must start with the
          // time column too; otherwise values are written under the wrong headers.
          const timeHeader = timeColumn === undefined ? DEFAULT_TIME_COLUMN : timeColumn;
          this._writeCsv(stream, { columns: [timeHeader, ...columns], rows });
        }
        await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days });

        from += MS_IN_DAY;
      }
      stream.end();
    } catch (e) {
      try {
        await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message });
      } finally {
        // Close the CSV file even on failure. The ERROR status is set before the stream
        // ends, so the 'finish' handler does not report the export as FINISHED.
        if (stream !== undefined) {
          stream.end();
        }
      }
    }
  }

  /**
   * Appends the non-time columns of a query result to the accumulated column list.
   *
   * @param columnsToUpdate - columns collected so far
   * @param queriedColumns - columns of the latest query result, time field first
   * @returns a new array; the inputs are not modified
   */
  private _updateColumns(columnsToUpdate: string[], queriedColumns: string[]): string[] {
    // slice(1) to avoid time fields
    return _.concat(columnsToUpdate, queriedColumns.slice(1));
  }

  /**
   * Merges queried rows into a dictionary keyed by the first cell of each row
   * (the timestamp), so results of several queries for the same timestamp share an entry.
   *
   * @param dict - dictionary to merge into; it is mutated and also returned
   * @param queriedRows - rows of the latest query result, timestamp first
   * @param queriedColumns - column names matching `queriedRows`, time field first
   * @returns the same `dict` instance: `{ [timestamp]: { [column]: value } }`
   */
  private _updateRows(dict, queriedRows: (number | null)[][], queriedColumns: string[]): any {
    const columns = queriedColumns.slice(1);
    for (const queriedRow of queriedRows) {
      const [key, ...values] = queriedRow;
      dict[key] = dict[key] || {};
      values.forEach((value, idx) => {
        dict[key][columns[idx]] = value;
      });
    }
    return dict;
  }

  /**
   * Converts the timestamp dictionary built by `_updateRows` into rows sorted by timestamp.
   *
   * @param dict - `{ [timestamp]: { [column]: value } }`
   * @param columns - value columns, in output order
   * @returns rows shaped as `[timestamp, ...values]`; a value missing for a column is `null`
   */
  private _getRowsFromDict(dict: any, columns: string[]): (number | null)[][] {
    // Object keys are strings: compare them as numbers so timestamps with a different
    // number of digits are still ordered chronologically.
    const keys = _.sortBy(_.keys(dict), key => Number(key));

    return keys.map(key => {
      const timestamp = Number(key);
      // A non-numeric key (e.g. a `null` timestamp stored as "null") is emitted as null.
      const row: (number | null)[] = [_.isNaN(timestamp) ? null : timestamp];
      for (const column of columns) {
        const value = dict[key][column];
        // Only a missing value becomes null; a real 0 sample must be preserved.
        row.push(_.isNil(value) ? null : value);
      }
      return row;
    });
  }

  /**
   * Creates the CSV formatter stream piped into the task's CSV file.
   * Stream errors are recorded in the task progress; when the file is fully written
   * and no error was recorded, the task is marked as FINISHED.
   *
   * @returns the CSV stream to write row objects into
   */
  private _initCsvStream() {
    const recordStreamError = async (e: Error) => {
      try {
        await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message });
      } catch (err) {
        // _updateProgress has already logged the cause; nothing awaits an event handler,
        // so rethrowing here would only produce an unhandled rejection.
      }
    };

    const csvStream = csv.createWriteStream({ headers: true, delimiter: this._task.csvDelimiter })
      .on('error', recordStreamError);

    const writableStream = fs.createWriteStream(this._getCsvFilePath());
    // Without a listener, a file error (e.g. an unwritable path) is an uncaught exception.
    writableStream.on('error', recordStreamError);

    csvStream.pipe(writableStream);
    writableStream.on('finish', async () => {
      if (this._task.progress.status !== ExportStatus.ERROR) {
        console.log(`Everything is written to ${this._getCsvFilename()}`);
        try {
          await this._updateProgress({ status: ExportStatus.FINISHED, progress: 1 });
        } catch (err) {
          // Already logged by _updateProgress; see recordStreamError.
        }
      } else {
        console.log(`${this._getCsvFilename()} export is finished with error`);
      }
    });

    return csvStream;
  }

  /**
   * Merges `progress` into the task progress, stamps it with the current time and
   * writes the whole task as JSON to the task's status file.
   *
   * @param progress - progress fields to override; omit to only refresh the timestamp
   * @throws Error when the status file can not be written
   */
  private async _updateProgress(progress?: Partial<ExportProgress>) {
    try {
      let time = moment().valueOf();

      const data = {
        ...this._task,
        // The in-memory progress is updated synchronously, before the file write.
        progress: _.assign(this._task.progress, progress, { time }),
      };

      await promisify(fs.writeFile)(this._getJsonFilePath(), JSON.stringify(data), 'utf8');
    } catch (err) {
      console.error(err);
      throw new Error('Can`t write file');
    }
  }

  /**
   * Checks that `queries` is an array whose items all have `datasource` and `target`.
   *
   * @throws Error describing the first violated requirement
   */
  private _validateQueries(queries: DashboardQuery[]) {
    if (!_.isArray(queries)) {
      throw new Error('`queries` field is required and should be an array');
    }

    for (const query of queries) {
      if (_.isEmpty(query.datasource)) {
        throw new Error('all queries should have a `datasource` field');
      }
      if (_.isEmpty(query.target)) {
        throw new Error('all queries should have a `target` field');
      }
    }
  }

  /**
   * Writes every row of `series` to the CSV stream as an object keyed by column name
   * and counts the written rows in the task progress.
   *
   * @param stream - CSV stream returned by `_initCsvStream`
   * @param series - `{ columns, rows }`; `columns[i]` is the header of `row[i]`
   */
  private _writeCsv(stream, series) {
    for (const row of series.rows) {
      const csvRow = {};
      series.columns.forEach((column, idx) => {
        csvRow[column] = row[idx];
      });
      stream.write(csvRow);
      this._task.progress.exportedRowsCount++;
    }
  }

  /** Name of the CSV file for the current task. */
  private _getCsvFilename(): string {
    return `${this._task.filename}.csv`;
  }

  /** Path of the CSV file inside `CSV_PATH`. */
  private _getCsvFilePath(): string {
    let filename = this._getCsvFilename();
    return path.join(CSV_PATH, filename);
  }

  /** Path of the JSON status file (named after the task id) inside `CSV_PATH`. */
  private _getJsonFilePath(): string {
    return path.join(CSV_PATH, `${this._task.id}.json`);
  }
}