Browse Source

query by chunks

main
Alexey Velikiy 3 years ago
parent
commit
09c790f196
  1. 7
      Cargo.lock
  2. 2
      Cargo.toml
  3. 2
      README.md
  4. 11479
      metric.txt
  5. 130
      src/cli.rs
  6. 28
      src/grafana_service/prometheus.rs
  7. 16
      src/main.rs
  8. 64
      src/metric.rs
  9. 5
      src/types.rs

7
Cargo.lock generated

@ -11,6 +11,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1"
[[package]]
name = "async-trait"
version = "0.1.51"
@ -686,6 +692,7 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
name = "subbeat"
version = "0.0.2"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"clap",

2
Cargo.toml

@ -21,5 +21,5 @@ serde_json = "1.0.68"
serde_derive = "1.0"
serde_qs = "0.8.5"
async-trait = "0.1.51"
anyhow = "1.0.13"

2
README.md

@ -4,5 +4,5 @@ subbeat
## Example
```
subbeat http://localhost:3000 eyJrIjoiWnRRMTNmcGpvTHNPb3UzNzdUNUphRm53Rk9tMTNzOTQiLCJuIjoic3ViYmVhdC10ZXN0IiwiaWQiOjF9 "/api/datasources/proxy/1/api/v1/query_range" "rate(go_memstats_alloc_bytes_total[5m])" 1634672070 1634672970
subbeat grafana http://localhost:3000 eyJrIjoiWnRRMTNmcGpvTHNPb3UzNzdUNUphRm53Rk9tMTNzOTQiLCJuIjoic3ViYmVhdC10ZXN0IiwiaWQiOjF9 "/api/datasources/proxy/1/api/v1/query_range" "rate(go_memstats_alloc_bytes_total[5m])" 1634672070 1634672970 15
```

11479
metric.txt

File diff suppressed because it is too large Load Diff

130
src/cli.rs

@ -6,68 +6,92 @@ pub struct CLI {
pub datasource_url: String,
pub query: String,
pub from: u64,
pub to: u64
pub to: u64,
pub step: u64,
}
impl CLI {
pub fn new() -> CLI {
let matches = App::new("subbeat")
.version("0.0.2")
.about("Timeseries toolkit")
.arg(
Arg::with_name("GRAFANA_URL")
.help("URL to your Grafana instance")
.required(true)
.index(1),
)
.arg(
Arg::with_name("GRAFANA_API_KEY")
.help("Grafna API Key. Go to http://<grafana-url>/org/apikeys to get one")
.required(true)
.index(2),
)
.arg(
Arg::with_name("datasource_url")
.help("relative path to datasource")
.required(true)
.index(3),
)
.arg(
Arg::with_name("query")
.help("your query to datasource")
.required(true)
.index(4),
)
.arg(
Arg::with_name("from")
.help("timestamp")
.required(true)
.index(5),
)
.arg(
Arg::with_name("to")
.help("timestampt")
.required(true)
.index(6),
.subcommand(
SubCommand::with_name("grafana")
.about("Use Grafana as datasource")
.arg(
Arg::with_name("GRAFANA_URL")
.help("URL to your Grafana instance")
.required(true)
.index(1),
)
.arg(
Arg::with_name("GRAFANA_API_KEY")
.help(
"Grafna API Key. Go to http://<grafana-url>/org/apikeys to get one",
)
.required(true)
.index(2),
)
.arg(
Arg::with_name("datasource_url")
.help("relative path to datasource")
.required(true)
.index(3),
)
.arg(
Arg::with_name("query")
.help("your query to datasource")
.required(true)
.index(4),
)
.arg(
Arg::with_name("from")
.help("timestamp")
.required(true)
.index(5),
)
.arg(
Arg::with_name("to")
.help("timestampt")
.required(true)
.index(6),
)
.arg(
Arg::with_name("step")
.help("aggregation step")
.required(true)
.index(7),
),
)
.get_matches();
let url = matches.value_of("GRAFANA_URL").unwrap();
let key = matches.value_of("GRAFANA_API_KEY").unwrap();
let datasource_url = matches.value_of("datasource_url").unwrap();
let query = matches.value_of("query").unwrap();
let from = matches.value_of("from").unwrap().parse().unwrap();
let to = matches.value_of("to").unwrap().parse().unwrap();
CLI{
url: url.to_owned(),
key: key.to_owned(),
datasource_url: datasource_url.to_owned(),
query: query.to_owned(),
from,
to
if let Some(matches) = matches.subcommand_matches("grafana") {
let url = matches.value_of("GRAFANA_URL").unwrap();
let key = matches.value_of("GRAFANA_API_KEY").unwrap();
let datasource_url = matches.value_of("datasource_url").unwrap();
let query = matches.value_of("query").unwrap();
let from = matches.value_of("from").unwrap().parse().unwrap();
let to = matches.value_of("to").unwrap().parse().unwrap();
let step = matches.value_of("step").unwrap().parse().unwrap();
return CLI {
url: url.to_owned(),
key: key.to_owned(),
datasource_url: datasource_url.to_owned(),
query: query.to_owned(),
from,
to,
step,
};
} else {
return CLI {
url: "url.to_owned()".to_string(),
key: "key.to_owned()".to_string(),
datasource_url: "datasource_url.to_owned()".to_string(),
query: "query.to_owned()".to_string(),
from: 0,
to: 0,
step: 0,
};
}
}
}
}

28
src/grafana_service/prometheus.rs

@ -1,4 +1,5 @@
use async_trait::async_trait;
use hyper::StatusCode;
use serde_json::Value;
use crate::{
@ -61,8 +62,8 @@ fn parse_result(value: Value) -> types::Result<MetricResult> {
})
.collect::<Vec<(u64, f64)>>();
let mut result = MetricResult::new();
result.insert(metric_name, values.to_owned());
let mut result: MetricResult = Default::default();
result.data.insert(metric_name, values.to_owned());
// println!("{:?}", result);
@ -71,7 +72,11 @@ fn parse_result(value: Value) -> types::Result<MetricResult> {
#[async_trait]
impl Metric for Prometheus<'_> {
async fn query(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> {
async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> {
if from >= to {
panic!("from >= to");
}
let q = Query {
query: self.query.to_owned(),
step: step,
@ -79,14 +84,19 @@ impl Metric for Prometheus<'_> {
end: to,
};
// TODO: use serialisatoin from serde
let rq = qs::to_string(&q)?;
let (status_code, value) = self.grafana_service.post_form(&self.url, &rq).await?;
// TODO: return error
// if status_code != StatusCode::OK {
// return std::error::("Bad status code");
// }
if status_code != StatusCode::OK {
// println!("Error: status code {:?}", status_code);
let error = &value["error"].as_str().unwrap();
return Err(anyhow::anyhow!("Can`t query: {}", error));
}
// println!("{:?}", value);
// return Ok(Default::default());
// println!("{:?}", value);

16
src/main.rs

@ -1,12 +1,10 @@
use subbeat::grafana_service;
mod types;
mod cli;
mod types;
#[tokio::main]
async fn main() -> types::Result<()> {
let cli = cli::CLI::new();
let gs = grafana_service::GrafanaService::new(cli.url.to_string(), cli.key.to_string());
@ -15,19 +13,13 @@ async fn main() -> types::Result<()> {
// gs.get_datasources().await?;
// "http://localhost:3000/d/YeBxHjzWz/starter-app-stats?editPanel=2&orgId=1"
let r = gs
.extract_metrics(
&cli.datasource_url,
&cli.query,
cli.from,
cli.to,
15,
)
.extract_metrics(&cli.datasource_url, &cli.query, cli.from, cli.to, cli.step)
.await?;
let key = r.keys().nth(0).unwrap();
let key = r.data.keys().nth(0).unwrap();
println!("{}", key);
let vs = &r[key];
let vs = &r.data[key];
for (t, v) in vs.iter() {
println!("{}\t{}", t, v);
}

64
src/metric.rs

@ -1,10 +1,12 @@
use async_trait::async_trait;
use std::collections::HashMap;
use std::{collections::HashMap, result};
use crate::types;
pub type MetricId = String;
const CHUNK_SIZE: u64 = 10_000;
struct DatasourceParams {
db: String,
q: String,
@ -26,9 +28,65 @@ struct MetricQuery {
headers: Option<HashMap<String, String>>,
}
pub type MetricResult = HashMap<String, Vec<(u64, f64)>>;
pub struct MetricResult {
pub data: HashMap<String, Vec<(u64, f64)>>,
}
impl MetricResult {
pub fn merge_with(&mut self, mr: &MetricResult) {
for (k, v) in mr.data.iter() {
// TODO: think how to make it faster
if self.data.contains_key(k) {
self.data.get_mut(k).unwrap().extend(v.iter())
} else {
// panic!("not contain key");
self.data.insert(k.to_owned(), v.to_owned());
}
}
}
}
impl Default for MetricResult {
fn default() -> MetricResult {
return MetricResult {
data: HashMap::<String, Vec<(u64, f64)>>::new(),
};
}
}
#[async_trait]
pub trait Metric {
async fn query(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult>;
// (to - from) / step < 10000
async fn query_chunk(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult>;
// TODO: mov eit to grafana-spicific logic
async fn query(&self, from: u64, to: u64, step: u64) -> types::Result<MetricResult> {
if from >= to {
panic!("from >= to");
}
if step == 0 {
panic!("step equals 0");
}
let chunk_timespan_size = CHUNK_SIZE * step;
let mut chunks = Vec::<(u64, u64)>::new();
{
let mut f = from;
while f < to {
let next = to.min(f + chunk_timespan_size);
chunks.push((f, next));
f = next;
}
}
let mut result: MetricResult = Default::default();
for (f, t) in chunks.iter() {
let r = self.query_chunk(*f, *t, step).await?;
result.merge_with(&r);
}
Ok(result)
}
}

5
src/types.rs

@ -1,2 +1,5 @@
use anyhow;
// A simple type alias so as to DRY.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub type Result<T> = anyhow::Result<T>;

Loading…
Cancel
Save