Compare commits

...

12 Commits

4 files changed:

1. README.md (2 changes)
2. package.json (2 changes)
3. src/routes/tasks.ts (17 changes)
4. src/services/exporter.ts (103 changes)

README.md (2 changes)

@@ -13,7 +13,7 @@ Server for fetching data from Grafana datasources. How you would use it:
 #### Installation
 ```bash
-wget https://code.corpglory.net/attachments/12517474-e841-44ee-ba9a-c25367e79389 -O grafana-data-exporter.tar.gz
+wget https://code.corpglory.net/attachments/6020c47c-5fe2-4049-bc1d-cb4c4bc5aa7e -O grafana-data-exporter.tar.gz
 tar -zxvf grafana-data-exporter.tar.gz
 ```

package.json (2 changes)

@@ -1,6 +1,6 @@
 {
   "name": "grafana-data-exporter",
-  "version": "0.7.4",
+  "version": "0.7.7",
   "description": "Server for fetching data from Grafana datasources",
   "scripts": {
     "start": "node dist/server.js",

src/routes/tasks.ts (17 changes)

@@ -17,6 +17,7 @@ type TRequest = {
     task: ExportTask,
     url: string,
     timeZoneName: string,
+    apiKey: string,
   },
 };
@@ -66,6 +67,11 @@ async function addTask(req: TRequest, res) {
     res.status(400).send('"task" field is required');
     return;
   }
+  const apiKey = body.apiKey;
+  if (_.isEmpty(apiKey)) {
+    res.status(400).send('"apiKey" field is required');
+    return;
+  }
 
   try {
     validateGrafanaUrl(clientUrl);
@@ -87,15 +93,16 @@ async function addTask(req: TRequest, res) {
   }
 
   const exporter = exporterFactory.getExporter();
-  exporter.export(task, datasourceUrl, body.timeZoneName);
+  exporter.export(task, datasourceUrl, body.timeZoneName, apiKey);
   res.status(200).send(`Export process started`);
 }
 
 async function deleteTask(req, res) {
-  let taskId = req.body.taskId;
-  let csvFilePath = path.join(CSV_PATH, `${taskId}.csv`);
-  let jsonFilePath = path.join(CSV_PATH, `${taskId}.json`);
+  const taskId = req.body.taskId;
+  const jsonFilePath = path.join(CSV_PATH, `${taskId}.json`);
+  const data = fs.readFileSync(jsonFilePath, 'utf8');
+  const csvName = JSON.parse(data)['filename'];
+  const csvFilePath = path.join(CSV_PATH, `${csvName}.csv`);
 
   if(fs.existsSync(csvFilePath)) {
     fs.unlink(csvFilePath, err => console.error(err));
   }
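
The net effect in this route: `addTask` now requires an `apiKey` field in the request body and rejects requests without one, rather than the exporter resolving a key on its own. A minimal sketch of a request against the updated handler; the endpoint path, port, and field values are assumptions for illustration, only the body shape comes from the diff above:

```ts
// Hypothetical client call; only the required body fields
// { task, url, timeZoneName, apiKey } are confirmed by the diff.
const res = await fetch('http://localhost:8000/tasks', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    task: { /* an ExportTask: queries, time range, filename, ... */ },
    url: 'https://grafana.example.com',
    timeZoneName: 'Europe/London',
    apiKey: 'eyJrIjoi...', // Grafana API key, now mandatory (400 if empty)
  }),
});
console.log(res.status, await res.text()); // 200 "Export process started"
```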

src/services/exporter.ts (103 changes)

@@ -5,7 +5,7 @@ import { CSV_PATH } from '../config';
 import { QueryConfig, queryByConfig } from '@corpglory/tsdb-kit';
 // TODO: export QueryType directly from @corpglory/tsdb-kit
-import { QueryType } from '@corpglory/tsdb-kit/lib/connectors';
+import { DataTable, QueryType } from '@corpglory/tsdb-kit/lib/connectors';
 
 import { v4 as uuidv4 } from 'uuid';

@@ -29,7 +29,7 @@ const DEFAULT_PROGRESS = {
 export class Exporter {
   private _task: ExportTask;
 
-  public async export(task: ExportTask, datasourceUrl: string, timeZoneName: string) {
+  public async export(task: ExportTask, datasourceUrl: string, timeZoneName: string, apiKey: string) {
     try {
       this._task = _.cloneDeep(task);
       this._task.id = uuidv4();
@@ -63,45 +63,29 @@ export class Exporter {
       console.log(`Total days: ${days}`);
 
       const stream = this._initCsvStream();
       for(let day = 0; day < days; day++) {
         to = from + MS_IN_DAY;
         console.log(`${day} day: ${from}ms -> ${to}ms`);
 
-        let columns = [];
-        let values = [];
+        let metricColumns = [];
+        let keyColumn;
+        let rowsDict = {};
 
         for(const queryConfig of queryConfigs) {
-          const host = new URL(datasourceUrl).origin;
-          const apiKey = getApiKey(host);
-          const datasourceMetrics = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);
-
-          if(_.isEmpty(columns)) {
-            columns = datasourceMetrics.columns;
-          } else {
-            columns = _.concat(columns, datasourceMetrics.columns.slice(1));
-          }
-
-          if(_.isEmpty(values)) {
-            values = datasourceMetrics.values;
-          } else {
-            if(values.length !== datasourceMetrics.values.length) {
-              throw new Error(`All queries should return rows of the same lengths`);
-            }
-            for(const rowIdx in values) {
-              if(datasourceMetrics.values[rowIdx][0] !== values[rowIdx][0]) {
-                throw new Error('Queries should return the same timestamps');
-              }
-              values[rowIdx] = _.concat(values[rowIdx], datasourceMetrics.values[rowIdx].slice(1));
-            }
-          }
+          const datasourceMetrics: DataTable = await queryByConfig(queryConfig, datasourceUrl, from, to, apiKey);
+
+          if(keyColumn == undefined) {
+            keyColumn = datasourceMetrics.columns[0];
+          }
+          metricColumns = this._updateColumns(metricColumns, datasourceMetrics.columns);
+          rowsDict = this._updateRows(rowsDict, datasourceMetrics.values, datasourceMetrics.columns);
         }
 
+        const rows = this._getRowsFromDict(rowsDict, metricColumns);
+        const columns = _.concat(keyColumn, metricColumns);
+        const formattedRows = rows.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]);
-        values = values.map((row: number[]) => [toIsoString(row[0], timeZoneName), ...row.slice(1)]);
 
-        if(columns.length > 0) {
-          this._writeCsv(stream, { columns, values });
+        if(metricColumns.length > 0) {
+          this._writeCsv(stream, { columns, rows: formattedRows });
         }
         await this._updateProgress({ status: ExportStatus.EXPORTING, progress: (day + 1) / days });
@@ -113,19 +97,52 @@ export class Exporter {
     }
   }
 
+  private _updateColumns(columnsToUpdate: string[], queriedColumns: string[]): string[] {
+    // slice(1) to avoid key fields
+    return _.concat(columnsToUpdate, queriedColumns.slice(1));
+  }
+
+  private _updateRows(dict, queriedRows: (number | null)[][], queriedColumns: string[]): { [key: number]: { [column: string]: number } } {
+    const columns = queriedColumns.slice(1);
+    for(const rowIdx in queriedRows) {
+      const key = queriedRows[rowIdx][0];
+      const values = queriedRows[rowIdx].slice(1);
+      dict[key] = dict[key] || {};
+      for(const valueIdx in values) {
+        dict[key][columns[valueIdx]] = values[valueIdx];
+      }
+    }
+    return dict;
+  }
+
+  private _getRowsFromDict(dict: any, columns: string[]): (number | null)[][] {
+    const keyList = _.map(_.keys(dict), value => _.toNumber(value));
+    const orderedKeyList = _.orderBy(keyList);
+
+    let rows = [];
+    for(const keyIdx in orderedKeyList) {
+      const key = orderedKeyList[keyIdx];
+      rows[keyIdx] = [key];
+      for(const column of columns) {
+        const value = dict[key][column] ?? null;
+        rows[keyIdx].push(value);
+      }
+    }
+    return rows;
+  }
+
   private _initCsvStream() {
     const csvStream = csv.createWriteStream({ headers: true, delimiter: this._task.csvDelimiter })
       .on('error', async e => await this._updateProgress({ status: ExportStatus.ERROR, errorMessage: e.message }));
 
-    const writableStream = fs.createWriteStream(this._getFilePath('csv'));
+    const writableStream = fs.createWriteStream(this._getCsvFilePath());
     csvStream.pipe(writableStream);
 
     writableStream.on('finish', async () => {
       if(this._task.progress.status !== ExportStatus.ERROR) {
-        console.log(`Everything is written to ${this._getFilename('csv')}`);
+        console.log(`Everything is written to ${this._getCsvFilename()}`);
         await this._updateProgress({ status: ExportStatus.FINISHED, progress: 1 });
       } else {
-        console.log(`${this._getFilename('csv')} export is finished with error`);
+        console.log(`${this._getCsvFilename()} export is finished with error`);
       }
     });
@@ -141,7 +158,7 @@ export class Exporter {
         progress: _.assign(this._task.progress, progress, { time }),
       };
 
-      await promisify(fs.writeFile)(this._getFilePath('json'), JSON.stringify(data), 'utf8');
+      await promisify(fs.writeFile)(this._getJsonFilePath(), JSON.stringify(data), 'utf8');
     } catch(err) {
       console.error(err);
      throw new Error('Can\'t write file');
@@ -164,7 +181,7 @@ export class Exporter {
   }
 
   private _writeCsv(stream, series) {
-    for(let row of series.values) {
+    for(let row of series.rows) {
       let csvRow = {};
       for(let col in series.columns) {
         csvRow[series.columns[col]] = row[col];
@@ -174,12 +191,16 @@ export class Exporter {
     }
   }
 
-  private _getFilename(extension: string): string {
-    return `${this._task.filename}.${extension}`;
+  private _getCsvFilename(): string {
+    return `${this._task.filename}.csv`;
   }
 
-  private _getFilePath(extension: string): string {
-    let filename = this._getFilename(extension);
+  private _getCsvFilePath(): string {
+    let filename = this._getCsvFilename();
     return path.join(CSV_PATH, filename);
   }
+
+  private _getJsonFilePath(): string {
+    return path.join(CSV_PATH, `${this._task.id}.json`);
+  }
 }
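
The core change in this file replaces the old positional merge, which required every query to return rows of the same length with identical timestamps, with a timestamp-keyed merge: `_updateRows` folds each query's rows into a dict keyed by the first (timestamp) column, and `_getRowsFromDict` rebuilds rows in timestamp order, filling columns a query did not return with `null`. A self-contained sketch of that merge with simplified types and made-up sample data (the names mirror the private methods above, but this is an illustration, not the shipped code):

```ts
import * as _ from 'lodash';

type RowsDict = { [key: number]: { [column: string]: number | null } };

// Fold one query's result into the dict, keyed by the timestamp column.
function updateRows(dict: RowsDict, rows: (number | null)[][], columns: string[]): RowsDict {
  const metricColumns = columns.slice(1); // slice(1) drops the key column
  for(const row of rows) {
    const key = row[0] as number;
    dict[key] = dict[key] || {};
    metricColumns.forEach((column, i) => dict[key][column] = row[i + 1]);
  }
  return dict;
}

// Rebuild rows ordered by timestamp; cells with no data become null.
function getRowsFromDict(dict: RowsDict, columns: string[]): (number | null)[][] {
  const keys = _.orderBy(_.map(_.keys(dict), _.toNumber));
  return keys.map(key => [key, ...columns.map(column => dict[key][column] ?? null)]);
}

// Two queries whose timestamps only partially overlap no longer throw:
let dict: RowsDict = {};
dict = updateRows(dict, [[1000, 1], [2000, 2]], ['time', 'cpu']);
dict = updateRows(dict, [[2000, 5], [3000, 6]], ['time', 'mem']);
console.log(getRowsFromDict(dict, ['cpu', 'mem']));
// -> [ [1000, 1, null], [2000, 2, 5], [3000, null, 6] ]
```

Reading cells back with `??` rather than `||` keeps a legitimate `0` measurement from collapsing to `null`.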
