Browse Source

influx empty results case

main
Alexey Velikiy 3 years ago
parent
commit
422305bf44
  1. 55
      src/datasources/influx.rs
  2. 1
      src/metric.rs
  3. 4
      src/utils.rs

55
src/datasources/influx.rs

@ -22,7 +22,7 @@ impl Influx {
pub fn new(cfg: &InfluxConfig) -> Influx { pub fn new(cfg: &InfluxConfig) -> Influx {
if !cfg.query.contains("$range") { if !cfg.query.contains("$range") {
panic!( panic!(
"Bad query: missing $range variable. Example: \n from(bucket:\"main-backet\") |> $range |> filter(fn:(r) => r._measurement == \"cpu\")" "Bad query: missing $range variable. Example: \n 'from(bucket: \"main-backet\") |> $range |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\") |> filter(fn: (r) => r[\"_field\"] == \"usage_softirq\") |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\") |> filter(fn: (r) => r[\"host\"] == \"roid\") |> yield(name: \"mean\")')"
); );
} }
@ -36,20 +36,51 @@ impl Influx {
} }
pub fn parse_result(reader: Reader<impl Buf>) -> types::Result<MetricResult> { pub fn parse_result(reader: Reader<impl Buf>) -> types::Result<MetricResult> {
// println!("---------------");
// utils::print_buf(reader);
// println!("xxxxxxxxxxxxxxx");
// return Ok(Default::default());
let mut rdr = csv::Reader::from_reader(reader); let mut rdr = csv::Reader::from_reader(reader);
// for h in rdr.headers() {
// println!("{:?}", h); let hdrs = rdr.headers();
if hdrs.is_err() {
return Err(anyhow::format_err!(
"Cant' extract metric: headers are empty"
));
}
let hdrs = hdrs.unwrap();
if hdrs.len() == 0 {
return Ok(Default::default());
}
// TODO: get it from actual response
let measurement_name_position = 8usize;
if hdrs.get(measurement_name_position).is_none() {
// println!("HEADERS:");
// for h in hdrs {
// println!("{}", h);
// } // }
return Err(anyhow::format_err!(
"Cant' extract metric: no measurement at position {}", measurement_name_position
));
}
// println!("len: {:?}", rdr.headers().unwrap().get(9)); // println!("len: {:?}", rdr.headers().unwrap().get(9));
let measurement = rdr.headers().unwrap().get(9).unwrap().to_owned(); let measurement = hdrs.get(measurement_name_position).unwrap().to_owned();
// println!("_measurement {}", measurement); // println!("_measurement {}", measurement);
let mut ts = Vec::new(); let mut ts = Vec::new();
for result in rdr.records() { for result in rdr.records() {
let record = result?; let record = result?;
let t = chrono::DateTime:: parse_from_rfc3339(record.get(5).unwrap()).unwrap().timestamp() as u64; let t = chrono::DateTime::parse_from_rfc3339(record.get(5).unwrap())
.unwrap()
.timestamp() as u64;
let v = record.get(6).unwrap().parse::<f64>().unwrap(); let v = record.get(6).unwrap().parse::<f64>().unwrap();
ts.push((t,v)); ts.push((t, v));
// println!("{:?} > {:?}", t, v); // println!("{:?} > {:?}", t, v);
// println!("{:?}", record); // println!("{:?}", record);
} }
@ -64,12 +95,16 @@ pub fn parse_result(reader: Reader<impl Buf>) -> types::Result<MetricResult> {
#[async_trait] #[async_trait]
impl Metric for Influx { impl Metric for Influx {
async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> { async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> {
// TODO: use step
let url = format!( let url = format!(
"{}/api/v2/query?orgID={}", "{}/api/v2/query?orgID={}",
normalize_url(self.url.to_owned()), normalize_url(self.url.to_owned()),
self.org_id self.org_id
); );
// println!("{}", url);
let mut headers = HashMap::new(); let mut headers = HashMap::new();
headers.insert("Accept".to_string(), "application/csv".to_owned()); headers.insert("Accept".to_string(), "application/csv".to_owned());
headers.insert( headers.insert(
@ -83,15 +118,17 @@ impl Metric for Influx {
let range_str = format!("range(start:{},stop:{})", from, to); let range_str = format!("range(start:{},stop:{})", from, to);
let query = self.query.replace("$range", range_str.as_str()); let query = self.query.replace("$range", range_str.as_str());
// println!("query: {}", query);
let body = hyper::Body::from(query); let body = hyper::Body::from(query);
let (_status_code, value) = utils::post_with_headers(&url, &headers, body).await?; let (_status_code, reader) = utils::post_with_headers(&url, &headers, body).await?;
return parse_result(value); parse_result(reader)
} }
fn boxed_clone(&self) -> Box<dyn Metric + Sync + Send> { fn boxed_clone(&self) -> Box<dyn Metric + Sync + Send> {
return Box::new(self.clone()); Box::new(self.clone())
} }
} }

1
src/metric.rs

@ -58,7 +58,6 @@ impl Default for MetricResult {
#[async_trait] #[async_trait]
pub trait Metric { pub trait Metric {
fn boxed_clone(&self) -> Box<dyn Metric + Sync + Send>; fn boxed_clone(&self) -> Box<dyn Metric + Sync + Send>;
// (to - from) / step < 10000 // (to - from) / step < 10000

4
src/utils.rs

@ -4,7 +4,7 @@ use std::{collections::HashMap, io::Read};
use crate::types; use crate::types;
fn print_buf(mut reader: Reader<impl Buf>) { pub fn print_buf(mut reader: Reader<impl Buf>) {
let mut dst = [0; 1024]; let mut dst = [0; 1024];
let mut vec = Vec::<u8>::new(); let mut vec = Vec::<u8>::new();
@ -61,7 +61,6 @@ pub async fn post_with_headers(
body: hyper::Body, body: hyper::Body,
) -> types::Result<(StatusCode, Reader<impl Buf>)> { ) -> types::Result<(StatusCode, Reader<impl Buf>)> {
let mut builder = Request::builder().method(Method::POST).uri(url); let mut builder = Request::builder().method(Method::POST).uri(url);
//.header("Accept", "application/json");
for (k, v) in headers { for (k, v) in headers {
builder = builder.header(k, v); builder = builder.header(k, v);
@ -83,5 +82,6 @@ pub async fn post_with_headers(
let body = hyper::body::aggregate(res).await?; let body = hyper::body::aggregate(res).await?;
let reader = body.reader(); let reader = body.reader();
// Err(anyhow::format_err!("bad"))
Ok((status, reader)) Ok((status, reader))
} }

Loading…
Cancel
Save