Browse Source

prometheus first implementation

main
Alexey Velikiy 3 years ago
parent
commit
30a7b46391
  1. 54
      src/datasources/prometheus.rs
  2. 62
      src/utils.rs

54
src/datasources/prometheus.rs

@ -2,10 +2,12 @@ use async_trait::async_trait;
use hyper::{Body, Client, Method, Request, StatusCode}; use hyper::{Body, Client, Method, Request, StatusCode};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use crate::{ use crate::{
metric::{Metric, MetricResult}, metric::{Metric, MetricResult},
types, utils, types,
utils::{self, normalize_url},
}; };
use serde_qs as qs; use serde_qs as qs;
@ -35,6 +37,40 @@ impl Prometheus {
} }
} }
// code duplication...
fn parse_result(value: Value) -> types::Result<MetricResult> {
let metric = &value["data"]["result"][0]["metric"];
let metric_name = metric
.as_object()
.unwrap()
.iter()
.map(|(k, v)| format!("{}=\"{}\"", k, v.as_str().unwrap()))
.collect::<Vec<String>>()
.join(",");
let metric_name = format!("{{{}}}", metric_name);
let values = &value["data"]["result"][0]["values"]
.as_array()
.unwrap()
.iter()
.map(|e| {
let r = e.as_array().unwrap();
return (
r[0].as_u64().unwrap(),
r[1].as_str().unwrap().to_string().parse::<f64>().unwrap(),
);
})
.collect::<Vec<(u64, f64)>>();
let mut result: MetricResult = Default::default();
result.data.insert(metric_name, values.to_owned());
// println!("{:?}", result);
return Ok(result);
}
#[async_trait] #[async_trait]
impl Metric for Prometheus { impl Metric for Prometheus {
async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> { async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> {
@ -45,15 +81,13 @@ impl Metric for Prometheus {
step, step,
}; };
let qs = qs::to_string(&q)?; let qs = qs::to_string(&q)?;
let url = format!(
"{}/api/v1/query_range?{}",
normalize_url(self.url.to_owned()),
qs
);
let (_status_code, value) = utils::get(&url).await?;
let url = format!("{}/api/v1/query_range?{}", self.url, qs); return parse_result(value);
let v = utils::get(&url).await?;
println!("Prom value:");
println!("{:?}", &v);
// TODO: query
// TODO: parse
return Ok(Default::default());
} }
} }

62
src/utils.rs

@ -1,40 +1,56 @@
use bytes::Buf as _; use bytes::{buf::Reader, Buf};
use hyper::{Body, Client, Method, Request, StatusCode}; use hyper::{Body, Client, Method, Request, StatusCode};
use std::io::Read; use std::io::Read;
use crate::types; use crate::types;
// TODO: move to utils fn print_buf(mut reader: Reader<impl Buf>) {
pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)> {
let req = Request::builder()
.method(Method::GET)
.uri(url)
.header("Accept", "application/json")
.body(Body::empty())
.unwrap();
let client = Client::new();
let res = client.request(req).await?;
let status = res.status();
let body = hyper::body::aggregate(res).await?;
let mut reader = body.reader();
{
let mut dst = [0; 1024]; let mut dst = [0; 1024];
let num = reader.read(&mut dst).unwrap();
let mut vec = Vec::<u8>::new(); let mut vec = Vec::<u8>::new();
loop {
let num = reader.read(&mut dst).unwrap();
if num == 0 {
break;
}
for i in 0..num { for i in 0..num {
vec.push(dst[i]); vec.push(dst[i]);
} }
}
let str = String::from_utf8(vec).unwrap(); let str = String::from_utf8(vec).unwrap();
println!("{}", str); println!("{}", str);
} }
panic!("asdas"); pub fn normalize_url(url: String) -> String {
if url.ends_with("/") {
let res = url.strip_suffix("/").unwrap();
return res.to_string();
}
return url;
}
pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)> {
let req_result = Request::builder()
.method(Method::GET)
.uri(url)
.header("Accept", "application/json")
.body(Body::empty());
if req_result.is_err() {
println!("{:?}", req_result);
panic!("Can`t connect");
}
let req = req_result.unwrap();
let client = Client::new();
let res = client.request(req).await?;
let status = res.status();
// String::from_utf8(reader.bytes()); let body = hyper::body::aggregate(res).await?;
let reader = body.reader();
// let result: serde_json::Value = serde_json::from_reader(reader)?; let result: serde_json::Value = serde_json::from_reader(reader)?;
// Ok((status, result)) Ok((status, result))
} }

Loading…
Cancel
Save