From 422305bf4418d8230323466d1a74300dfc52db3f Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Thu, 4 Nov 2021 21:02:27 +0300 Subject: [PATCH] influx empty results case --- src/datasources/influx.rs | 57 ++++++++++++++++++++++++++++++++------- src/metric.rs | 1 - src/utils.rs | 4 +-- 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/src/datasources/influx.rs b/src/datasources/influx.rs index edb0149..3c8f8bd 100644 --- a/src/datasources/influx.rs +++ b/src/datasources/influx.rs @@ -22,7 +22,7 @@ impl Influx { pub fn new(cfg: &InfluxConfig) -> Influx { if !cfg.query.contains("$range") { panic!( - "Bad query: missing $range variable. Example: \n from(bucket:\"main-backet\") |> $range |> filter(fn:(r) => r._measurement == \"cpu\")" + "Bad query: missing $range variable. Example: \n 'from(bucket: \"main-backet\") |> $range |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\") |> filter(fn: (r) => r[\"_field\"] == \"usage_softirq\") |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\") |> filter(fn: (r) => r[\"host\"] == \"roid\") |> yield(name: \"mean\")')" ); } @@ -36,20 +36,51 @@ impl Influx { } pub fn parse_result(reader: Reader) -> types::Result { + + // println!("---------------"); + // utils::print_buf(reader); + // println!("xxxxxxxxxxxxxxx"); + // return Ok(Default::default()); + let mut rdr = csv::Reader::from_reader(reader); - // for h in rdr.headers() { - // println!("{:?}", h); - // } + + let hdrs = rdr.headers(); + if hdrs.is_err() { + return Err(anyhow::format_err!( + "Cant' extract metric: headers are empty" + )); + } + + let hdrs = hdrs.unwrap(); + if hdrs.len() == 0 { + return Ok(Default::default()); + } + + // TODO: get it from actual response + let measurement_name_position = 8usize; + + if hdrs.get(measurement_name_position).is_none() { + // println!("HEADERS:"); + // for h in hdrs { + // println!("{}", h); + // } + return Err(anyhow::format_err!( + "Cant' extract metric: no measurement at position {}", measurement_name_position + )); + } // println!("len: {:?}", rdr.headers().unwrap().get(9)); - let measurement = rdr.headers().unwrap().get(9).unwrap().to_owned(); + let measurement = hdrs.get(measurement_name_position).unwrap().to_owned(); + // println!("_measurement {}", measurement); let mut ts = Vec::new(); for result in rdr.records() { let record = result?; - let t = chrono::DateTime:: parse_from_rfc3339(record.get(5).unwrap()).unwrap().timestamp() as u64; + let t = chrono::DateTime::parse_from_rfc3339(record.get(5).unwrap()) + .unwrap() + .timestamp() as u64; let v = record.get(6).unwrap().parse::().unwrap(); - ts.push((t,v)); + ts.push((t, v)); // println!("{:?} > {:?}", t, v); // println!("{:?}", record); } @@ -64,12 +95,16 @@ pub fn parse_result(reader: Reader) -> types::Result { #[async_trait] impl Metric for Influx { async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result { + // TODO: use step + let url = format!( "{}/api/v2/query?orgID={}", normalize_url(self.url.to_owned()), self.org_id ); + // println!("{}", url); + let mut headers = HashMap::new(); headers.insert("Accept".to_string(), "application/csv".to_owned()); headers.insert( @@ -83,15 +118,17 @@ impl Metric for Influx { let range_str = format!("range(start:{},stop:{})", from, to); let query = self.query.replace("$range", range_str.as_str()); + + // println!("query: {}", query); let body = hyper::Body::from(query); - let (_status_code, value) = utils::post_with_headers(&url, &headers, body).await?; + let (_status_code, reader) = utils::post_with_headers(&url, &headers, body).await?; - return parse_result(value); + parse_result(reader) } fn boxed_clone(&self) -> Box { - return Box::new(self.clone()); + Box::new(self.clone()) } } diff --git a/src/metric.rs b/src/metric.rs index f6c93fc..76546d2 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -58,7 +58,6 @@ impl Default for MetricResult { #[async_trait] pub trait Metric { - fn boxed_clone(&self) -> Box; // (to - from) / step < 10000 diff --git a/src/utils.rs b/src/utils.rs index bbe6622..f3ed790 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, io::Read}; use crate::types; -fn print_buf(mut reader: Reader) { +pub fn print_buf(mut reader: Reader) { let mut dst = [0; 1024]; let mut vec = Vec::::new(); @@ -61,7 +61,6 @@ pub async fn post_with_headers( body: hyper::Body, ) -> types::Result<(StatusCode, Reader)> { let mut builder = Request::builder().method(Method::POST).uri(url); - //.header("Accept", "application/json"); for (k, v) in headers { builder = builder.header(k, v); @@ -83,5 +82,6 @@ pub async fn post_with_headers( let body = hyper::body::aggregate(res).await?; let reader = body.reader(); + // Err(anyhow::format_err!("bad")) Ok((status, reader)) }