You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
132 lines
3.7 KiB
132 lines
3.7 KiB
import { DatasourceConnector, Datasource, DatasourceQuery, DataTable } from '.'; |
|
import { TsdbKitError } from '../types'; |
|
|
|
import * as _ from 'lodash'; |
|
|
|
export type RangeFilter = { range: { [key: string]: { gte: String, lte: String } } }; |
|
export type QueryStringFilter = { query_string: { analyze_wildcard: Boolean, query: String } }; |
|
|
|
export type ElasticsearchQuery = { |
|
size: number, |
|
query: { |
|
bool: { |
|
filter: (RangeFilter | QueryStringFilter)[] |
|
} |
|
}, |
|
aggs: { [key: string]: Aggregation } |
|
}; |
|
|
|
export type Aggregation = { |
|
date_histogram: { |
|
interval: string, |
|
field: string, |
|
min_doc_count: number, |
|
extended_bounds: { min: string, max: string }, |
|
format: string |
|
} |
|
}; |
|
|
|
const DATE_HISTOGRAM_FIELD = 'date_histogram'; |
|
|
|
export class ElasticsearchConnector extends DatasourceConnector { |
|
constructor(datasource: Datasource, targets: any[]) { |
|
super(datasource, targets); |
|
} |
|
|
|
getQuery(from: number, to: number, limit: number, offset: number): DatasourceQuery { |
|
let data = this.datasource.data.split('\n').map(d => d === '' ? d: JSON.parse(d)); |
|
if(data.length === 0) { |
|
throw new TsdbKitError('Datasource data is empty'); |
|
} |
|
|
|
const query: ElasticsearchQuery = data[1]; |
|
|
|
query.size = 0; |
|
let timeField: string | null = null; |
|
|
|
let aggs = _.filter(query.aggs, f => _.has(f, DATE_HISTOGRAM_FIELD)); |
|
_.each(aggs, (agg: Aggregation) => { |
|
agg[DATE_HISTOGRAM_FIELD].extended_bounds = { |
|
min: from.toString(), |
|
max: to.toString() |
|
}; |
|
|
|
if(timeField !== null) { |
|
console.warn( |
|
`got more than one datasource time field, change ${timeField} to ${agg[DATE_HISTOGRAM_FIELD].field}` |
|
); |
|
} |
|
timeField = agg[DATE_HISTOGRAM_FIELD].field; |
|
}); |
|
|
|
if(timeField === null) { |
|
throw new Error('datasource time field not found'); |
|
} |
|
|
|
let filters = query.query.bool.filter.filter(f => _.has(f, 'range')) as RangeFilter[]; |
|
if(filters.length === 0) { |
|
throw new TsdbKitError('Empty filters'); |
|
} |
|
let range = filters[0].range; |
|
range[timeField].gte = from.toString(); |
|
range[timeField].lte = to.toString(); |
|
|
|
|
|
data = data |
|
.filter(d => d !== '') |
|
.map(d => JSON.stringify(d)) |
|
.join('\n'); |
|
data += '\n'; |
|
|
|
return { |
|
url: this.datasource.url, |
|
method: 'POST', |
|
schema: { data }, |
|
headers: {'Content-Type': 'application/json'} |
|
} |
|
} |
|
|
|
parseResponse(res): DataTable { |
|
let columns = ['timestamp', 'target']; |
|
let values = []; |
|
|
|
if(res.data === undefined || res.data.responses.length < 1) { |
|
console.log('datasource return empty response, no data'); |
|
return { |
|
columns, |
|
values |
|
}; |
|
} |
|
|
|
let aggregations = res.data.responses[0].aggregations; |
|
let aggrgAgg: any = this.targets[0].bucketAggs.filter(a => { |
|
return !a.fake && _.has(aggregations, a.id) |
|
}); |
|
if(_.isEmpty(aggrgAgg)) { |
|
const bucketAggs = JSON.stringify(this.targets[0].bucketAggs); |
|
const aggregationKeys = JSON.stringify(_.keys(aggregations)); |
|
console.error(`can't find related aggregation id. bucketAggs:${bucketAggs} aggregationKeys:${aggregationKeys}`); |
|
throw new TsdbKitError(`can't find related aggregation id`); |
|
} else { |
|
aggrgAgg = aggrgAgg[0].id; |
|
} |
|
let responseValues = aggregations[aggrgAgg].buckets; |
|
let agg = this.targets[0].metrics.filter(m => !m.hide).map(m => m.id); |
|
|
|
if(agg.length > 1) { |
|
throw new TsdbKitError(`multiple series for metric are not supported currently: ${JSON.stringify(agg)}`); |
|
} |
|
|
|
agg = agg[0]; |
|
|
|
if(responseValues.length > 0) { |
|
values = responseValues.map(r => [r.key, _.has(r, agg) ? r[agg].value: null]); |
|
} |
|
|
|
return { |
|
columns, |
|
values |
|
} |
|
} |
|
} |
|
|
|
|