concat rows with different timestamps #12

Merged
rozetko merged 4 commits from csv-data-compute into master 2 years ago
  1. 78
      src/services/exporter.ts

78
src/services/exporter.ts

@ -5,7 +5,7 @@ import { CSV_PATH } from '../config';
import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit'; import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit';
// TODO: export QueryType directly from @corpglory/tsdb-kit // TODO: export QueryType directly from @corpglory/tsdb-kit
import { QueryType } from '@corpglory/tsdb-kit/lib/connectors'; import { DataTable, QueryType } from '@corpglory/tsdb-kit/lib/connectors';
import { v4 as uuidv4 } from 'uuid'; import { v4 as uuidv4 } from 'uuid';
@ -63,42 +63,29 @@ export class Exporter {
console.log(`Total days: ${days}`); console.log(`Total days: ${days}`);
const stream = this._initCsvStream(); const stream = this._initCsvStream();
for(let day = 0; day < days; day++) { for(let day = 0; day < days; day++) {
to = from + MS_IN_DAY; to = from + MS_IN_DAY;
console.log(`${day} day: ${from}ms -> ${to}ms`); console.log(`${day} day: ${from}ms -> ${to}ms`);
let columns = []; let metricColumns = [];
let values = []; let keyColumn;
let rowsDict = {};
for(const queryConfig of queryConfigs) { for(const queryConfig of queryConfigs) {
const datasourceMetrics = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey); const datasourceMetrics: DataTable = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);
if(keyColumn == undefined) {
if(_.isEmpty(columns)) { keyColumn = datasourceMetrics.columns[0];
columns = datasourceMetrics.columns;
} else {
columns = _.concat(columns, datasourceMetrics.columns.slice(1));
}
if(_.isEmpty(values)) {
values = datasourceMetrics.values;
} else {
if(values.length !== datasourceMetrics.values.length) {
throw new Error(`All queries should return rows of the same lengths`);
}
for(const rowIdx in values) {
if(datasourceMetrics.values[rowIdx][0] !== values[rowIdx][0]) {
throw new Error('Queries should return the same timestamps');
}
values[rowIdx] = _.concat(values[rowIdx], datasourceMetrics.values[rowIdx].slice(1));
}
} }
metricColumns = this._updateColumns(metricColumns, datasourceMetrics.columns);
rowsDict = this._updateRows(rowsDict, datasourceMetrics.values, datasourceMetrics.columns);
} }
const rows = this._getRowsFromDict(rowsDict, metricColumns);
const columns = _.concat(keyColumn, metricColumns);
const formattedRows = rows.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]);
values = values.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]); if(metricColumns.length > 0) {
this._writeCsv(stream, { columns, rows: formattedRows });
if(columns.length > 0) {
this._writeCsv(stream, { columns, values });
} }
await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days }); await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days });
@ -110,6 +97,39 @@ export class Exporter {
} }
} }
private _updateColumns(columnsToUpdate: string[], queriedColumns: string[]): string[] {
  // Append this query's metric columns to the accumulated list.
  // The first queried column is the key (timestamp) field, so drop it.
  const [, ...metricColumns] = queriedColumns;
  return [...columnsToUpdate, ...metricColumns];
}
private _updateRows(dict, queriedRows: (number | null)[][], queriedColumns: string[]): { [key: number]: { [column: string]: number} } {
const columns = queriedColumns.slice(1);
for(const rowIdx in queriedRows) {
const key = queriedRows[rowIdx][0];
const values = queriedRows[rowIdx].slice(1);
dict[key] = dict[key] || {};
for(const valueIdx in values) {
dict[key][columns[valueIdx]] = values[valueIdx];
}
}
return dict;
}
private _getRowsFromDict(dict: any, columns: string[]): (number | null)[][] {
const keyList = _.map(_.keys(dict), value => _.toNumber(value));
const orderedKeyList = _.orderBy(keyList);
let rows = [];
for(const keyIdx in orderedKeyList) {
const key = orderedKeyList[keyIdx];
rows[keyIdx] = [key];
for(const column of columns) {
const value = dict[key][column] || null;
rows[keyIdx].push(value)
}
}
return rows;
}
private _initCsvStream() { private _initCsvStream() {
const csvStream = csv.createWriteStream({ headers: true, delimiter: this._task.csvDelimiter }) const csvStream = csv.createWriteStream({ headers: true, delimiter: this._task.csvDelimiter })
.on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message })); .on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message }));
@ -161,7 +181,7 @@ export class Exporter {
} }
private _writeCsv(stream, series) { private _writeCsv(stream, series) {
for(let row of series.values) { for(let row of series.rows) {
let csvRow = {}; let csvRow = {};
for(let col in series.columns) { for(let col in series.columns) {
csvRow[series.columns[col]] = row[col]; csvRow[series.columns[col]] = row[col];

Loading…
Cancel
Save