|
|
@ -5,7 +5,7 @@ import { CSV_PATH } from '../config'; |
|
|
|
|
|
|
|
|
|
|
|
import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit'; |
|
|
|
import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit'; |
|
|
|
// TODO: export QueryType directly from @corpglory/tsdb-kit
|
|
|
|
// TODO: export QueryType directly from @corpglory/tsdb-kit
|
|
|
|
import { QueryType } from '@corpglory/tsdb-kit/lib/connectors'; |
|
|
|
import { DataTable, QueryType } from '@corpglory/tsdb-kit/lib/connectors'; |
|
|
|
|
|
|
|
|
|
|
|
import { v4 as uuidv4 } from 'uuid'; |
|
|
|
import { v4 as uuidv4 } from 'uuid'; |
|
|
|
|
|
|
|
|
|
|
@ -63,42 +63,27 @@ export class Exporter { |
|
|
|
console.log(`Total days: ${days}`); |
|
|
|
console.log(`Total days: ${days}`); |
|
|
|
|
|
|
|
|
|
|
|
const stream = this._initCsvStream(); |
|
|
|
const stream = this._initCsvStream(); |
|
|
|
|
|
|
|
console.log("queryConfigs", queryConfigs); |
|
|
|
for(let day = 0; day < days; day++) { |
|
|
|
for(let day = 0; day < days; day++) { |
|
|
|
to = from + MS_IN_DAY; |
|
|
|
to = from + MS_IN_DAY; |
|
|
|
|
|
|
|
|
|
|
|
console.log(`${day} day: ${from}ms -> ${to}ms`); |
|
|
|
console.log(`${day} day: ${from}ms -> ${to}ms`); |
|
|
|
|
|
|
|
|
|
|
|
let columns = []; |
|
|
|
let columns = []; |
|
|
|
let values = []; |
|
|
|
let rows = []; |
|
|
|
|
|
|
|
|
|
|
|
for(const queryConfig of queryConfigs) { |
|
|
|
for(const queryConfig of queryConfigs) { |
|
|
|
const datasourceMetrics = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey); |
|
|
|
const datasourceMetrics: DataTable = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey); |
|
|
|
|
|
|
|
columns = this._updateColumns(columns, datasourceMetrics.columns); |
|
|
|
if(_.isEmpty(columns)) { |
|
|
|
console.log("columns", columns) |
|
|
|
columns = datasourceMetrics.columns; |
|
|
|
rows = this._updateRows(rows, datasourceMetrics.values); |
|
|
|
} else { |
|
|
|
console.log("rows", rows.slice(0, 10)); |
|
|
|
columns = _.concat(columns, datasourceMetrics.columns.slice(1)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(_.isEmpty(values)) { |
|
|
|
|
|
|
|
values = datasourceMetrics.values; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
if(values.length !== datasourceMetrics.values.length) { |
|
|
|
|
|
|
|
throw new Error(`All queries should return rows of the same lengths`); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
for(const rowIdx in values) { |
|
|
|
|
|
|
|
if(datasourceMetrics.values[rowIdx][0] !== values[rowIdx][0]) { |
|
|
|
|
|
|
|
throw new Error('Queries should return the same timestamps'); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
values[rowIdx] = _.concat(values[rowIdx], datasourceMetrics.values[rowIdx].slice(1)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
values = values.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]); |
|
|
|
rows = rows.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]); |
|
|
|
|
|
|
|
|
|
|
|
if(columns.length > 0) { |
|
|
|
if(columns.length > 0) { |
|
|
|
this._writeCsv(stream, { columns, values }); |
|
|
|
this._writeCsv(stream, { columns, rows }); |
|
|
|
} |
|
|
|
} |
|
|
|
await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days }); |
|
|
|
await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days }); |
|
|
|
|
|
|
|
|
|
|
@ -110,6 +95,27 @@ export class Exporter { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private _updateColumns(columnsToUpdate: string[], queriedColumns: string[]): string[] { |
|
|
|
|
|
|
|
// slice(1) to avoid time fields
|
|
|
|
|
|
|
|
return _.concat(columnsToUpdate, queriedColumns.slice(1)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private _updateRows(rowsToUpdate: (number | null)[][], queriedRows: (number | null)[][]): (number | null)[][] { |
|
|
|
|
|
|
|
if(_.isEmpty(rowsToUpdate)) { |
|
|
|
|
|
|
|
return queriedRows; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if(rowsToUpdate.length !== queriedRows.length) { |
|
|
|
|
|
|
|
throw new Error(`All queries should return rows of the same lengths`); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
for(const rowIdx in rowsToUpdate) { |
|
|
|
|
|
|
|
if(queriedRows[rowIdx][0] !== rowsToUpdate[rowIdx][0]) { |
|
|
|
|
|
|
|
throw new Error('Queries should return the same timestamps'); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
rowsToUpdate[rowIdx] = _.concat(rowsToUpdate[rowIdx], queriedRows[rowIdx].slice(1)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return rowsToUpdate; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private _initCsvStream() { |
|
|
|
private _initCsvStream() { |
|
|
|
const csvStream = csv.createWriteStream({ headers: true, delimiter: this._task.csvDelimiter }) |
|
|
|
const csvStream = csv.createWriteStream({ headers: true, delimiter: this._task.csvDelimiter }) |
|
|
|
.on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message })); |
|
|
|
.on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message })); |
|
|
@ -161,7 +167,7 @@ export class Exporter { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private _writeCsv(stream, series) { |
|
|
|
private _writeCsv(stream, series) { |
|
|
|
for(let row of series.values) { |
|
|
|
for(let row of series.rows) { |
|
|
|
let csvRow = {}; |
|
|
|
let csvRow = {}; |
|
|
|
for(let col in series.columns) { |
|
|
|
for(let col in series.columns) { |
|
|
|
csvRow[series.columns[col]] = row[col]; |
|
|
|
csvRow[series.columns[col]] = row[col]; |
|
|
|