From 09936e9f34a6ecbfb273c272511102c4946c5e71 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Thu, 4 Nov 2021 04:10:54 +0300 Subject: [PATCH] range --- src/cli.rs | 11 ++++++---- src/datasources/influx.rs | 40 +++++++++++++++++++++++------------ src/datasources/mod.rs | 7 +++--- src/datasources/prometheus.rs | 6 +++++- src/main.rs | 4 +--- src/metric.rs | 2 +- src/types.rs | 9 ++------ src/utils.rs | 21 ++++++++---------- 8 files changed, 55 insertions(+), 45 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index a065809..f05abe6 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,5 +1,8 @@ use clap::{App, Arg, SubCommand}; -use subbeat::{datasources::grafana::Grafana, types::{DatasourceConfig, GrafanaConfig, InfluxConfig, PrometheusConfig, QueryConfig}}; +use subbeat::{ + datasources::grafana::Grafana, + types::{DatasourceConfig, GrafanaConfig, InfluxConfig, PrometheusConfig, QueryConfig}, +}; pub struct CLI { pub query_config: QueryConfig, @@ -155,7 +158,7 @@ impl CLI { url: url.to_owned(), api_key: key.to_owned(), datasource_url: datasource_url.to_owned(), - query: query.to_owned() + query: query.to_owned(), }), from, to, @@ -174,7 +177,7 @@ impl CLI { query_config: QueryConfig { datasource_config: DatasourceConfig::Prometheus(PrometheusConfig { url: url.to_owned(), - query: query.to_owned() + query: query.to_owned(), }), from, to, @@ -197,7 +200,7 @@ impl CLI { url: url.to_owned(), org_id: org_id.to_owned(), token: token.to_owned(), - query: query.to_owned() + query: query.to_owned(), }), from, to, diff --git a/src/datasources/influx.rs b/src/datasources/influx.rs index 1f4beee..aa19ae3 100644 --- a/src/datasources/influx.rs +++ b/src/datasources/influx.rs @@ -2,13 +2,13 @@ use std::collections::HashMap; use async_trait::async_trait; -use bytes::{Buf, buf::Reader}; -use serde_derive::{Deserialize, Serialize}; -use serde_json::Value; +use bytes::{buf::Reader, Buf}; -use crate::{metric::{Metric, MetricResult}, types::{self, InfluxConfig}, utils::{self, normalize_url}}; - -use serde_qs as qs; +use crate::{ + metric::{Metric, MetricResult}, + types::{self, InfluxConfig}, + utils::{self, normalize_url}, +}; #[derive(Clone)] pub struct Influx { @@ -18,9 +18,14 @@ pub struct Influx { token: String, } - impl Influx { pub fn new(cfg: &InfluxConfig) -> Influx { + if !cfg.query.contains("$range") { + panic!( + "Bad query: missing $range variable. Example: \n from(bucket:\"main-backet\") |> $range |> filter(fn:(r) => r._measurement == \"cpu\")" + ); + } + Influx { url: cfg.url.to_owned(), org_id: cfg.org_id.to_owned(), @@ -46,7 +51,6 @@ pub fn parse_result(reader: Reader) -> types::Result { #[async_trait] impl Metric for Influx { async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result { - let url = format!( "{}/api/v2/query?orgID={}", normalize_url(self.url.to_owned()), @@ -55,19 +59,29 @@ impl Metric for Influx { let mut headers = HashMap::new(); headers.insert("Accept".to_string(), "application/csv".to_owned()); - headers.insert("Authorization".to_string(), format!("Token {}", self.token).to_owned()); - headers.insert("Content-type".to_string(), "application/vnd.flux".to_owned()); - let (_status_code, value) = utils::post_with_headers(&url, headers).await?; + headers.insert( + "Authorization".to_string(), + format!("Token {}", self.token).to_owned(), + ); + headers.insert( + "Content-type".to_string(), + "application/vnd.flux".to_owned(), + ); + + let range_str = format!("range(start:{},stop:{})", from, to); + let query = self.query.replace("$range", range_str.as_str()); + let body = hyper::Body::from(query); + + let (_status_code, value) = utils::post_with_headers(&url, &headers, body).await?; return parse_result(value); } } - // curl -XPOST "localhost:8086/api/v2/query?orgID=5abe4759f7360f1c" -sS \ // -H 'Accept:application/csv' \ // -H 'Authorization: Token sCAB2MVo8TJxhUH8UDJZIeCPwf-cykBtO0jhr207qCQSZ9d43JXObCYK_uAml2BL26JBYFauz95yIeC51kxQog==' \ // -H 'Content-type:application/vnd.flux' \ // -d 'from(bucket:"main-backet") -// |> range(start:-5m) +// |> range(start:-5m) // |> filter(fn:(r) => r._measurement == "cpu")' diff --git a/src/datasources/mod.rs b/src/datasources/mod.rs index 011f277..faac95b 100644 --- a/src/datasources/mod.rs +++ b/src/datasources/mod.rs @@ -1,14 +1,13 @@ pub mod grafana; -pub mod prometheus; pub mod influx; +pub mod prometheus; -use crate::{metric::Metric, types::{DatasourceConfig}}; - +use crate::{metric::Metric, types::DatasourceConfig}; pub fn resolve(config: &DatasourceConfig) -> Box { match config { DatasourceConfig::Grafana(cfg) => Box::new(grafana::Grafana::new(cfg)), DatasourceConfig::Prometheus(cfg) => Box::new(prometheus::Prometheus::new(cfg)), - DatasourceConfig::Influx(cfg) => Box::new(influx::Influx::new(cfg)) + DatasourceConfig::Influx(cfg) => Box::new(influx::Influx::new(cfg)), } } diff --git a/src/datasources/prometheus.rs b/src/datasources/prometheus.rs index 832d1fa..5bc85b4 100644 --- a/src/datasources/prometheus.rs +++ b/src/datasources/prometheus.rs @@ -3,7 +3,11 @@ use async_trait::async_trait; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; -use crate::{metric::{Metric, MetricResult}, types::{self, PrometheusConfig}, utils::{self, normalize_url}}; +use crate::{ + metric::{Metric, MetricResult}, + types::{self, PrometheusConfig}, + utils::{self, normalize_url}, +}; use serde_qs as qs; diff --git a/src/main.rs b/src/main.rs index d200065..e27c070 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,4 @@ -use subbeat::{ - datasources::resolve, -}; +use subbeat::datasources::resolve; mod cli; mod types; diff --git a/src/metric.rs b/src/metric.rs index 024e2df..7b9e5bf 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use std::{collections::HashMap}; +use std::collections::HashMap; use crate::types; diff --git a/src/types.rs b/src/types.rs index dd4d5b7..6960e51 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,7 +4,6 @@ use anyhow; pub type Result = anyhow::Result; - pub struct PrometheusConfig { pub url: String, pub query: String, @@ -17,7 +16,6 @@ pub struct InfluxConfig { pub query: String, } - pub struct GrafanaConfig { pub url: String, pub api_key: String, @@ -25,18 +23,15 @@ pub struct GrafanaConfig { pub query: String, } - pub enum DatasourceConfig { Grafana(GrafanaConfig), Prometheus(PrometheusConfig), - Influx(InfluxConfig) + Influx(InfluxConfig), } pub struct QueryConfig { pub datasource_config: DatasourceConfig, pub from: u64, pub to: u64, - pub step: u64 + pub step: u64, } - - diff --git a/src/utils.rs b/src/utils.rs index 6b59bed..bbe6622 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,5 @@ use bytes::{buf::Reader, Buf}; -use hyper::{Body, Client, Method, Request, StatusCode}; +use hyper::{body, Body, Client, Method, Request, StatusCode}; use std::{collections::HashMap, io::Read}; use crate::types; @@ -55,22 +55,19 @@ pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)> Ok((status, result)) } -pub async fn post_with_headers(url: &String, headers: HashMap) -> types::Result<(StatusCode, Reader)> { - - let mut builder = Request::builder() - .method(Method::POST) - .uri(url); - //.header("Accept", "application/json"); +pub async fn post_with_headers( + url: &String, + headers: &HashMap, + body: hyper::Body, +) -> types::Result<(StatusCode, Reader)> { + let mut builder = Request::builder().method(Method::POST).uri(url); + //.header("Accept", "application/json"); for (k, v) in headers { builder = builder.header(k, v); } - - let req_result = builder.body(Body::from("from(bucket:\"main-backet\") - |> filter(fn:(r) => r._measurement == \"cpu\") - |> range(start:-1m) - ")); + let req_result = builder.body(body); if req_result.is_err() { println!("{:?}", req_result);