import { DatasourceConnector, Datasource, DatasourceQuery, DataTable } from './metric'; import { TsdbKitError } from '../types'; import * as _ from 'lodash'; export type RangeFilter = { range: { [key: string]: { gte: String, lte: String } } }; export type QueryStringFilter = { query_string: { analyze_wildcard: Boolean, query: String } }; export type ElasticsearchQuery = { size: number, query: { bool: { filter: (RangeFilter | QueryStringFilter)[] } }, aggs: { [key: string]: Aggregation } }; export type Aggregation = { date_histogram: { interval: string, field: string, min_doc_count: number, extended_bounds: { min: string, max: string }, format: string } }; const DATE_HISTOGRAM_FIELD = 'date_histogram'; export class ElasticsearchMetric extends DatasourceConnector { constructor(datasource: Datasource, targets: any[]) { super(datasource, targets); } getQuery(from: number, to: number, limit: number, offset: number): DatasourceQuery { let data = this.datasource.data.split('\n').map(d => d === '' ? d: JSON.parse(d)); if(data.length === 0) { throw new TsdbKitError('Datasource data is empty'); } const query: ElasticsearchQuery = data[1]; query.size = 0; let timeField: string | null = null; let aggs = _.filter(query.aggs, f => _.has(f, DATE_HISTOGRAM_FIELD)); _.each(aggs, (agg: Aggregation) => { agg[DATE_HISTOGRAM_FIELD].extended_bounds = { min: from.toString(), max: to.toString() }; if(timeField !== null) { console.warn( `got more than one datasource time field, change ${timeField} to ${agg[DATE_HISTOGRAM_FIELD].field}` ); } timeField = agg[DATE_HISTOGRAM_FIELD].field; }); if(timeField === null) { throw new Error('datasource time field not found'); } let filters = query.query.bool.filter.filter(f => _.has(f, 'range')) as RangeFilter[]; if(filters.length === 0) { throw new TsdbKitError('Empty filters'); } let range = filters[0].range; range[timeField].gte = from.toString(); range[timeField].lte = to.toString(); data = data .filter(d => d !== '') .map(d => JSON.stringify(d)) .join('\n'); data += '\n'; return { url: this.datasource.url, method: 'POST', schema: { data }, headers: {'Content-Type': 'application/json'} } } parseResponse(res): DataTable { let columns = ['timestamp', 'target']; let values = []; if(res.data === undefined || res.data.responses.length < 1) { console.log('datasource return empty response, no data'); return { columns, values }; } let aggregations = res.data.responses[0].aggregations; let aggrgAgg: any = this.targets[0].bucketAggs.filter(a => { return !a.fake && _.has(aggregations, a.id) }); if(_.isEmpty(aggrgAgg)) { const bucketAggs = JSON.stringify(this.targets[0].bucketAggs); const aggregationKeys = JSON.stringify(_.keys(aggregations)); console.error(`can't find related aggregation id. bucketAggs:${bucketAggs} aggregationKeys:${aggregationKeys}`); throw new TsdbKitError(`can't find related aggregation id`); } else { aggrgAgg = aggrgAgg[0].id; } let responseValues = aggregations[aggrgAgg].buckets; let agg = this.targets[0].metrics.filter(m => !m.hide).map(m => m.id); if(agg.length > 1) { throw new TsdbKitError(`multiple series for metric are not supported currently: ${JSON.stringify(agg)}`); } agg = agg[0]; if(responseValues.length > 0) { values = responseValues.map(r => [r.key, _.has(r, agg) ? r[agg].value: null]); } return { columns, values } } }