Browse Source

some refactoring

pull/1/head
rozetko 2 years ago
parent
commit
b8f1f27f1a
  1. 5
      spec/elasticsearch.jest.ts
  2. 4
      spec/graphite.jest.ts
  3. 4
      spec/postgres.jest.ts
  4. 6
      spec/prometheus.jest.ts
  5. 3
      spec/targets.jest.ts
  6. 134
      src/grafana_service.ts
  7. 69
      src/index.ts
  8. 22
      src/metrics/elasticsearch_metric.ts
  9. 17
      src/metrics/metric.ts
  10. 21
      src/metrics/metrics_factory.ts
  11. 48
      src/services/direct_service.ts
  12. 77
      src/services/grafana_service.ts
  13. 25
      src/tsdb-kit/index.ts
  14. 15
      src/types.ts

5
spec/elasticsearch.jest.ts

@ -1,6 +1,7 @@
import { ElasticsearchMetric } from '../src/metrics/elasticsearch_metric';
import { Datasource } from '../src/metrics/metric';
import { Datasource, DatasourceType } from '../src/metrics/metric';
import 'jest';
import * as _ from 'lodash';
describe('simple query', function(){
@ -62,7 +63,7 @@ describe('simple query', function(){
}
}
}],
type: "elasticsearch"
type: DatasourceType.ELASTICSEARCH
};
datasource.data = datasource.data.map(d => JSON.stringify(d)).join('\n');

4
spec/graphite.jest.ts

@ -1,11 +1,13 @@
import { Datasource, Metric } from '../src/index';
import { DatasourceType } from '../src/metrics/metric';
import 'jest';
describe('correct Graphite query', function() {
let datasource: Datasource = {
url: 'http://example.com:1234',
type: 'graphite',
type: DatasourceType.GRAPHITE,
params: {
db: '',
q: '',

4
spec/postgres.jest.ts

@ -1,5 +1,5 @@
import { PostgresMetric } from '../src/metrics/postgres_metric';
import { MetricQuery } from '../src/metrics/metric';
import { DatasourceType, MetricQuery } from '../src/metrics/metric';
import 'jest';
import * as _ from 'lodash';
@ -228,7 +228,7 @@ function getMetricForSqlQuery(query: string = ''): PostgresMetric {
const datasource = {
url: 'api/tsdb/query',
type: 'postgres',
type: DatasourceType.POSTGRES,
data: queryPayload
};

6
spec/prometheus.jest.ts

@ -1,11 +1,11 @@
import { PrometheusMetric } from '../src/metrics/prometheus_metric';
import 'jest';
import { PrometheusMetric } from '../src/metrics/prometheus_metric';
import { DatasourceType } from '../src/metrics/metric';
describe('Test Prometheus time range processing', function() {
let datasource = {
type: 'prometheus',
type: DatasourceType.PROMETHEUS,
url: 'api/datasources/proxy/4/api/v1/query_range?query=node_disk_io_time_ms&start=1543411320&end=1543432950&step=30'
}
let targets = [];

3
spec/targets.jest.ts

@ -1,4 +1,5 @@
import { Datasource, Metric } from '../src/index';
import { DatasourceType } from '../src/metrics/metric';
import 'jest';
@ -6,7 +7,7 @@ import 'jest';
describe('Correct InfluxDB query', function() {
let datasource: Datasource = {
url: 'url',
type: 'influxdb',
type: DatasourceType.INFLUXDB,
params: {
db: 'db',
q: `SELECT mean("value") FROM "db" WHERE time > xxx AND time <= xxx LIMIT 100 OFFSET 20`,

134
src/grafana_service.ts

@ -1,134 +0,0 @@
import { Metric } from './metrics/metrics_factory';
import { MetricQuery, Datasource } from './metrics/metric';
import { URL } from 'url';
import axios from 'axios';
import * as _ from 'lodash';
export class DataKitError extends Error {
constructor(
message: string,
public datasourceType?: string,
public datasourceUrl?: string
) {
super(message);
}
};
export class BadRange extends DataKitError {};
export class GrafanaUnavailable extends DataKitError {};
export class DatasourceUnavailable extends DataKitError {};
const CHUNK_SIZE = 50000;
/**
* @param metric to query to Grafana
* @returns { values: [time, value][], columns: string[] }
*/
export async function queryByMetric(
metric: Metric, url: string, from: number, to: number, apiKey: string
): Promise<{ values: [number, number][], columns: string[] }> {
if(from > to) {
throw new BadRange(
`Data-kit got wrong range: from ${from} > to ${to}`,
metric.datasource.type,
url
);
}
if(from === to) {
console.warn(`Data-kit got from === to`);
}
const grafanaUrl = getGrafanaUrl(url);
let data = {
values: [],
columns: []
};
while(true) {
let query = metric.metricQuery.getQuery(from, to, CHUNK_SIZE, data.values.length);
query.url = `${grafanaUrl}/${query.url}`;
let res = await queryGrafana(query, apiKey, metric.datasource);
let chunk = metric.metricQuery.getResults(res);
let values = chunk.values;
data.values = data.values.concat(values);
data.columns = chunk.columns;
if(values.length < CHUNK_SIZE) {
// because if we get less that we could, then there is nothing more
break;
}
}
return data;
}
async function queryGrafana(query: MetricQuery, apiKey: string, datasource: Datasource) {
let headers = { Authorization: `Bearer ${apiKey}` };
if(query.headers !== undefined) {
_.merge(headers, query.headers);
}
let axiosQuery = {
headers,
url: query.url,
method: query.method,
};
_.defaults(axiosQuery, query.schema);
try {
var res = await axios(axiosQuery);
} catch (e) {
const msg = `Data kit: fail while request data: ${e.message}`;
const parsedUrl = new URL(query.url);
const queryUrl = `query url: ${JSON.stringify(parsedUrl.pathname)}`;
console.error(`${msg} ${queryUrl}`);
if(e.errno === 'ECONNREFUSED') {
throw new GrafanaUnavailable(e.message);
}
if(e.response !== undefined) {
console.error(`Response: \
status: ${e.response.status}, \
response data: ${JSON.stringify(e.response.data)}, \
headers: ${JSON.stringify(e.response.headers)}
`);
if(e.response.status === 401) {
throw new Error(`Unauthorized. Check the API_KEY. ${e.message}`);
}
if(e.response.status === 502) {
let datasourceError = new DatasourceUnavailable(
`datasource ${parsedUrl.pathname} unavailable, message: ${e.message}`,
datasource.type,
query.url
);
throw datasourceError;
}
}
throw new Error(msg);
}
return res;
}
function getGrafanaUrl(url: string) {
const parsedUrl = new URL(url);
const path = parsedUrl.pathname;
const panelUrl = path.match(/^\/*([^\/]*)\/d\//);
if(panelUrl === null) {
return url;
}
const origin = parsedUrl.origin;
const grafanaSubPath = panelUrl[1];
if(grafanaSubPath.length > 0) {
return `${origin}/${grafanaSubPath}`;
}
return origin;
}

69
src/index.ts

@ -1,4 +1,69 @@
import { MetricResults, QueryType } from './metrics/metric';
import { Metric } from './metrics/metrics_factory';
import { queryDirect } from './services/direct_service';
import { queryGrafana } from './services/grafana_service';
import { BadRange } from './types';
export { Metric } from './metrics/metrics_factory';
export { Datasource } from './metrics/metric'
// TODO: move queryByMetric from Grafana service
export { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from './grafana_service';
export { DatasourceUnavailable } from './types';
export { GrafanaUnavailable } from './services/grafana_service';
const CHUNK_SIZE = 50000;
/**
* @param metric to query to Grafana
* @returns { values: [time, value][], columns: string[] }
*/
export async function queryByMetric(
// TODO: check how did we wanna use `url` field
metric: Metric, url: string, from: number, to: number, queryType: QueryType,
// TODO: we need an abstract DatasourceConfig class which will differ in direct and grafana queries
apiKey?: string
): Promise<MetricResults> {
if(from > to) {
throw new BadRange(
`Data-kit got wrong range: from ${from} > to ${to}`,
metric.datasource.type,
url
);
}
if(from === to) {
console.warn(`Data-kit got from === to`);
}
let data: MetricResults = {
values: [],
columns: []
};
while(true) {
let query = metric.metricQuery.getQuery(from, to, CHUNK_SIZE, data.values.length);
let res: any;
// TODO: use polymorphic `query` method instead
switch(queryType) {
case QueryType.GRAFANA:
res = await queryGrafana(query, apiKey as string, metric.datasource);
break;
case QueryType.DIRECT:
res = await queryDirect(query, metric.datasource);
break;
default:
throw new Error(`Unknown query type: ${queryType}`);
}
let chunk = metric.metricQuery.getResults(res);
let values = chunk.values;
data.values = data.values.concat(values);
data.columns = chunk.columns;
if(values.length < CHUNK_SIZE) {
// because if we get less that we could, then there is nothing more
break;
}
}
return data;
}

22
src/metrics/elasticsearch_metric.ts

@ -1,5 +1,5 @@
import { AbstractMetric, Datasource, MetricId, MetricQuery, MetricResults } from './metric';
import { DataKitError } from '../grafana_service';
import { TsdbKitError } from '../types';
import * as _ from 'lodash';
@ -18,11 +18,11 @@ export type QueryConfig = {
export type Aggregation = {
date_histogram: {
interval: String,
field: String,
min_doc_count: Number,
extended_bounds: { min: String, max: String },
format: String
interval: string,
field: string,
min_doc_count: number,
extended_bounds: { min: string, max: string },
format: string
}
};
@ -36,13 +36,13 @@ export class ElasticsearchMetric extends AbstractMetric {
getQuery(from: number, to: number, limit: number, offset: number): MetricQuery {
let data = this.datasource.data.split('\n').map(d => d === '' ? d: JSON.parse(d));
if(data.length === 0) {
throw new DataKitError('Datasource data is empty');
throw new TsdbKitError('Datasource data is empty');
}
const queryConfig: QueryConfig = data[1];
queryConfig.size = 0;
let timeField = null;
let timeField: string | null = null;
let aggs = _.filter(queryConfig.aggs, f => _.has(f, DATE_HISTOGRAM_FIELD));
_.each(aggs, (agg: Aggregation) => {
@ -65,7 +65,7 @@ export class ElasticsearchMetric extends AbstractMetric {
let filters = queryConfig.query.bool.filter.filter(f => _.has(f, 'range')) as RangeFilter[];
if(filters.length === 0) {
throw new DataKitError('Empty filters');
throw new TsdbKitError('Empty filters');
}
let range = filters[0].range;
range[timeField].gte = from.toString();
@ -106,7 +106,7 @@ export class ElasticsearchMetric extends AbstractMetric {
const bucketAggs = JSON.stringify(this.targets[0].bucketAggs);
const aggregationKeys = JSON.stringify(_.keys(aggregations));
console.error(`can't find related aggregation id. bucketAggs:${bucketAggs} aggregationKeys:${aggregationKeys}`);
throw new DataKitError(`can't find related aggregation id`);
throw new TsdbKitError(`can't find related aggregation id`);
} else {
aggrgAgg = aggrgAgg[0].id;
}
@ -114,7 +114,7 @@ export class ElasticsearchMetric extends AbstractMetric {
let agg = this.targets[0].metrics.filter(m => !m.hide).map(m => m.id);
if(agg.length > 1) {
throw new DataKitError(`multiple series for metric are not supported currently: ${JSON.stringify(agg)}`);
throw new TsdbKitError(`multiple series for metric are not supported currently: ${JSON.stringify(agg)}`);
}
agg = agg[0];

17
src/metrics/metric.ts

@ -1,7 +1,20 @@
export enum QueryType {
DIRECT = 'direct',
GRAFANA = 'grafana',
}
export enum DatasourceType {
INFLUXDB = 'influxdb',
GRAPHITE = 'graphite',
PROMETHEUS = 'prometheus',
POSTGRES = 'postgres',
ELASTICSEARCH = 'elasticsearch',
MYSQL = 'mysql',
}
export declare type Datasource = {
url: string;
// TODO: type: enum
type: string;
type: DatasourceType;
params?: {
db: string;
q: string;

21
src/metrics/metrics_factory.ts

@ -1,6 +1,6 @@
import { InfluxdbMetric } from './influxdb_metric';
import { GraphiteMetric } from './graphite_metric';
import { AbstractMetric, Datasource, MetricId } from './metric';
import { AbstractMetric, Datasource, DatasourceType, MetricId } from './metric';
import { PrometheusMetric } from './prometheus_metric';
import { PostgresMetric } from './postgres_metric';
import { ElasticsearchMetric } from './elasticsearch_metric';
@ -12,13 +12,13 @@ export function metricFactory(
id?: MetricId
): AbstractMetric {
let classMap = {
'influxdb': InfluxdbMetric,
'graphite': GraphiteMetric,
'prometheus': PrometheusMetric,
'postgres': PostgresMetric,
'elasticsearch': ElasticsearchMetric,
'mysql': MysqlMetric,
const classMap = {
[DatasourceType.INFLUXDB]: InfluxdbMetric,
[DatasourceType.GRAPHITE]: GraphiteMetric,
[DatasourceType.PROMETHEUS]: PrometheusMetric,
[DatasourceType.POSTGRES]: PostgresMetric,
[DatasourceType.ELASTICSEARCH]: ElasticsearchMetric,
[DatasourceType.MYSQL]: MysqlMetric,
};
if(classMap[datasource.type] === undefined) {
console.error(`Datasources of type ${datasource.type} are not supported currently`);
@ -32,7 +32,7 @@ export class Metric {
datasource: Datasource;
targets: any[];
id?: MetricId;
private _metricQuery: AbstractMetric = undefined;
private _metricQuery?: AbstractMetric;
constructor(datasource: Datasource, targets: any[], id?: MetricId) {
if(datasource === undefined) {
@ -41,9 +41,6 @@ export class Metric {
if(targets === undefined) {
throw new Error('targets is undefined');
}
if(targets.length === 0) {
throw new Error('targets is empty');
}
this.datasource = datasource;
this.targets = targets;
this.id = id;

48
src/services/direct_service.ts

@ -0,0 +1,48 @@
import { DatasourceUnavailable } from '../types';
import { Datasource, MetricQuery } from '../metrics/metric';
import axios from 'axios';
import * as _ from 'lodash';
// TODO: support direct queries auth
// TODO: move to class and inherit from QueryService abstract class
export async function queryDirect(query: MetricQuery, datasource: Datasource) {
let axiosQuery = {
url: query.url,
method: query.method,
};
console.log(axiosQuery)
_.defaults(axiosQuery, query.schema);
try {
return axios(axiosQuery);
} catch (e) {
// TODO: seems like this error handler can be used for both Grafana and Direct queries
const msg = `TSDB-kit: fail while request data: ${e.message}`;
const parsedUrl = new URL(query.url);
const queryUrl = `query url: ${JSON.stringify(parsedUrl.pathname)}`;
console.error(`${msg} ${queryUrl}`);
if(e.response !== undefined) {
console.error(`Response: \
status: ${e.response.status}, \
response data: ${JSON.stringify(e.response.data)}, \
headers: ${JSON.stringify(e.response.headers)}
`);
if(e.response.status === 401) {
throw new Error(`Unauthorized. Check credentials. ${e.message}`);
}
if(e.response.status === 502) {
let datasourceError = new DatasourceUnavailable(
`datasource ${parsedUrl.pathname} unavailable, message: ${e.message}`,
datasource.type,
query.url
);
throw datasourceError;
}
}
throw new Error(msg);
}
}

77
src/services/grafana_service.ts

@ -0,0 +1,77 @@
import { Datasource, MetricQuery } from '../metrics/metric';
import { TsdbKitError, DatasourceUnavailable } from '../types';
import axios from 'axios';
import * as _ from 'lodash';
export class GrafanaUnavailable extends TsdbKitError { };
// TODO: move to class and inherit from QueryService abstract class
export async function queryGrafana(query: MetricQuery, apiKey: string, datasource: Datasource) {
let headers = { Authorization: `Bearer ${apiKey}` };
const grafanaUrl = getGrafanaUrl(query.url);
query.url = `${grafanaUrl}/${query.url}`;
if(query.headers !== undefined) {
_.merge(headers, query.headers);
}
let axiosQuery = {
headers,
url: query.url,
method: query.method,
};
_.defaults(axiosQuery, query.schema);
try {
return axios(axiosQuery);
} catch (e) {
// TODO: seems like this error handler can be used for both Grafana and Direct queries
const msg = `TSDB-kit: fail while request data: ${e.message}`;
const parsedUrl = new URL(query.url);
const queryUrl = `query url: ${JSON.stringify(parsedUrl.pathname)}`;
console.error(`${msg} ${queryUrl}`);
if(e.errno === 'ECONNREFUSED') {
throw new GrafanaUnavailable(e.message);
}
if(e.response !== undefined) {
console.error(`Response: \
status: ${e.response.status}, \
response data: ${JSON.stringify(e.response.data)}, \
headers: ${JSON.stringify(e.response.headers)}
`);
if(e.response.status === 401) {
throw new Error(`Unauthorized. Check the API_KEY. ${e.message}`);
}
if(e.response.status === 502) {
let datasourceError = new DatasourceUnavailable(
`datasource ${parsedUrl.pathname} unavailable, message: ${e.message}`,
datasource.type,
query.url
);
throw datasourceError;
}
}
throw new Error(msg);
}
}
function getGrafanaUrl(url: string): string {
const parsedUrl = new URL(url);
const path = parsedUrl.pathname;
const panelUrl = path.match(/^\/*([^\/]*)\/d\//);
if(panelUrl === null) {
return url;
}
const origin = parsedUrl.origin;
const grafanaSubPath = panelUrl[1];
if(grafanaSubPath.length > 0) {
return `${origin}/${grafanaSubPath}`;
}
return origin;
}

25
src/tsdb-kit/index.ts

@ -1,31 +1,26 @@
import { queryByMetric, Metric } from '..';
import { PrometheusMetric } from '../metrics/prometheus_metric';
import { DatasourceType, QueryType } from '../metrics/metric';
import axios from 'axios';
import * as _ from 'lodash';
// TODO: these `const`s should be CLI arguments
const PROMETHEUS_URL = 'http://localhost:9090';
const QUERY = '100-(avg by (instance) (irate(node_cpu_seconds_total{job="nvicta-ai-node-exporter",mode="idle"}[5m])) * 100)';
const FROM = 1660307430000; // ms
const TO = 1660307437000; // ms
const datasource = {
type: 'prometheus',
url: 'api/v1/query_range?query=100-(avg by (instance) (irate(node_cpu_seconds_total{job="nvicta-ai-node-exporter",mode="idle"}[5m])) * 100)&start=1543411320&end=1543432950&step=30'
type: DatasourceType.PROMETHEUS,
url: `${PROMETHEUS_URL}/api/v1/query_range?query=${QUERY}&start=1543411320&end=1543432950&step=30`
}
const targets = [];
const prometheus = new PrometheusMetric(datasource, targets);
const query = prometheus.getQuery(FROM, TO, 1000, 0);
const axiosQuery = {
url: `${PROMETHEUS_URL}/${query.url}`,
method: query.method,
};
_.defaults(axiosQuery, query.schema);
axios(axiosQuery)
.then(resp => {
console.log(prometheus.getResults(resp));
const metric = new Metric(datasource, targets);
queryByMetric(metric as any, PROMETHEUS_URL, FROM, TO, QueryType.DIRECT)
.then(res => {
console.log(res);
})
.catch(err => {
console.error('Query error: ', err);

15
src/types.ts

@ -0,0 +1,15 @@
import { DatasourceType } from './metrics/metric';
export class TsdbKitError extends Error {
constructor(
message: string,
public datasourceType?: DatasourceType,
public datasourceUrl?: string
) {
super(message);
}
};
export class BadRange extends TsdbKitError {};
export class DatasourceUnavailable extends TsdbKitError {};
Loading…
Cancel
Save