diff --git a/Cargo.lock b/Cargo.lock index c4caba0..1f46eda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,6 +51,18 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "bytes" version = "1.1.0" @@ -100,6 +112,28 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "csv" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +dependencies = [ + "bstr", + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "fnv" version = "1.0.7" @@ -558,6 +592,12 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -696,6 +736,7 @@ dependencies = [ "async-trait", "bytes", "clap", + "csv", "hyper", "hyper-tls", "serde", diff --git a/Cargo.toml b/Cargo.toml index 9b5c063..a090ffa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,3 +22,4 @@ serde_derive = "1.0" serde_qs = "0.8.5" async-trait = "0.1.51" anyhow = "1.0.13" +csv = "1.1" diff --git a/src/datasources/influx.rs b/src/datasources/influx.rs index 694802b..1f4beee 100644 --- a/src/datasources/influx.rs +++ b/src/datasources/influx.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use bytes::{Buf, buf::Reader}; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; @@ -29,8 +30,17 @@ impl Influx { } } -pub fn parse_result(value: Value) -> types::Result { - Err(anyhow::format_err!("not implemented")) +pub fn parse_result(reader: Reader) -> types::Result { + let mut rdr = csv::Reader::from_reader(reader); + for result in rdr.records() { + let record = result?; + println!("{:?}", record); + } + + let mut result: MetricResult = Default::default(); + // result.data.insert(metric_name, values.to_owned()); + + Ok(result) } #[async_trait] @@ -43,7 +53,6 @@ impl Metric for Influx { self.org_id ); - println!("URL: {}", url); let mut headers = HashMap::new(); headers.insert("Accept".to_string(), "application/csv".to_owned()); headers.insert("Authorization".to_string(), format!("Token {}", self.token).to_owned()); diff --git a/src/utils.rs b/src/utils.rs index 17e9db0..6b59bed 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -55,7 +55,7 @@ pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)> Ok((status, result)) } -pub async fn post_with_headers(url: &String, headers: HashMap) -> types::Result<(StatusCode, serde_json::Value)> { +pub async fn post_with_headers(url: &String, headers: HashMap) -> types::Result<(StatusCode, Reader)> { let mut builder = Request::builder() .method(Method::POST) @@ -68,8 +68,9 @@ pub async fn post_with_headers(url: &String, headers: HashMap) - let req_result = builder.body(Body::from("from(bucket:\"main-backet\") + |> filter(fn:(r) => r._measurement == \"cpu\") |> range(start:-1m) - |> filter(fn:(r) => r._measurement == \"cpu\")")); + ")); if req_result.is_err() { println!("{:?}", req_result); @@ -85,8 +86,5 @@ pub async fn post_with_headers(url: &String, headers: HashMap) - let body = hyper::body::aggregate(res).await?; let reader = body.reader(); - print_buf(reader); - // let result: serde_json::Value = serde_json::from_reader(reader)?; - Err(anyhow::format_err!("yo yo")) - // Ok((status, result)) + Ok((status, reader)) }