Browse Source

influx begin

main
Alexey Velikiy 3 years ago
parent
commit
097d113317
  1. 74
      src/cli.rs
  2. 60
      src/datasources/influx.rs
  3. 9
      src/datasources/mod.rs
  4. 2
      src/metric.rs
  5. 10
      src/types.rs
  6. 38
      src/utils.rs

74
src/cli.rs

@ -1,11 +1,12 @@
use clap::{App, Arg, SubCommand};
use subbeat::{datasources::grafana::Grafana, types::{DatasourceConfig, GrafanaConfig, PrometheusConfig, QueryConfig}};
use subbeat::{datasources::grafana::Grafana, types::{DatasourceConfig, GrafanaConfig, InfluxConfig, PrometheusConfig, QueryConfig}};
pub struct CLI {
pub query_config: QueryConfig,
}
impl CLI {
// TODO: convert to result
pub fn new() -> CLI {
let matches = App::new("subbeat")
.version("0.0.5")
@ -63,7 +64,7 @@ impl CLI {
.about("Use prometheus API as datasource")
.arg(
Arg::with_name("PROM_URL")
.help("URL to your Grafana instance")
.help("URL to your Prometheus instance")
.required(true)
.index(1),
)
@ -92,6 +93,52 @@ impl CLI {
.index(5),
),
)
.subcommand(
SubCommand::with_name("influx")
.about("Use influxdb API as datasource")
.arg(
Arg::with_name("INFLUX_URL")
.help("URL to your influxdb instance")
.required(true)
.index(1),
)
.arg(
Arg::with_name("ORG_ID")
.help("URL to your influxdb instance")
.required(true)
.index(1),
)
.arg(
Arg::with_name("TOKEN")
.help("URL to your influxdb instance")
.required(true)
.index(1),
)
.arg(
Arg::with_name("query")
.help("your flux query to datasource")
.required(true)
.index(2),
)
.arg(
Arg::with_name("from")
.help("timestamp")
.required(true)
.index(3),
)
.arg(
Arg::with_name("to")
.help("timestampt")
.required(true)
.index(4),
)
.arg(
Arg::with_name("step")
.help("aggregation step")
.required(true)
.index(5),
),
)
.get_matches();
if let Some(matches) = matches.subcommand_matches("grafana") {
@ -136,6 +183,29 @@ impl CLI {
};
};
if let Some(matches) = matches.subcommand_matches("influx") {
let url = matches.value_of("PROM_URL").unwrap();
let org_id = matches.value_of("ORG_ID").unwrap();
let token = matches.value_of("TOKEN").unwrap();
let query = matches.value_of("query").unwrap();
let from = matches.value_of("from").unwrap().parse().unwrap();
let to = matches.value_of("to").unwrap().parse().unwrap();
let step = matches.value_of("step").unwrap().parse().unwrap();
return CLI {
query_config: QueryConfig {
datasource_config: DatasourceConfig::Influx(InfluxConfig {
url: url.to_owned(),
org_id: org_id.to_owned(),
token: token.to_owned(),
query: query.to_owned()
}),
from,
to,
step,
},
};
};
panic!("Unknown datasource");
}
}

60
src/datasources/influx.rs

@ -0,0 +1,60 @@
use std::collections::HashMap;
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use crate::{metric::{Metric, MetricResult}, types::{self, InfluxConfig}, utils::{self, normalize_url}};
use serde_qs as qs;
#[derive(Clone)]
pub struct Influx {
url: String,
org_id: String,
query: String,
token: String,
}
impl Influx {
pub fn new(cfg: &InfluxConfig) -> Influx {
Influx {
url: cfg.url.to_owned(),
org_id: cfg.org_id.to_owned(),
token: cfg.token.to_owned(),
query: cfg.query.to_owned(),
}
}
}
pub fn parse_result(value: Value) -> types::Result<MetricResult> {
Err(anyhow::format_err!("not implemented"))
}
#[async_trait]
impl Metric for Influx {
async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> {
let url = format!(
"{}/api/v2/query?orgId={}",
normalize_url(self.url.to_owned()),
self.org_id
);
let mut headers = HashMap::new();
headers.insert("Authorization".to_string(), format!("Token {}", self.token).to_owned());
let (_status_code, value) = utils::post_with_headers(&url, headers).await?;
return parse_result(value);
}
}
// curl -XPOST "localhost:8086/api/v2/query?orgID=5abe4759f7360f1c" -sS \
// -H 'Accept:application/csv' \
// -H 'Authorization: Token sCAB2MVo8TJxhUH8UDJZIeCPwf-cykBtO0jhr207qCQSZ9d43JXObCYK_uAml2BL26JBYFauz95yIeC51kxQog==' \
// -H 'Content-type:application/vnd.flux' \
// -d 'from(bucket:"main-backet")
// |> range(start:-5m)
// |> filter(fn:(r) => r._measurement == "cpu")'

9
src/datasources.rs → src/datasources/mod.rs

@ -1,11 +1,14 @@
use crate::{metric::Metric, types::{DatasourceConfig}};
pub mod grafana;
pub mod prometheus;
pub mod influx;
use crate::{metric::Metric, types::{DatasourceConfig}};
pub fn resolve(config: &DatasourceConfig) -> Box<dyn Metric + Sync> {
match config {
DatasourceConfig::Grafana(cfg) => Box::new(grafana::Grafana::new(cfg)),
DatasourceConfig::Prometheus(cfg) => Box::new(prometheus::Prometheus::new(cfg))
DatasourceConfig::Prometheus(cfg) => Box::new(prometheus::Prometheus::new(cfg)),
DatasourceConfig::Influx(cfg) => Box::new(influx::Influx::new(cfg))
}
}

2
src/metric.rs

@ -1,5 +1,5 @@
use async_trait::async_trait;
use std::{collections::HashMap, result};
use std::{collections::HashMap};
use crate::types;

10
src/types.rs

@ -10,6 +10,13 @@ pub struct PrometheusConfig {
pub query: String,
}
pub struct InfluxConfig {
pub url: String,
pub org_id: String,
pub token: String,
pub query: String,
}
pub struct GrafanaConfig {
pub url: String,
@ -21,7 +28,8 @@ pub struct GrafanaConfig {
pub enum DatasourceConfig {
Grafana(GrafanaConfig),
Prometheus(PrometheusConfig)
Prometheus(PrometheusConfig),
Influx(InfluxConfig)
}
pub struct QueryConfig {

38
src/utils.rs

@ -1,6 +1,6 @@
use bytes::{buf::Reader, Buf};
use hyper::{Body, Client, Method, Request, StatusCode};
use std::io::Read;
use std::{collections::HashMap, io::Read};
use crate::types;
@ -54,3 +54,39 @@ pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)>
let result: serde_json::Value = serde_json::from_reader(reader)?;
Ok((status, result))
}
pub async fn post_with_headers(url: &String, headers: HashMap<String, String>) -> types::Result<(StatusCode, serde_json::Value)> {
let mut builder = Request::builder()
.method(Method::POST)
.uri(url);
//.header("Accept", "application/json");
for (k, v) in headers {
builder = builder.header(k, v);
}
let req_result = builder.body(Body::from("from(bucket:\"main-backet\")
// |> range(start:-1m)
// |> filter(fn:(r) => r._measurement == \"cpu\")"));
if req_result.is_err() {
println!("{:?}", req_result);
panic!("Can`t connect");
}
let req = req_result.unwrap();
let client = Client::new();
let res = client.request(req).await?;
let status = res.status();
let body = hyper::body::aggregate(res).await?;
let reader = body.reader();
print_buf(reader);
// let result: serde_json::Value = serde_json::from_reader(reader)?;
Err(anyhow::format_err!("yo yo"))
// Ok((status, result))
}

Loading…
Cancel
Save