diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d47b80a --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +dist/ +dist-test/ +node_modules/ +npm-debug.log +.vscode/ +lib/ +package-lock.json diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..ab72757 --- /dev/null +++ b/.npmignore @@ -0,0 +1,6 @@ +src +spec +.travis.yml +jest.config.js +tsconfig.jest.json +tsconfig.json diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..56556a3 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,24 @@ +language: node_js +node_js: +- '8' +before_script: +- npm install +script: +- npm test + +jobs: + include: + - stage: npm release + script: + - npm run build + deploy: + provider: npm + skip_cleanup: true + email: ping@corpglory.com + api_key: + secure: hdGJrwq7fny1EPGZqxX7/Khyw4kokW5/304JAvKVSdSVuonGNCNzgPO5UJppdN9UrX3RZTvs5NdaJUGt0Xhq+9UlxfGxg6Gl44kf8AVNFHy6+YsZu4kWCEEeFFLraELeQ+K+2U6LOeoQ7muGvTlLpmfkT+J9NVUgdxHsrmziktt+iWIY2a6gOjJwLXC8lbwBy7UzQq7v8YJX6hU5t4FwlsNFwObpaKRK4xRwSDnTnHurJnTzLNcR5+sp6Ltx0EKAcbwqTXv8iTJsKMfTXimXdWuIrQpuyfpNyfYyjWxK2AU01qFAA3+ianv2sRQHqm56R9oXu+rTC9v8djutwuR4uCaTeeSVIO2zp6HcnWHciNVjUXe1DijjqBU1NIDq5wPPbW9V2meXXCWgW0m2iY+2PDQDa26PIIxS6NvYpwITW903FhBuB6VHGppPu/1J87hzo7FJrWkies4rIpi2xD9tosIQ0EInIi1m2o65oncOGNzUvS9UMyU/e0jGPnQ6Q5sqrUm8juvn+elrevFCrYIYKvQ5k+MJWurTyaq0S0xMx7pacVImKb2pirtxSVmo0nCSpFgagKAkN6+dXLO+siuDMmwMJvKqRg0+9SclYcYjobexiKNLaOulgLfOlSpjbFdVhQjWPJLZL50/y4R5NuiAzOCSeKNvRjw2YHIKaTvCWZg= + on: + tags: true + +notifications: + email: false diff --git a/README.md b/README.md new file mode 100644 index 0000000..6329b6f --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +# tsdb-kit + +[![Build Status](https://travis-ci.org/CorpGlory/tsdb-kit.svg?branch=master)](https://travis-ci.org/CorpGlory/tsdb-kit) + +Node.js library for running Grafana datasources on backend plus utils. +You can send your datasource metric from Grafana to compile it on Node.js and query your datasource via Grafana API in background. + +User gets unified interface to all datasources. Library gives single output format: fields order, time units, etc + +## Supported datasources + +* Influxdb +* Graphite +* Prometheus +* PostgreSQL / TimescaleDB +* ElasticSearch + +Please write us a letter if you want your datasource to be supported: ping@corpglory.com + +## Projects based on library +* [grafana-data-exporter](https://github.com/CorpGlory/grafana-data-exporter) +* [Hastic](https://github.com/hastic/hastic-server) diff --git a/jest.config.js b/jest.config.js new file mode 100644 index 0000000..4f0f1e2 --- /dev/null +++ b/jest.config.js @@ -0,0 +1,21 @@ +module.exports = { + "verbose": true, + "globals": { + "ts-jest": { + "useBabelrc": true, + "tsConfigFile": "tsconfig.jest.json" + } + }, + "transform": { + "\\.ts": "ts-jest" + }, + "testRegex": "(\\.|/)([jt]est)\\.[jt]s$", + "moduleFileExtensions": [ + "ts", + "js", + "json" + ], + "setupFiles": [ + "/spec/setup_tests.ts" + ] +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..14936ae --- /dev/null +++ b/package.json @@ -0,0 +1,36 @@ +{ + "name": "tsdb-kit", + "version": "0.1.17", + "description": "", + "scripts": { + "build": "tsc", + "dev": "tsc -w", + "test": "jest --config jest.config.js" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/CorpGlory/tsdb-kit.git" + }, + "author": "CorpGlory Inc.", + "publishConfig": { + "access": "public" + }, + "license": "", + "bugs": { + "url": "https://github.com/CorpGlory/tsdb-kit/issues" + }, + "homepage": "https://github.com/CorpGlory/tsdb-kit", + "dependencies": { + "axios": "^0.18.0", + "moment": "^2.22.2", + "url": "^0.11.0" + }, + "devDependencies": { + "@types/jest": "24.0.0", + "jest": "24.0.0", + "ts-jest": "23.10.5", + "typescript": "3.3.1" + }, + "main": "./lib/index.js", + "typings": "./lib/index.d.ts" +} diff --git a/spec/elasticsearch.jest.ts b/spec/elasticsearch.jest.ts new file mode 100644 index 0000000..cb797da --- /dev/null +++ b/spec/elasticsearch.jest.ts @@ -0,0 +1,254 @@ +import { ElasticsearchMetric } from '../src/metrics/elasticsearch_metric'; +import { MetricQuery, Datasource } from '../src/metrics/metric'; + +import 'jest'; +import * as _ from 'lodash'; + +describe('simple query', function(){ + + let datasourse: Datasource = { + url: "api/datasources/proxy/1/_msearch", + data: [{ + "search_type": "query_then_fetch", + "ignore_unavailable": true, + "index": "metricbeat-*" + }, + { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "1545933121101", + "lte": "1545954721101", + "format": "epoch_millis" + } + } + }, + { + "query_string": { + "analyze_wildcard": true, + "query": "beat.hostname:opt-project.ru AND !system.network.name:\"IBM USB Remote NDIS Network Device\"" + } + } + ] + } + }, + "aggs": { + "2": { + "date_histogram": { + "interval": "30s", + "field": "@timestamp", + "min_doc_count": 0, + "extended_bounds": { + "min": "1545933121101", + "max": "1545954721101" + }, + "format": "epoch_millis" + }, + "aggs": { + "1": { + "avg": { + "field": "system.network.in.bytes" + } + }, + "3": { + "derivative": { + "buckets_path": "1" + } + } + } + } + } + }], + type: "elasticsearch" + }; + datasourse.data = datasourse.data.map(d => JSON.stringify(d)).join('\n'); + + let targets = [ + { + "bucketAggs": [ + { + "field": "@timestamp", + "id": "2", + "settings": { + "interval": "auto", + "min_doc_count": 0, + "trimEdges": 0 + }, + "type": "date_histogram" + } + ], + "metrics": [ + { + "field": "system.network.in.bytes", + "hide": true, + "id": "1", + "meta": {}, + "pipelineAgg": "select metric", + "settings": {}, + "type": "avg" + }, + { + "field": "1", + "id": "3", + "meta": {}, + "pipelineAgg": "1", + "settings": {}, + "type": "derivative" + } + ], + "query": "beat.hostname:opt-project.ru AND !system.network.name:\"IBM USB Remote NDIS Network Device\"", + "refId": "A", + "target": "carbon.agents.0b0226864dc8-a.cpuUsage", + "timeField": "@timestamp" + } + ]; + + let queryTemplate = [{ + "search_type": "query_then_fetch", + "ignore_unavailable": true, + "index": "metricbeat-*" + }, + { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "0", + "lte": "1", + "format": "epoch_millis" + } + } + }, + { + "query_string": { + "analyze_wildcard": true, + "query": "beat.hostname:opt-project.ru AND !system.network.name:\"IBM USB Remote NDIS Network Device\"" + } + } + ] + } + }, + "aggs": { + "2": { + "date_histogram": { + "interval": "30s", + "field": "@timestamp", + "min_doc_count": 0, + "extended_bounds": { + "min": "1545933121101", + "max": "1545954721101" + }, + "format": "epoch_millis" + }, + "aggs": { + "1": { + "avg": { + "field": "system.network.in.bytes" + } + }, + "3": { + "derivative": { + "buckets_path": "1" + } + } + } + } + } + }]; + + let elasticMetric = new ElasticsearchMetric(datasourse, targets); + + it('check correct time processing', function() { + let expectedQuery = { + url: datasourse.url, + method: 'POST', + schema: { + data: queryTemplate.map(e => JSON.stringify(e)).join('\n') + } + }; + + let from = 0; + let to = 1; + let limit = 222; + let offset = 333; + + expect(elasticMetric.getQuery(from, to, limit, offset)).toEqual(expectedQuery); + }); + + + let result = { + "data": { + "responses": [ + { + "took": 39, + "timed_out": false, + "_shards": { + "total": 37, + "successful": 37, + "failed": 0 + }, + "hits": { + "total": 63127, + "max_score": 0.0, + "hits": [] + }, + "aggregations": { + "2": { + "buckets": [ + { + "key_as_string": "1545934140000", + "key": 1545934140000, + "doc_count": 118, + "1": { + "value": 8.640455022375E9 + } + }, + { + "key_as_string": "1545934200000", + "key": 1545934200000, + "doc_count": 178, + "1": { + "value": 8.641446309833334E9 + }, + "3": { + "value": 991287.4583339691 + } + }, + { + "key_as_string": "1545934260000", + "key": 1545934260000, + "doc_count": 177, + "1": { + "value": 8.642345302333334E9 + }, + "3": { + "value": 898992.5 + } + } + ] + } + } + } + ] + } + }; + + it('check results parsing', function() { + let expectedResult = { + columns: ['timestamp', 'target'], + values: [[1545934140000, null], + [1545934200000, 991287.4583339691], + [1545934260000, 898992.5] + ] + } + + expect(elasticMetric.getResults(result)).toEqual(expectedResult); + }); +}); diff --git a/spec/graphite.jest.ts b/spec/graphite.jest.ts new file mode 100644 index 0000000..43eb12c --- /dev/null +++ b/spec/graphite.jest.ts @@ -0,0 +1,25 @@ +import { Datasource, Metric } from '../src/index'; + +import 'jest'; + +describe('correct Graphite query', function() { + let datasource: Datasource = { + url: 'http://example.com:1234', + type: 'graphite', + params: { + db: '', + q: '', + epoch: '' + }, + data: 'target=target=template(hosts.$hostname.cpu, hostname=\"worker1\")&from=00:00_00000000&until=00:00_00000000&maxDataPoints=000' + }; + + let target = `target=template(hosts.$hostname.cpu, hostname="worker1")`; + let query = new Metric(datasource, [target]); + + it("test simple query with time clause", function () { + expect(query.metricQuery.getQuery(1534809600000, 1537488000000, 500, 0).url).toBe( + `${datasource.url}?target=${target}&from=1534809600&until=1537488000&maxDataPoints=500` + ) + }); +}) diff --git a/spec/postgres.jest.ts b/spec/postgres.jest.ts new file mode 100644 index 0000000..04b3b68 --- /dev/null +++ b/spec/postgres.jest.ts @@ -0,0 +1,254 @@ +import { PostgresMetric } from '../src/metrics/postgres_metric'; +import { MetricQuery } from '../src/metrics/metric'; + +import 'jest'; +import * as _ from 'lodash'; + + +describe('Test query creation', function() { + + let limit = 1000; + let offset = 0; + let from = 1542983750857; + let to = 1542984313292; + let postgres = getDefaultMetric(); + let mQuery: MetricQuery = postgres.getQuery(from, to, limit, offset); + + it('test that payload placed to data field', function() { + expect('data' in mQuery.schema).toBeTruthy(); + expect('queries' in mQuery.schema.data).toBeTruthy(); + expect(mQuery.schema.data.queries).toBeInstanceOf(Array); + }); + + it('test from/to casting to string', function() { + expect(typeof mQuery.schema.data.from).toBe('string'); + expect(typeof mQuery.schema.data.to).toBe('string'); + }); + + it('method should be POST', function() { + expect(mQuery.method.toLocaleLowerCase()).toBe('post'); + }); +}); + +describe('Test result parsing', function() { + let postgres = getDefaultMetric(); + let timestamps = [1542983800000, 1542983800060, 1542983800120] + let response = { + data: { + results: { + A: { + refId: 'A', + meta: { + rowCount:0, + sql: 'SELECT "time" AS "time", val FROM local ORDER BY 1' + }, + series: [ + { + name:"val", + points: [ + [622, timestamps[0]], + [844, timestamps[1]], + [648, timestamps[2]] + ] + } + ], + tables: 'null' + } + } + } + } + + let result = postgres.getResults(response); + + it('check results columns order', function() { + let timestampColumnNumber = result.columns.indexOf('timestamp'); + expect(result.values.map(v => v[timestampColumnNumber])).toEqual(timestamps); + }); +}); + +describe('Test sql processing', function() { + let limit = 1000; + let offset = 77; + let from = 1542983750857; + let to = 1542984313292; + + let check = function(original: string, expected: string) { + checkExpectation(original, expected, from, to, limit, offset); + } + + it('simple sql with one select', function() { + let original = `SELECT + \"time\" AS \"time\", + val + FROM local + ORDER BY 1`; + let expected = `SELECT + \"time\" AS \"time\", + val + FROM local + ORDER BY 1 LIMIT ${limit} OFFSET ${offset}`; + check(original, expected); + }); + + it('sql with order by rows', function() { + let original = `SELECT + $__time(time), + AVG(power) OVER(ORDER BY speed ROWS BETWEEN 150 PRECEDING AND CURRENT ROW) + FROM + wind_pwr_spd + WHERE + $__timeFilter(time)`; + let expected = `SELECT + $__time(time), + AVG(power) OVER(ORDER BY speed ROWS BETWEEN 150 PRECEDING AND CURRENT ROW) + FROM + wind_pwr_spd + WHERE + $__timeFilter(time) LIMIT ${limit} OFFSET ${offset}`; + check(original,expected); + }); + + it('sql with offset limit', function() { + let original = `WITH RECURSIVE t(n) AS ( + VALUES (1) + UNION ALL + SELECT n+1 FROM t WHERE n < 100 + ) + SELECT sum(n) FROM t OFFSET 0 LIMIT 0;`; + + + let expected = `WITH RECURSIVE t(n) AS ( + VALUES (1) + UNION ALL + SELECT n+1 FROM t WHERE n < 100 + ) + SELECT sum(n) FROM t OFFSET ${offset} LIMIT ${limit};`; + check(original, expected); + }); + + it('sql with macroses', function() { + let original = `SELECT + time + FROM metric_values + WHERE time > $__timeFrom() + OR time < $__timeFrom() + OR 1 < $__unixEpochFrom() + OR $__unixEpochTo() > 1 ORDER BY 1`; + let expected = `SELECT + time + FROM metric_values + WHERE time > $__timeFrom() + OR time < $__timeFrom() + OR 1 < $__unixEpochFrom() + OR $__unixEpochTo() > 1 ORDER BY 1 LIMIT ${limit} OFFSET ${offset}`; + check(original, expected); + }); + + it('complex sql with one select', function() { + let original = `SELECT + statistics.created_at as time, + CAST(statistics.value AS decimal) as value, + sensor.title as metric + FROM statistics + INNER JOIN sensor + ON sensor.id = statistics.sensor_id + WHERE + statistics.device_id = '000-aaaa-bbbb' + AND sensor.type = 5 + AND sensor.section_id IN($section_id) + AND statistics.value != 'ERR' + AND statistics.value !='???' + AND $__timeFilter(statistics.created_at)`; + let expected = `SELECT + statistics.created_at as time, + CAST(statistics.value AS decimal) as value, + sensor.title as metric + FROM statistics + INNER JOIN sensor + ON sensor.id = statistics.sensor_id + WHERE + statistics.device_id = '000-aaaa-bbbb' + AND sensor.type = 5 + AND sensor.section_id IN($section_id) + AND statistics.value != 'ERR' + AND statistics.value !='???' + AND $__timeFilter(statistics.created_at) LIMIT ${limit} OFFSET ${offset}`; + check(original, expected); + }) + + it('sql with number of nested select', function() { + let original = `WITH regional_sales AS ( + SELECT region, SUM(amount) AS total_sales + FROM orders + GROUP BY region LIMIT 5 OFFSET 1 + ), top_regions AS ( + SELECT region + FROM regional_sales + WHERE total_sales > (SELECT SUM(total_sales)/10 FROM regional_sales) + LIMIT 3 + ) + SELECT region, + product, + SUM(quantity) AS product_units, + SUM(amount) AS product_sales + FROM orders + WHERE region IN (SELECT region FROM top_regions) + GROUP BY region, product OFFSET 500;`; + let expected = `WITH regional_sales AS ( + SELECT region, SUM(amount) AS total_sales + FROM orders + GROUP BY region LIMIT 5 OFFSET 1 + ), top_regions AS ( + SELECT region + FROM regional_sales + WHERE total_sales > (SELECT SUM(total_sales)/10 FROM regional_sales) + LIMIT 3 + ) + SELECT region, + product, + SUM(quantity) AS product_units, + SUM(amount) AS product_sales + FROM orders + WHERE region IN (SELECT region FROM top_regions) + GROUP BY region, product OFFSET ${offset} LIMIT ${limit};`; + check(original, expected); + }); +}); + +function checkExpectation(original: string, expected: string, from: number, to: number, limit: number, offset: number) { + let metric = getMetricWithSql(original); + expect(metric.getQuery(from, to, limit, offset).schema.data.queries[0].rawSql).toBe(expected); +} + +function getMetricWithSql(sql: string): PostgresMetric { + let metric = getDefaultMetric(); + metric.datasource.data.queries[0].rawSql = sql; + return metric; +} + +function getDefaultMetric(): PostgresMetric { + let queryPayload = { + from: 1542983750857, + to: 1542984313292, + queries:[{ + refId: 'A', + intervalMs:2000, + maxDataPoints:191, + datasourceId:1, + rawSql: 'SELECT\n \"time\" AS \"time\",\n val\nFROM local\nORDER BY 1', + format: 'time_series' + }] + }; + + let datasource = { + url: 'api/tsdb/query', + type: 'postgres', + data: queryPayload + }; + + let targets = [{ + refId: 'A', + }]; + + return new PostgresMetric(datasource, targets); +} diff --git a/spec/prometheus.jest.ts b/spec/prometheus.jest.ts new file mode 100644 index 0000000..bfa436e --- /dev/null +++ b/spec/prometheus.jest.ts @@ -0,0 +1,21 @@ +import { PrometheusMetric } from '../src/metrics/prometheus_metric'; + +import 'jest'; + + +describe('Test Prometheus time range processing', function() { + let datasource = { + type: 'prometheus', + url: 'api/datasources/proxy/4/api/v1/query_range?query=node_disk_io_time_ms&start=1543411320&end=1543432950&step=30' + } + let targets = []; + let prometheus = new PrometheusMetric(datasource, targets); + + it('check that from/to present in url', function() { + let from = 1234567891234; //milliseconds + let to = 1234567899999; + let query = prometheus.getQuery(from, to, 1000, 0); + expect(query.url.indexOf(`start=${Math.floor(from / 1000)}`) !== -1).toBeTruthy(); + expect(query.url.indexOf(`end=${Math.floor(to / 1000)}`) !== -1).toBeTruthy(); + }); +}); diff --git a/spec/setup_tests.ts b/spec/setup_tests.ts new file mode 100644 index 0000000..89d2823 --- /dev/null +++ b/spec/setup_tests.ts @@ -0,0 +1,2 @@ +console.log = jest.fn(); +console.error = jest.fn(); diff --git a/spec/targets.jest.ts b/spec/targets.jest.ts new file mode 100644 index 0000000..e50a537 --- /dev/null +++ b/spec/targets.jest.ts @@ -0,0 +1,42 @@ +import { Datasource, Metric } from '../src/index'; + +import 'jest'; + + +describe('Correct InfluxDB query', function() { + let datasource: Datasource = { + url: 'url', + type: 'influxdb', + params: { + db: 'db', + q: `SELECT mean("value") FROM "db" WHERE time > xxx AND time <= xxx LIMIT 100 OFFSET 20`, + epoch: '' + } + }; + + let target = 'mean("value")'; + + it("test query with two time expressions", function() { + let query = new Metric(datasource, [target]); + expect(query.metricQuery.getQuery(1534809600,1537488000,666,10).schema.params.q).toBe( + `SELECT mean("value") FROM "db" WHERE time >= 1534809600ms AND time <= 1537488000ms LIMIT 666 OFFSET 10` + ) + }); + + it('test query with one time expression', function() { + datasource.params.q = `SELECT mean("value") FROM "cpu_value" WHERE time >= now() - 6h GROUP BY time(30s) fill(null)`; + let query = new Metric(datasource, [target]); + expect(query.metricQuery.getQuery(1534809600,1537488000,666,10).schema.params.q).toBe( + `SELECT mean("value") FROM "cpu_value" WHERE time >= 1534809600ms AND time <= 1537488000ms GROUP BY time(30s) fill(null) LIMIT 666 OFFSET 10` + ) + }); + + it('test query with time expression', function() { + datasource.params.q = `SELECT mean("value") FROM "cpu_value" WHERE time>= now() - 6h AND time= 1534809600ms AND time <= 1537488000ms GROUP BY time(30s) fill(null) LIMIT 666 OFFSET 10` + ) + }); + +}) diff --git a/spec/utils.jest.ts b/spec/utils.jest.ts new file mode 100644 index 0000000..3162e85 --- /dev/null +++ b/spec/utils.jest.ts @@ -0,0 +1,18 @@ +import { processSQLLimitOffset } from '../src/metrics/utils'; + +import 'jest'; + + +describe('test utils methods', function(){ + it('test SQL limit offset processing', function() { + expect(processSQLLimitOffset('without', 10, 5)).toBe('without LIMIT 10 OFFSET 5'); + expect(processSQLLimitOffset('limit 3 OFFSET 1', 10, 5)).toBe('LIMIT 10 OFFSET 5'); + expect(processSQLLimitOffset('xxx \nlimit 11\nxxx', 10, 5)).toBe('xxx \nLIMIT 10\nxxx OFFSET 5'); + expect(processSQLLimitOffset('xxx offset 4 xxx', 10, 5)).toBe('xxx OFFSET 5 xxx LIMIT 10'); + expect(processSQLLimitOffset('xxx\nlimit 0\noffset 4\nxxx', 10, 5)).toBe('xxx\nLIMIT 10\nOFFSET 5\nxxx'); + expect(processSQLLimitOffset('()()(limit 3 OFFSET 1) (())', 10, 5)).toBe('()()(limit 3 OFFSET 1) (()) LIMIT 10 OFFSET 5'); + expect(processSQLLimitOffset('()(limit 3) OFFSET 1 ()', 10, 5)).toBe('()(limit 3) OFFSET 5 () LIMIT 10'); + expect(processSQLLimitOffset('(offset 9)(limit 3) OFFSET 1 ()()(()) LIMIT 8 ()', 10, 5)) + .toBe('(offset 9)(limit 3) OFFSET 5 ()()(()) LIMIT 10 ()'); + }); +}); diff --git a/src/grafana_service.ts b/src/grafana_service.ts new file mode 100644 index 0000000..df2b905 --- /dev/null +++ b/src/grafana_service.ts @@ -0,0 +1,134 @@ +import { Metric } from './metrics/metrics_factory'; +import { MetricQuery, Datasource } from './metrics/metric'; + +import { URL } from 'url'; +import axios from 'axios'; +import * as _ from 'lodash'; + +export class DataKitError extends Error { + constructor( + message: string, + public datasourceType?: string, + public datasourceUrl?: string + ) { + super(message); + } +}; + +export class BadRange extends DataKitError {}; +export class GrafanaUnavailable extends DataKitError {}; +export class DatasourceUnavailable extends DataKitError {}; + +const CHUNK_SIZE = 50000; + + +/** + * @param metric to query to Grafana + * @returns { values: [time, value][], columns: string[] } + */ +export async function queryByMetric( + metric: Metric, url: string, from: number, to: number, apiKey: string +): Promise<{ values: [number, number][], columns: string[] }> { + + if(from > to) { + throw new BadRange( + `Data-kit got wrong range: from ${from} > to ${to}`, + metric.datasource.type, + url + ); + } + + if(from === to) { + console.warn(`Data-kit got from === to`); + } + + const grafanaUrl = getGrafanaUrl(url); + + let data = { + values: [], + columns: [] + }; + + while(true) { + let query = metric.metricQuery.getQuery(from, to, CHUNK_SIZE, data.values.length); + query.url = `${grafanaUrl}/${query.url}`; + let res = await queryGrafana(query, apiKey, metric.datasource); + let chunk = metric.metricQuery.getResults(res); + let values = chunk.values; + data.values = data.values.concat(values); + data.columns = chunk.columns; + + if(values.length < CHUNK_SIZE) { + // because if we get less that we could, then there is nothing more + break; + } + } + return data; +} + +async function queryGrafana(query: MetricQuery, apiKey: string, datasource: Datasource) { + let headers = { Authorization: `Bearer ${apiKey}` }; + + if(query.headers !== undefined) { + _.merge(headers, query.headers); + } + + + let axiosQuery = { + headers, + url: query.url, + method: query.method, + }; + + _.defaults(axiosQuery, query.schema); + + try { + var res = await axios(axiosQuery); + } catch (e) { + const msg = `Data kit: fail while request data: ${e.message}`; + const parsedUrl = new URL(query.url); + const queryUrl = `query url: ${JSON.stringify(parsedUrl.pathname)}`; + console.error(`${msg} ${queryUrl}`); + if(e.errno === 'ECONNREFUSED') { + throw new GrafanaUnavailable(e.message); + } + if(e.response !== undefined) { + console.error(`Response: \ + status: ${e.response.status}, \ + response data: ${JSON.stringify(e.response.data)}, \ + headers: ${JSON.stringify(e.response.headers)} + `); + if(e.response.status === 401) { + throw new Error(`Unauthorized. Check the API_KEY. ${e.message}`); + } + if(e.response.status === 502) { + let datasourceError = new DatasourceUnavailable( + `datasource ${parsedUrl.pathname} unavailable, message: ${e.message}`, + datasource.type, + query.url + ); + throw datasourceError; + } + } + throw new Error(msg); + } + + return res; +} + +function getGrafanaUrl(url: string) { + const parsedUrl = new URL(url); + const path = parsedUrl.pathname; + const panelUrl = path.match(/^\/*([^\/]*)\/d\//); + if(panelUrl === null) { + return url; + } + + const origin = parsedUrl.origin; + const grafanaSubPath = panelUrl[1]; + if(grafanaSubPath.length > 0) { + return `${origin}/${grafanaSubPath}`; + } + + return origin; +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..7bc1800 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,3 @@ +export { Metric } from './metrics/metrics_factory'; +export { Datasource } from './metrics/metric' +export { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from './grafana_service'; diff --git a/src/metrics/elasticsearch_metric.ts b/src/metrics/elasticsearch_metric.ts new file mode 100644 index 0000000..21855d4 --- /dev/null +++ b/src/metrics/elasticsearch_metric.ts @@ -0,0 +1,132 @@ +import { AbstractMetric, Datasource, MetricId, MetricQuery, MetricResults } from './metric'; +import { DataKitError } from '../grafana_service'; + +import * as _ from 'lodash'; + +export type RangeFilter = { range: { [key: string]: { gte: String, lte: String } } }; +export type QueryStringFilter = { query_string: { analyze_wildcard: Boolean, query: String } }; + +export type QueryConfig = { + size: number, + query: { + bool: { + filter: (RangeFilter | QueryStringFilter)[] + } + }, + aggs: { [key: string]: Aggregation } +}; + +export type Aggregation = { + date_histogram: { + interval: String, + field: String, + min_doc_count: Number, + extended_bounds: { min: String, max: String }, + format: String + } +}; + +const DATE_HISTOGRAM_FIELD = 'date_histogram'; + +export class ElasticsearchMetric extends AbstractMetric { + constructor(datasource: Datasource, targets: any[], id?: MetricId) { + super(datasource, targets, id); + } + + getQuery(from: number, to: number, limit: number, offset: number): MetricQuery { + let data = this.datasource.data.split('\n').map(d => d === '' ? d: JSON.parse(d)); + if(data.length === 0) { + throw new DataKitError('Datasource data is empty'); + } + + const queryConfig: QueryConfig = data[1]; + + queryConfig.size = 0; + let timeField = null; + + let aggs = _.filter(queryConfig.aggs, f => _.has(f, DATE_HISTOGRAM_FIELD)); + _.each(aggs, (agg: Aggregation) => { + agg[DATE_HISTOGRAM_FIELD].extended_bounds = { + min: from.toString(), + max: to.toString() + }; + + if(timeField !== null) { + console.warn( + `got more than one datasource time field, change ${timeField} to ${agg[DATE_HISTOGRAM_FIELD].field}` + ); + } + timeField = agg[DATE_HISTOGRAM_FIELD].field; + }); + + if(timeField === null) { + throw new Error('datasource time field not found'); + } + + let filters = queryConfig.query.bool.filter.filter(f => _.has(f, 'range')) as RangeFilter[]; + if(filters.length === 0) { + throw new DataKitError('Empty filters'); + } + let range = filters[0].range; + range[timeField].gte = from.toString(); + range[timeField].lte = to.toString(); + + + data = data + .filter(d => d !== '') + .map(d => JSON.stringify(d)) + .join('\n'); + data += '\n'; + + return { + url: this.datasource.url, + method: 'POST', + schema: { data }, + headers: {'Content-Type': 'application/json'} + } + } + + getResults(res): MetricResults { + let columns = ['timestamp', 'target']; + let values = []; + + if(res.data === undefined || res.data.responses.length < 1) { + console.log('datasource return empty response, no data'); + return { + columns, + values + }; + } + + let aggregations = res.data.responses[0].aggregations; + let aggrgAgg: any = this.targets[0].bucketAggs.filter(a => { + return !a.fake && _.has(aggregations, a.id) + }); + if(_.isEmpty(aggrgAgg)) { + const bucketAggs = JSON.stringify(this.targets[0].bucketAggs); + const aggregationKeys = JSON.stringify(_.keys(aggregations)); + console.error(`can't find related aggregation id. bucketAggs:${bucketAggs} aggregationKeys:${aggregationKeys}`); + throw new DataKitError(`can't find related aggregation id`); + } else { + aggrgAgg = aggrgAgg[0].id; + } + let responseValues = aggregations[aggrgAgg].buckets; + let agg = this.targets[0].metrics.filter(m => !m.hide).map(m => m.id); + + if(agg.length > 1) { + throw new DataKitError(`multiple series for metric are not supported currently: ${JSON.stringify(agg)}`); + } + + agg = agg[0]; + + if(responseValues.length > 0) { + values = responseValues.map(r => [r.key, _.has(r, agg) ? r[agg].value: null]); + } + + return { + columns, + values + } + } +} + diff --git a/src/metrics/graphite_metric.ts b/src/metrics/graphite_metric.ts new file mode 100644 index 0000000..410c122 --- /dev/null +++ b/src/metrics/graphite_metric.ts @@ -0,0 +1,65 @@ +import { AbstractMetric, Datasource, MetricId, MetricQuery, MetricResults } from './metric'; + +import * as _ from 'lodash'; + + +export class GraphiteMetric extends AbstractMetric { + constructor(datasource: Datasource, targets: any[], id?: MetricId) { + super(datasource, targets, id); + } + + getQuery(from: number, to: number, limit: number, offset: number): MetricQuery { + let fromDate = Math.floor(from / 1000); + let toDate = Math.floor(to / 1000); + + let fromRegex = /from=[^\&]+/i; + let untilRegex = /until=[^\&]+/i; + let limitRegex = /maxDataPoints=[^\&]+/i; + + let query: string = this.datasource.data; + let replacements: [RegExp, string][] = [ + [fromRegex, `from=${fromDate}`], + [untilRegex, `until=${toDate}`], + [limitRegex, `maxDataPoints=${limit}`] + ]; + + _.each(replacements, r => { + let k = r[0]; + let v = r[1]; + if(query.search(k)) { + query = query.replace(k, v); + } else { + query += v; + } + }); + + return { + url: `${this.datasource.url}?${query}`, + method: 'GET', + schema: { + params: this.datasource.params + } + } + } + + getResults(res): MetricResults { + + if(res.data === undefined || res.data.length < 1) { + console.log('datasource return empty response, no data'); + return { + columns: ['timestamp', 'target'], + values: [] + }; + } + + return { + columns: ['timestamp', res.data[0]['target']], + values: res.data[0].datapoints.map(point => { + let val = point[0]; + let timestamp = point[1] * 1000; //convert seconds -> ms + return [timestamp, val]; + }) + }; + } +} + diff --git a/src/metrics/influxdb_metric.ts b/src/metrics/influxdb_metric.ts new file mode 100644 index 0000000..a4ef1c2 --- /dev/null +++ b/src/metrics/influxdb_metric.ts @@ -0,0 +1,63 @@ +import { AbstractMetric, Datasource, MetricId, MetricQuery, MetricResults } from "./metric"; +import { processSQLLimitOffset } from './utils'; + + +const INFLUX_QUERY_TIME_REGEX = /time ?[><=]+ ?[^A-Z]+(AND ?time ?[><=]+ ?[^A-Z]+)?/; + + +export class InfluxdbMetric extends AbstractMetric { + + private _queryParts: string[]; + + constructor(datasource: Datasource, targets: any[], id?: MetricId) { + super(datasource, targets, id); + + var queryStr = datasource.params.q; + this._queryParts = queryStr.split(INFLUX_QUERY_TIME_REGEX); + if(this._queryParts.length == 1) { + throw new Error( + `Query "${queryStr}" is not replaced with LIMIT/OFFSET oeprators. Missing time clause.` + ); + } + if(this._queryParts.length > 3) { + throw new Error(`Query "${queryStr}" has multiple time clauses. Can't parse.`); + } + } + + getQuery(from: number, to: number, limit: number, offset: number): MetricQuery { + let timeClause = `time >= ${from}ms AND time <= ${to}ms`; + let q = `${this._queryParts[0]} ${timeClause} ${this._queryParts[2]}`; + q = processSQLLimitOffset(q, limit, offset); + return { + url: this.datasource.url, + method: 'GET', + schema: { + params: { + q, + db: this.datasource.params.db, + epoch: this.datasource.params.epoch + } + } + } + } + + getResults(res): MetricResults { + let emptyResult = { + columns: ['timestamp', 'target'], + values: [] + }; + + if(res.data === undefined || res.data.results.length < 1) { + console.log('datasource return empty response, no data'); + return emptyResult; + } + + // TODO: support more than 1 metric (each res.data.results item is a metric) + let results = res.data.results[0]; + if (results.series === undefined) { + return emptyResult; + } + + return results.series[0]; + } +} diff --git a/src/metrics/metric.ts b/src/metrics/metric.ts new file mode 100644 index 0000000..706274b --- /dev/null +++ b/src/metrics/metric.ts @@ -0,0 +1,39 @@ +export declare type Datasource = { + url: string; + type: string; + params?: { + db: string; + q: string; + epoch: string; + }; + data?: any; +}; + +export type MetricQuery = { + url: string; + method: string; + schema: any; + headers?: any; +} + +export type MetricResults = { + values: any; + columns: any; +} + +export type MetricId = string; + +export abstract class AbstractMetric { + constructor( + public datasource: Datasource, + public targets: any[], + public id?: MetricId + ) {}; + abstract getQuery(from: number, to: number, limit: number, offset: number): MetricQuery; + /* + from / to - timestamp in ms + limit - max number of items in result + offset - number of items to skip from timerange start + */ + abstract getResults(res): MetricResults; +} diff --git a/src/metrics/metrics_factory.ts b/src/metrics/metrics_factory.ts new file mode 100644 index 0000000..3f56a8b --- /dev/null +++ b/src/metrics/metrics_factory.ts @@ -0,0 +1,77 @@ +import { InfluxdbMetric } from './influxdb_metric'; +import { GraphiteMetric } from './graphite_metric'; +import { AbstractMetric, Datasource, MetricId } from './metric'; +import { PrometheusMetric } from './prometheus_metric'; +import { PostgresMetric } from './postgres_metric'; +import { ElasticsearchMetric } from './elasticsearch_metric'; + + +export function metricFactory( + datasource: Datasource, + targets: any[], + id?: MetricId +): AbstractMetric { + + let classMap = { + 'influxdb': InfluxdbMetric, + 'graphite': GraphiteMetric, + 'prometheus': PrometheusMetric, + 'postgres': PostgresMetric, + 'elasticsearch': ElasticsearchMetric + }; + if(classMap[datasource.type] === undefined) { + console.error(`Datasources of type ${datasource.type} are not supported currently`); + throw new Error(`Datasources of type ${datasource.type} are not supported currently`); + } else { + return new classMap[datasource.type](datasource, targets, id); + } +} + +export class Metric { + datasource: Datasource; + targets: any[]; + id?: MetricId; + private _metricQuery: AbstractMetric = undefined; + + constructor(datasource: Datasource, targets: any[], id?: MetricId) { + if(datasource === undefined) { + throw new Error('datasource is undefined'); + } + if(targets === undefined) { + throw new Error('targets is undefined'); + } + if(targets.length === 0) { + throw new Error('targets is empty'); + } + this.datasource = datasource; + this.targets = targets; + this.id = id; + } + + public get metricQuery() { + if(this._metricQuery === undefined) { + this._metricQuery = metricFactory(this.datasource, this.targets, this.id); + } + return this._metricQuery; + } + + + public toObject() { + return { + datasource: this.datasource, + targets: this.targets, + _id: this.id + }; + } + + static fromObject(obj: any): Metric { + if(obj === undefined) { + throw new Error('obj is undefined'); + } + return new Metric( + obj.datasource, + obj.targets, + obj._id + ); + } +} diff --git a/src/metrics/postgres_metric.ts b/src/metrics/postgres_metric.ts new file mode 100644 index 0000000..93849b8 --- /dev/null +++ b/src/metrics/postgres_metric.ts @@ -0,0 +1,63 @@ +import { AbstractMetric, Datasource, MetricId, MetricQuery, MetricResults } from './metric'; +import { processSQLLimitOffset } from './utils'; + +import * as _ from 'lodash'; + + +export class PostgresMetric extends AbstractMetric { + + private _targetName: string; //save first target name, while multi metric not implemented + + constructor(datasource: Datasource, targets: any[], id?: MetricId) { + super(datasource, targets, id); + + if(targets.length === 0) { + throw Error('got empty targets list'); + } + this._targetName = targets[0].refId; + } + + getQuery(from: number, to: number, limit: number, offset: number): MetricQuery { + let queries = this.datasource.data.queries; + _.forEach(queries, q => { + q.rawSql = processSQLLimitOffset(q.rawSql, limit, offset); + }); + return { + url: this.datasource.url, + method: 'POST', + schema: { + data: { + from: String(from), + to: String(to), + queries: queries + } + } + }; + } + + getResults(res): MetricResults { + let emptyResult = { + columns: ['timestamp', 'target'], + values: [] + }; + + if(res.data === undefined || res.data.results.length < 1) { + console.log('datasource return empty response, no data'); + return emptyResult; + } + + // TODO: support more than 1 metric (each res.data.results item is a metric) + let results = res.data.results[this._targetName]; + if (results.series === undefined) { + return emptyResult; + } + + let points = results.series[0].points; + points.forEach(p => p.reverse()); + + return { + columns: ['timestamp', results.series[0].name], + values: points + }; + } +} diff --git a/src/metrics/prometheus_metric.ts b/src/metrics/prometheus_metric.ts new file mode 100644 index 0000000..dd32488 --- /dev/null +++ b/src/metrics/prometheus_metric.ts @@ -0,0 +1,83 @@ +import { AbstractMetric, Datasource, MetricId, MetricQuery, MetricResults } from './metric'; + + +const QUERY_TIME_REGEX = /\&start=[^\&]*\&end=[^\&]*\&/; + +export class PrometheusMetric extends AbstractMetric { + + constructor(datasource: Datasource, targets: any[], id?: MetricId) { + super(datasource, targets, id); + } + + getQuery(from: number, to: number, limit: number, offset: number): MetricQuery { + let url = this.datasource.url; + from = Math.floor(from / 1000); //prometheus uses seconds for timestamp + to = Math.floor(to / 1000); + + url = url.replace(/\&start=[^\&]+/, `&start=${from}`); + url = url.replace(/\&end=[^\&]+/, `&end=${to}`); + return { + url, + method: 'GET', + schema: { + params: this.datasource.params + } + } + } + + getResults(res): MetricResults { + + if(res.data === undefined || res.data.data.result.length < 1) { + console.log('datasource return empty response, no data'); + return { + columns: ['timestamp', 'target'], + values: [] + }; + } + + let result = res.data.data.result; + let result_matrix = { + columns: ['timestamp'], + values: [] + }; + + result.map(r => { + let keys = []; + for(let key in r.metric) { + keys.push(`${key}=${r.metric[key]}`); + } + result_matrix.columns.push(keys.join(':')); + }); + + let values = result.map(r => r.values); + + let timestamps = []; + values.map(v => v.map(row => timestamps.push(row[0]))); + timestamps = timestamps.filter(function(item, i, ar) { + return ar.indexOf(item) === i; //uniq values + }); + + for(let t of timestamps) { + let row = [t]; + values.map(v => { + if(v[0] === undefined) { + row.push(0); + } + + let currentTimestamp = v[0][0]; + let currentValue = v[0][1]; + + if(currentTimestamp === t) { + row.push(+currentValue); + v.shift(); + } + else { + row.push(null); + } + }); + row[0] = +row[0] * 1000; //convert timestamp to ms + result_matrix.values.push(row); + }; + return result_matrix; + } +} diff --git a/src/metrics/utils.ts b/src/metrics/utils.ts new file mode 100644 index 0000000..24ceb58 --- /dev/null +++ b/src/metrics/utils.ts @@ -0,0 +1,48 @@ +import * as _ from 'lodash'; + + +export function processSQLLimitOffset(sql: string, limit: number, offset: number): string { + let splits = sql.split(';'); + if(splits.length > 1 && splits[1] !== '' ) { + throw Error('multiple metrics currently not supported'); + } + sql = splits[0]; // removes ";" from EOL + + let relim = /limit [0-9]+/ig; + let reoff = /offset [0-9]+/ig; + + let limIdx = ensureParentheses(relim, sql); + if(limIdx.index !== -1) { + sql = `${sql.slice(0, limIdx.index)}LIMIT ${limit}${sql.slice(limIdx.index + limIdx.length)}`; + } else { + sql += ` LIMIT ${limit}`; + } + + let offIdx = ensureParentheses(reoff, sql); + if(offIdx.index !== -1) { + sql = `${sql.slice(0, offIdx.index)}OFFSET ${offset}${sql.slice(offIdx.index + offIdx.length)}`; + } else { + sql += ` OFFSET ${offset}`; + } + + if(splits.length === 2) { + sql += ';'; + } + return sql; +} + +function ensureParentheses(regex: RegExp, str: string): { index: number, length: number } { + let occurence: RegExpExecArray; + while((occurence = regex.exec(str)) !== null) { + let leftPart = str.slice(0, occurence.index) + let rightPart = str.slice(occurence.index + occurence[0].length); + + let leftPairing = (leftPart.match(/\(/g) || []).length === (leftPart.match(/\)/g) || []).length; + let rightPairing = (rightPart.match(/\(/g) || []).length === (rightPart.match(/\)/g) || []).length; + + if(leftPairing && rightPairing) { + return { index: occurence.index, length: occurence[0].length }; + } + } + return { index: -1, length: 0 }; +} diff --git a/tsconfig.jest.json b/tsconfig.jest.json new file mode 100644 index 0000000..1c66acf --- /dev/null +++ b/tsconfig.jest.json @@ -0,0 +1,3 @@ +{ + "extends": "./tsconfig" +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..01c56a9 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "sourceMap": true, + "module": "commonjs", + "target": "es2015", + "declaration": true, + "outDir": "lib" + }, + "include": [ + "src/**/*" + ] +}