You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
206 lines
6.5 KiB
206 lines
6.5 KiB
import { getApiKey } from './api_keys'; |
|
import { toDateString, toIsoString } from '../utils'; |
|
import { DashboardQuery, ExportProgress, ExportStatus, ExportTask } from '../types'; |
|
import { CSV_PATH } from '../config'; |
|
|
|
import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit'; |
|
// TODO: export QueryType directly from @corpglory/tsdb-kit |
|
import { DataTable, QueryType } from '@corpglory/tsdb-kit/lib/connectors'; |
|
|
|
import { v4 as uuidv4 } from 'uuid'; |
|
|
|
import * as moment from 'moment'; |
|
import * as csv from 'fast-csv'; |
|
import * as fs from 'fs'; |
|
import * as path from 'path'; |
|
import { URL } from 'url'; |
|
import { promisify } from 'util'; |
|
|
|
import * as _ from 'lodash'; |
|
|
|
const MS_IN_DAY = 24 * 60 * 60 * 1000; |
|
|
|
const DEFAULT_PROGRESS = { |
|
exportedRowsCount: 0, |
|
progress: 0, |
|
status: ExportStatus.EXPORTING, |
|
}; |
|
|
|
export class Exporter { |
|
private _task: ExportTask; |
|
|
|
public async export(task: ExportTask, datasourceUrl: string, timeZoneName: string, apiKey: string) { |
|
try { |
|
this._task = _.cloneDeep(task); |
|
this._task.id = uuidv4(); |
|
this._task.progress = _.cloneDeep(DEFAULT_PROGRESS); |
|
|
|
this._validateQueries(task.queries); |
|
|
|
const panelName = _.snakeCase(task.queries[0].panel.title); |
|
const dateFrom = toDateString(task.timeRange.from, timeZoneName); |
|
const dateTo = toDateString(task.timeRange.to, timeZoneName); |
|
this._task.filename = `${panelName}_${task.dashboardUid}_${dateFrom}_${dateTo}`; |
|
|
|
await this._updateProgress(); |
|
|
|
const queryConfigs = task.queries.map( |
|
query => |
|
new QueryConfig( |
|
QueryType.GRAFANA, |
|
{ |
|
...query.datasource, |
|
url: datasourceUrl, |
|
}, |
|
[query.target] |
|
) |
|
); |
|
|
|
let from = +this._task.timeRange.from; |
|
let to = +this._task.timeRange.to; |
|
const days = Math.ceil((to - from) / MS_IN_DAY); |
|
|
|
console.log(`Total days: ${days}`); |
|
|
|
const stream = this._initCsvStream(); |
|
|
|
for(let day = 0; day < days; day++) { |
|
to = from + MS_IN_DAY; |
|
|
|
console.log(`${day} day: ${from}ms -> ${to}ms`); |
|
|
|
let metricColumns = []; |
|
let keyColumn; |
|
let rowsDict = {}; |
|
for(const queryConfig of queryConfigs) { |
|
const datasourceMetrics: DataTable = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey); |
|
if(keyColumn == undefined) { |
|
keyColumn = datasourceMetrics.columns[0]; |
|
} |
|
metricColumns = this._updateColumns(metricColumns, datasourceMetrics.columns); |
|
rowsDict = this._updateRows(rowsDict, datasourceMetrics.values, datasourceMetrics.columns); |
|
} |
|
const rows = this._getRowsFromDict(rowsDict, metricColumns); |
|
const columns = _.concat(keyColumn, metricColumns); |
|
const formattedRows = rows.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]); |
|
|
|
if(metricColumns.length > 0) { |
|
this._writeCsv(stream, { columns, rows: formattedRows }); |
|
} |
|
await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days }); |
|
|
|
from += MS_IN_DAY; |
|
} |
|
stream.end(); |
|
} catch (e) { |
|
await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message }); |
|
} |
|
} |
|
|
|
private _updateColumns(columnsToUpdate: string[], queriedColumns: string[]): string[] { |
|
// slice(1) to avoid key fields |
|
return _.concat(columnsToUpdate, queriedColumns.slice(1)); |
|
} |
|
|
|
private _updateRows(dict, queriedRows: (number | null)[][], queriedColumns: string[]): { [key: number]: { [column: string]: number} } { |
|
const columns = queriedColumns.slice(1); |
|
for(const rowIdx in queriedRows) { |
|
const key = queriedRows[rowIdx][0]; |
|
const values = queriedRows[rowIdx].slice(1); |
|
dict[key] = dict[key] || {}; |
|
for(const valueIdx in values) { |
|
dict[key][columns[valueIdx]] = values[valueIdx]; |
|
} |
|
} |
|
return dict; |
|
} |
|
|
|
private _getRowsFromDict(dict: any, columns: string[]): (number | null)[][] { |
|
const keyList = _.map(_.keys(dict), value => _.toNumber(value)); |
|
const orderedKeyList = _.orderBy(keyList); |
|
let rows = []; |
|
for(const keyIdx in orderedKeyList) { |
|
const key = orderedKeyList[keyIdx]; |
|
rows[keyIdx] = [key]; |
|
for(const column of columns) { |
|
const value = dict[key][column] || null; |
|
rows[keyIdx].push(value) |
|
} |
|
} |
|
return rows; |
|
} |
|
|
|
private _initCsvStream() { |
|
const csvStream = csv.createWriteStream({ headers: true, delimiter: this._task.csvDelimiter }) |
|
.on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message })); |
|
|
|
const writableStream = fs.createWriteStream(this._getCsvFilePath()); |
|
|
|
csvStream.pipe(writableStream); |
|
writableStream.on('finish', async () => { |
|
if(this._task.progress.status !== ExportStatus.ERROR) { |
|
console.log(`Everything is written to ${this._getCsvFilename()}`); |
|
await this._updateProgress({ status: ExportStatus.FINISHED, progress: 1 }); |
|
} else { |
|
console.log(`${this._getCsvFilename()} export is finished with error`); |
|
} |
|
}); |
|
|
|
return csvStream; |
|
} |
|
|
|
private async _updateProgress(progress?: Partial<ExportProgress>) { |
|
try { |
|
let time = moment().valueOf(); |
|
|
|
const data = { |
|
...this._task, |
|
progress: _.assign(this._task.progress, progress, { time }), |
|
}; |
|
|
|
await promisify(fs.writeFile)(this._getJsonFilePath(), JSON.stringify(data), 'utf8'); |
|
} catch(err) { |
|
console.error(err); |
|
throw new Error('Can`t write file'); |
|
} |
|
} |
|
|
|
private _validateQueries(queries: DashboardQuery[]) { |
|
if(!_.isArray(queries)) { |
|
throw new Error('`queries` field is required and should be an array'); |
|
} |
|
|
|
for(const query of queries) { |
|
if(_.isEmpty(query.datasource)) { |
|
throw new Error('all queries should have a `datasource` field'); |
|
} |
|
if(_.isEmpty(query.target)) { |
|
throw new Error('all queries should have a `target` field'); |
|
} |
|
} |
|
} |
|
|
|
private _writeCsv(stream, series) { |
|
for(let row of series.rows) { |
|
let csvRow = {}; |
|
for(let col in series.columns) { |
|
csvRow[series.columns[col]] = row[col]; |
|
} |
|
stream.write(csvRow); |
|
this._task.progress.exportedRowsCount++; |
|
} |
|
} |
|
|
|
private _getCsvFilename(): string { |
|
return `${this._task.filename}.csv`; |
|
} |
|
|
|
private _getCsvFilePath(): string { |
|
let filename = this._getCsvFilename(); |
|
return path.join(CSV_PATH, filename); |
|
} |
|
|
|
private _getJsonFilePath(): string { |
|
return path.join(CSV_PATH, `${this._task.id}.json`); |
|
} |
|
}
|
|
|