From 30a7b46391af418c1e502183ffaca0b20f07a681 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 25 Oct 2021 18:09:48 +0300 Subject: [PATCH] prometheus first implementation --- src/datasources/prometheus.rs | 54 ++++++++++++++++++++++++------ src/utils.rs | 62 ++++++++++++++++++++++------------- 2 files changed, 83 insertions(+), 33 deletions(-) diff --git a/src/datasources/prometheus.rs b/src/datasources/prometheus.rs index aa36433..3ada29a 100644 --- a/src/datasources/prometheus.rs +++ b/src/datasources/prometheus.rs @@ -2,10 +2,12 @@ use async_trait::async_trait; use hyper::{Body, Client, Method, Request, StatusCode}; use serde_derive::{Deserialize, Serialize}; +use serde_json::Value; use crate::{ metric::{Metric, MetricResult}, - types, utils, + types, + utils::{self, normalize_url}, }; use serde_qs as qs; @@ -35,6 +37,40 @@ impl Prometheus { } } +// code duplication... +fn parse_result(value: Value) -> types::Result { + let metric = &value["data"]["result"][0]["metric"]; + let metric_name = metric + .as_object() + .unwrap() + .iter() + .map(|(k, v)| format!("{}=\"{}\"", k, v.as_str().unwrap())) + .collect::>() + .join(","); + + let metric_name = format!("{{{}}}", metric_name); + + let values = &value["data"]["result"][0]["values"] + .as_array() + .unwrap() + .iter() + .map(|e| { + let r = e.as_array().unwrap(); + return ( + r[0].as_u64().unwrap(), + r[1].as_str().unwrap().to_string().parse::().unwrap(), + ); + }) + .collect::>(); + + let mut result: MetricResult = Default::default(); + result.data.insert(metric_name, values.to_owned()); + + // println!("{:?}", result); + + return Ok(result); +} + #[async_trait] impl Metric for Prometheus { async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result { @@ -45,15 +81,13 @@ impl Metric for Prometheus { step, }; let qs = qs::to_string(&q)?; + let url = format!( + "{}/api/v1/query_range?{}", + normalize_url(self.url.to_owned()), + qs + ); + let (_status_code, value) = utils::get(&url).await?; - let url = format!("{}/api/v1/query_range?{}", self.url, qs); - - let v = utils::get(&url).await?; - - println!("Prom value:"); - println!("{:?}", &v); - // TODO: query - // TODO: parse - return Ok(Default::default()); + return parse_result(value); } } diff --git a/src/utils.rs b/src/utils.rs index e03eed0..1a35631 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,40 +1,56 @@ -use bytes::Buf as _; +use bytes::{buf::Reader, Buf}; use hyper::{Body, Client, Method, Request, StatusCode}; use std::io::Read; use crate::types; -// TODO: move to utils +fn print_buf(mut reader: Reader) { + let mut dst = [0; 1024]; + let mut vec = Vec::::new(); + + loop { + let num = reader.read(&mut dst).unwrap(); + if num == 0 { + break; + } + for i in 0..num { + vec.push(dst[i]); + } + } + + let str = String::from_utf8(vec).unwrap(); + println!("{}", str); +} + +pub fn normalize_url(url: String) -> String { + if url.ends_with("/") { + let res = url.strip_suffix("/").unwrap(); + return res.to_string(); + } + return url; +} + pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)> { - let req = Request::builder() + let req_result = Request::builder() .method(Method::GET) .uri(url) .header("Accept", "application/json") - .body(Body::empty()) - .unwrap(); + .body(Body::empty()); + + if req_result.is_err() { + println!("{:?}", req_result); + panic!("Can`t connect"); + } + + let req = req_result.unwrap(); let client = Client::new(); let res = client.request(req).await?; let status = res.status(); let body = hyper::body::aggregate(res).await?; - let mut reader = body.reader(); - - { - let mut dst = [0; 1024]; - let num = reader.read(&mut dst).unwrap(); - let mut vec = Vec::::new(); - for i in 0..num { - vec.push(dst[i]); - } - let str = String::from_utf8(vec).unwrap(); - println!("{}", str); - } - - panic!("asdas"); - - // String::from_utf8(reader.bytes()); + let reader = body.reader(); - // let result: serde_json::Value = serde_json::from_reader(reader)?; - // Ok((status, result)) + let result: serde_json::Value = serde_json::from_reader(reader)?; + Ok((status, result)) }