diff --git a/src/cli.rs b/src/cli.rs index 3af01a1..6cef687 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,11 +1,12 @@ use clap::{App, Arg, SubCommand}; -use subbeat::{datasources::grafana::Grafana, types::{DatasourceConfig, GrafanaConfig, PrometheusConfig, QueryConfig}}; +use subbeat::{datasources::grafana::Grafana, types::{DatasourceConfig, GrafanaConfig, InfluxConfig, PrometheusConfig, QueryConfig}}; pub struct CLI { pub query_config: QueryConfig, } impl CLI { + // TODO: convert to result pub fn new() -> CLI { let matches = App::new("subbeat") .version("0.0.5") @@ -63,7 +64,7 @@ impl CLI { .about("Use prometheus API as datasource") .arg( Arg::with_name("PROM_URL") - .help("URL to your Grafana instance") + .help("URL to your Prometheus instance") .required(true) .index(1), ) @@ -92,6 +93,52 @@ impl CLI { .index(5), ), ) + .subcommand( + SubCommand::with_name("influx") + .about("Use influxdb API as datasource") + .arg( + Arg::with_name("INFLUX_URL") + .help("URL to your influxdb instance") + .required(true) + .index(1), + ) + .arg( + Arg::with_name("ORG_ID") + .help("Your influxdb organization id") + .required(true) + .index(2), + ) + .arg( + Arg::with_name("TOKEN") + .help("Your influxdb API token") + .required(true) + .index(3), + ) + .arg( + Arg::with_name("query") + .help("your flux query to datasource") + .required(true) + .index(4), + ) + .arg( + Arg::with_name("from") + .help("timestamp") + .required(true) + .index(5), + ) + .arg( + Arg::with_name("to") + .help("timestamp") + .required(true) + .index(6), + ) + .arg( + Arg::with_name("step") + .help("aggregation step") + .required(true) + .index(7), + ), + ) + .get_matches(); if let Some(matches) = matches.subcommand_matches("grafana") { @@ -136,6 +183,29 @@ impl CLI { }; }; + if let Some(matches) = matches.subcommand_matches("influx") { + let url = matches.value_of("INFLUX_URL").unwrap(); + let org_id = matches.value_of("ORG_ID").unwrap(); + let token = matches.value_of("TOKEN").unwrap(); + let query = 
matches.value_of("query").unwrap(); + let from = matches.value_of("from").unwrap().parse().unwrap(); + let to = matches.value_of("to").unwrap().parse().unwrap(); + let step = matches.value_of("step").unwrap().parse().unwrap(); + return CLI { + query_config: QueryConfig { + datasource_config: DatasourceConfig::Influx(InfluxConfig { + url: url.to_owned(), + org_id: org_id.to_owned(), + token: token.to_owned(), + query: query.to_owned() + }), + from, + to, + step, + }, + }; + }; + panic!("Unknown datasource"); } } diff --git a/src/datasources/influx.rs b/src/datasources/influx.rs new file mode 100644 index 0000000..09bd7aa --- /dev/null +++ b/src/datasources/influx.rs @@ -0,0 +1,60 @@ +use std::collections::HashMap; + +use async_trait::async_trait; + +use serde_derive::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{metric::{Metric, MetricResult}, types::{self, InfluxConfig}, utils::{self, normalize_url}}; + +use serde_qs as qs; + +#[derive(Clone)] +pub struct Influx { + url: String, + org_id: String, + query: String, + token: String, +} + + +impl Influx { + pub fn new(cfg: &InfluxConfig) -> Influx { + Influx { + url: cfg.url.to_owned(), + org_id: cfg.org_id.to_owned(), + token: cfg.token.to_owned(), + query: cfg.query.to_owned(), + } + } +} + +pub fn parse_result(value: Value) -> types::Result { + Err(anyhow::format_err!("not implemented")) +} + +#[async_trait] +impl Metric for Influx { + async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result { + + let url = format!( + "{}/api/v2/query?orgID={}", + normalize_url(self.url.to_owned()), + self.org_id + ); + let mut headers = HashMap::new(); + headers.insert("Authorization".to_string(), format!("Token {}", self.token).to_owned()); + let (_status_code, value) = utils::post_with_headers(&url, headers).await?; + + return parse_result(value); + } +} + + +// curl -XPOST "localhost:8086/api/v2/query?orgID=5abe4759f7360f1c" -sS \ +// -H 'Accept:application/csv' \ +// -H 'Authorization: 
Token <REDACTED>' \ +// -H 'Content-type:application/vnd.flux' \ +// -d 'from(bucket:"main-backet") +// |> range(start:-5m) +// |> filter(fn:(r) => r._measurement == "cpu")' diff --git a/src/datasources.rs b/src/datasources/mod.rs similarity index 74% rename from src/datasources.rs rename to src/datasources/mod.rs index e3953b4..011f277 100644 --- a/src/datasources.rs +++ b/src/datasources/mod.rs @@ -1,11 +1,14 @@ -use crate::{metric::Metric, types::{DatasourceConfig}}; - pub mod grafana; pub mod prometheus; +pub mod influx; + +use crate::{metric::Metric, types::{DatasourceConfig}}; + pub fn resolve(config: &DatasourceConfig) -> Box { match config { DatasourceConfig::Grafana(cfg) => Box::new(grafana::Grafana::new(cfg)), - DatasourceConfig::Prometheus(cfg) => Box::new(prometheus::Prometheus::new(cfg)) + DatasourceConfig::Prometheus(cfg) => Box::new(prometheus::Prometheus::new(cfg)), + DatasourceConfig::Influx(cfg) => Box::new(influx::Influx::new(cfg)) } } diff --git a/src/metric.rs b/src/metric.rs index ced8553..024e2df 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use std::{collections::HashMap, result}; +use std::{collections::HashMap}; use crate::types; diff --git a/src/types.rs b/src/types.rs index fbc81c1..dd4d5b7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -10,6 +10,13 @@ pub struct PrometheusConfig { pub query: String, } +pub struct InfluxConfig { + pub url: String, + pub org_id: String, + pub token: String, + pub query: String, +} + pub struct GrafanaConfig { pub url: String, @@ -21,7 +28,8 @@ pub struct GrafanaConfig { pub enum DatasourceConfig { Grafana(GrafanaConfig), - Prometheus(PrometheusConfig) + Prometheus(PrometheusConfig), + Influx(InfluxConfig) } pub struct QueryConfig { diff --git a/src/utils.rs b/src/utils.rs index 1a35631..b718d69 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,6 @@ use bytes::{buf::Reader, 
Buf}; use hyper::{Body, Client, Method, Request, StatusCode}; -use std::io::Read; +use std::{collections::HashMap, io::Read}; use crate::types; @@ -54,3 +54,39 @@ pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)> let result: serde_json::Value = serde_json::from_reader(reader)?; Ok((status, result)) } + +pub async fn post_with_headers(url: &String, headers: HashMap) -> types::Result<(StatusCode, serde_json::Value)> { + + let mut builder = Request::builder() + .method(Method::POST) + .uri(url); + //.header("Accept", "application/json"); + + for (k, v) in headers { + builder = builder.header(k, v); + } + + + let req_result = builder.body(Body::from("from(bucket:\"main-backet\") + |> range(start:-1m) + |> filter(fn:(r) => r._measurement == \"cpu\")")); + + if req_result.is_err() { + println!("{:?}", req_result); + panic!("Can`t connect"); + } + + let req = req_result.unwrap(); + + let client = Client::new(); + let res = client.request(req).await?; + let status = res.status(); + + let body = hyper::body::aggregate(res).await?; + let reader = body.reader(); + + print_buf(reader); + // let result: serde_json::Value = serde_json::from_reader(reader)?; + Err(anyhow::format_err!("yo yo")) + // Ok((status, result)) +}