Browse Source

influx++

main
Alexey Velikiy 3 years ago
parent
commit
152772601d
  1. 41
      Cargo.lock
  2. 1
      Cargo.toml
  3. 15
      src/datasources/influx.rs
  4. 10
      src/utils.rs

41
Cargo.lock generated

@ -51,6 +51,18 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde",
]
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.1.0" version = "1.1.0"
@ -100,6 +112,28 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "csv"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -558,6 +592,12 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]] [[package]]
name = "remove_dir_all" name = "remove_dir_all"
version = "0.5.3" version = "0.5.3"
@ -696,6 +736,7 @@ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
"clap", "clap",
"csv",
"hyper", "hyper",
"hyper-tls", "hyper-tls",
"serde", "serde",

1
Cargo.toml

@ -22,3 +22,4 @@ serde_derive = "1.0"
serde_qs = "0.8.5" serde_qs = "0.8.5"
async-trait = "0.1.51" async-trait = "0.1.51"
anyhow = "1.0.13" anyhow = "1.0.13"
csv = "1.1"

15
src/datasources/influx.rs

@ -2,6 +2,7 @@ use std::collections::HashMap;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::{Buf, buf::Reader};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
@ -29,8 +30,17 @@ impl Influx {
} }
} }
pub fn parse_result(value: Value) -> types::Result<MetricResult> { pub fn parse_result(reader: Reader<impl Buf>) -> types::Result<MetricResult> {
Err(anyhow::format_err!("not implemented")) let mut rdr = csv::Reader::from_reader(reader);
for result in rdr.records() {
let record = result?;
println!("{:?}", record);
}
let mut result: MetricResult = Default::default();
// result.data.insert(metric_name, values.to_owned());
Ok(result)
} }
#[async_trait] #[async_trait]
@ -43,7 +53,6 @@ impl Metric for Influx {
self.org_id self.org_id
); );
println!("URL: {}", url);
let mut headers = HashMap::new(); let mut headers = HashMap::new();
headers.insert("Accept".to_string(), "application/csv".to_owned()); headers.insert("Accept".to_string(), "application/csv".to_owned());
headers.insert("Authorization".to_string(), format!("Token {}", self.token).to_owned()); headers.insert("Authorization".to_string(), format!("Token {}", self.token).to_owned());

10
src/utils.rs

@ -55,7 +55,7 @@ pub async fn get(url: &String) -> types::Result<(StatusCode, serde_json::Value)>
Ok((status, result)) Ok((status, result))
} }
pub async fn post_with_headers(url: &String, headers: HashMap<String, String>) -> types::Result<(StatusCode, serde_json::Value)> { pub async fn post_with_headers(url: &String, headers: HashMap<String, String>) -> types::Result<(StatusCode, Reader<impl Buf>)> {
let mut builder = Request::builder() let mut builder = Request::builder()
.method(Method::POST) .method(Method::POST)
@ -68,8 +68,9 @@ pub async fn post_with_headers(url: &String, headers: HashMap<String, String>) -
let req_result = builder.body(Body::from("from(bucket:\"main-backet\") let req_result = builder.body(Body::from("from(bucket:\"main-backet\")
|> filter(fn:(r) => r._measurement == \"cpu\")
|> range(start:-1m) |> range(start:-1m)
|> filter(fn:(r) => r._measurement == \"cpu\")")); "));
if req_result.is_err() { if req_result.is_err() {
println!("{:?}", req_result); println!("{:?}", req_result);
@ -85,8 +86,5 @@ pub async fn post_with_headers(url: &String, headers: HashMap<String, String>) -
let body = hyper::body::aggregate(res).await?; let body = hyper::body::aggregate(res).await?;
let reader = body.reader(); let reader = body.reader();
print_buf(reader); Ok((status, reader))
// let result: serde_json::Value = serde_json::from_reader(reader)?;
Err(anyhow::format_err!("yo yo"))
// Ok((status, result))
} }

Loading…
Cancel
Save