Alexey Velikiy 3 years ago
parent
commit
09936e9f34
  1. 11
      src/cli.rs
  2. 38
      src/datasources/influx.rs
  3. 7
      src/datasources/mod.rs
  4. 6
      src/datasources/prometheus.rs
  5. 4
      src/main.rs
  6. 2
      src/metric.rs
  7. 9
      src/types.rs
  8. 21
      src/utils.rs

11
src/cli.rs

@ -1,5 +1,8 @@
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use subbeat::{datasources::grafana::Grafana, types::{DatasourceConfig, GrafanaConfig, InfluxConfig, PrometheusConfig, QueryConfig}}; use subbeat::{
datasources::grafana::Grafana,
types::{DatasourceConfig, GrafanaConfig, InfluxConfig, PrometheusConfig, QueryConfig},
};
pub struct CLI { pub struct CLI {
pub query_config: QueryConfig, pub query_config: QueryConfig,
@ -155,7 +158,7 @@ impl CLI {
url: url.to_owned(), url: url.to_owned(),
api_key: key.to_owned(), api_key: key.to_owned(),
datasource_url: datasource_url.to_owned(), datasource_url: datasource_url.to_owned(),
query: query.to_owned() query: query.to_owned(),
}), }),
from, from,
to, to,
@ -174,7 +177,7 @@ impl CLI {
query_config: QueryConfig { query_config: QueryConfig {
datasource_config: DatasourceConfig::Prometheus(PrometheusConfig { datasource_config: DatasourceConfig::Prometheus(PrometheusConfig {
url: url.to_owned(), url: url.to_owned(),
query: query.to_owned() query: query.to_owned(),
}), }),
from, from,
to, to,
@ -197,7 +200,7 @@ impl CLI {
url: url.to_owned(), url: url.to_owned(),
org_id: org_id.to_owned(), org_id: org_id.to_owned(),
token: token.to_owned(), token: token.to_owned(),
query: query.to_owned() query: query.to_owned(),
}), }),
from, from,
to, to,

38
src/datasources/influx.rs

@ -2,13 +2,13 @@ use std::collections::HashMap;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::{Buf, buf::Reader}; use bytes::{buf::Reader, Buf};
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use crate::{metric::{Metric, MetricResult}, types::{self, InfluxConfig}, utils::{self, normalize_url}}; use crate::{
metric::{Metric, MetricResult},
use serde_qs as qs; types::{self, InfluxConfig},
utils::{self, normalize_url},
};
#[derive(Clone)] #[derive(Clone)]
pub struct Influx { pub struct Influx {
@ -18,9 +18,14 @@ pub struct Influx {
token: String, token: String,
} }
impl Influx { impl Influx {
pub fn new(cfg: &InfluxConfig) -> Influx { pub fn new(cfg: &InfluxConfig) -> Influx {
if !cfg.query.contains("$range") {
panic!(
"Bad query: missing $range variable. Example: \n from(bucket:\"main-backet\") |> $range |> filter(fn:(r) => r._measurement == \"cpu\")"
);
}
Influx { Influx {
url: cfg.url.to_owned(), url: cfg.url.to_owned(),
org_id: cfg.org_id.to_owned(), org_id: cfg.org_id.to_owned(),
@ -46,7 +51,6 @@ pub fn parse_result(reader: Reader<impl Buf>) -> types::Result<MetricResult> {
#[async_trait] #[async_trait]
impl Metric for Influx { impl Metric for Influx {
async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> { async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> {
let url = format!( let url = format!(
"{}/api/v2/query?orgID={}", "{}/api/v2/query?orgID={}",
normalize_url(self.url.to_owned()), normalize_url(self.url.to_owned()),
@ -55,15 +59,25 @@ impl Metric for Influx {
let mut headers = HashMap::new(); let mut headers = HashMap::new();
headers.insert("Accept".to_string(), "application/csv".to_owned()); headers.insert("Accept".to_string(), "application/csv".to_owned());
headers.insert("Authorization".to_string(), format!("Token {}", self.token).to_owned()); headers.insert(
headers.insert("Content-type".to_string(), "application/vnd.flux".to_owned()); "Authorization".to_string(),
let (_status_code, value) = utils::post_with_headers(&url, headers).await?; format!("Token {}", self.token).to_owned(),
);
headers.insert(
"Content-type".to_string(),
"application/vnd.flux".to_owned(),
);
let range_str = format!("range(start:{},stop:{})", from, to);
let query = self.query.replace("$range", range_str.as_str());
let body = hyper::Body::from(query);
let (_status_code, value) = utils::post_with_headers(&url, &headers, body).await?;
return parse_result(value); return parse_result(value);
} }
} }
// curl -XPOST "localhost:8086/api/v2/query?orgID=5abe4759f7360f1c" -sS \ // curl -XPOST "localhost:8086/api/v2/query?orgID=5abe4759f7360f1c" -sS \
// -H 'Accept:application/csv' \ // -H 'Accept:application/csv' \
// -H 'Authorization: Token sCAB2MVo8TJxhUH8UDJZIeCPwf-cykBtO0jhr207qCQSZ9d43JXObCYK_uAml2BL26JBYFauz95yIeC51kxQog==' \ // -H 'Authorization: Token sCAB2MVo8TJxhUH8UDJZIeCPwf-cykBtO0jhr207qCQSZ9d43JXObCYK_uAml2BL26JBYFauz95yIeC51kxQog==' \

7
src/datasources/mod.rs

@ -1,14 +1,13 @@
pub mod grafana; pub mod grafana;
pub mod prometheus;
pub mod influx; pub mod influx;
pub mod prometheus;
use crate::{metric::Metric, types::{DatasourceConfig}}; use crate::{metric::Metric, types::DatasourceConfig};
pub fn resolve(config: &DatasourceConfig) -> Box<dyn Metric + Sync> { pub fn resolve(config: &DatasourceConfig) -> Box<dyn Metric + Sync> {
match config { match config {
DatasourceConfig::Grafana(cfg) => Box::new(grafana::Grafana::new(cfg)), DatasourceConfig::Grafana(cfg) => Box::new(grafana::Grafana::new(cfg)),
DatasourceConfig::Prometheus(cfg) => Box::new(prometheus::Prometheus::new(cfg)), DatasourceConfig::Prometheus(cfg) => Box::new(prometheus::Prometheus::new(cfg)),
DatasourceConfig::Influx(cfg) => Box::new(influx::Influx::new(cfg)) DatasourceConfig::Influx(cfg) => Box::new(influx::Influx::new(cfg)),
} }
} }

6
src/datasources/prometheus.rs

@ -3,7 +3,11 @@ use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::{metric::{Metric, MetricResult}, types::{self, PrometheusConfig}, utils::{self, normalize_url}}; use crate::{
metric::{Metric, MetricResult},
types::{self, PrometheusConfig},
utils::{self, normalize_url},
};
use serde_qs as qs; use serde_qs as qs;

4
src/main.rs

@ -1,6 +1,4 @@
use subbeat::{ use subbeat::datasources::resolve;
datasources::resolve,
};
mod cli; mod cli;
mod types; mod types;

2
src/metric.rs

@ -1,5 +1,5 @@
use async_trait::async_trait; use async_trait::async_trait;
use std::{collections::HashMap}; use std::collections::HashMap;
use crate::types; use crate::types;

9
src/types.rs

@ -4,7 +4,6 @@ use anyhow;
pub type Result<T> = anyhow::Result<T>; pub type Result<T> = anyhow::Result<T>;
pub struct PrometheusConfig { pub struct PrometheusConfig {
pub url: String, pub url: String,
pub query: String, pub query: String,
@ -17,7 +16,6 @@ pub struct InfluxConfig {
pub query: String, pub query: String,
} }
pub struct GrafanaConfig { pub struct GrafanaConfig {
pub url: String, pub url: String,
pub api_key: String, pub api_key: String,
@ -25,18 +23,15 @@ pub struct GrafanaConfig {
pub query: String, pub query: String,
} }
pub enum DatasourceConfig { pub enum DatasourceConfig {
Grafana(GrafanaConfig), Grafana(GrafanaConfig),
Prometheus(PrometheusConfig), Prometheus(PrometheusConfig),
Influx(InfluxConfig) Influx(InfluxConfig),
} }
pub struct QueryConfig { pub struct QueryConfig {
pub datasource_config: DatasourceConfig, pub datasource_config: DatasourceConfig,
pub from: u64, pub from: u64,
pub to: u64, pub to: u64,
pub step: u64 pub step: u64,
} }

21
src/utils.rs

@ -1,5 +1,5 @@
use bytes::{buf::Reader, Buf}; use bytes::{buf::Reader, Buf};
use hyper::{Body, Client, Method, Request, StatusCode}; use hyper::{body, Body, Client, Method, Request, StatusCode};
use std::{collections::HashMap, io::Read}; use std::{collections::HashMap, io::Read};
use crate::types; use crate::types;
@ -55,22 +55,19 @@ pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)>
Ok((status, result)) Ok((status, result))
} }
pub async fn post_with_headers(url: &String, headers: HashMap<String, String>) -> types::Result<(StatusCode, Reader<impl Buf>)> { pub async fn post_with_headers(
url: &String,
let mut builder = Request::builder() headers: &HashMap<String, String>,
.method(Method::POST) body: hyper::Body,
.uri(url); ) -> types::Result<(StatusCode, Reader<impl Buf>)> {
//.header("Accept", "application/json"); let mut builder = Request::builder().method(Method::POST).uri(url);
//.header("Accept", "application/json");
for (k, v) in headers { for (k, v) in headers {
builder = builder.header(k, v); builder = builder.header(k, v);
} }
let req_result = builder.body(body);
let req_result = builder.body(Body::from("from(bucket:\"main-backet\")
|> filter(fn:(r) => r._measurement == \"cpu\")
|> range(start:-1m)
"));
if req_result.is_err() { if req_result.is_err() {
println!("{:?}", req_result); println!("{:?}", req_result);

Loading…
Cancel
Save