Browse Source

basic influx suport

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
b982d7b817
  1. 86
      server/Cargo.lock
  2. 4
      server/Cargo.toml
  3. 11
      server/config.example.toml
  4. 53
      server/src/config.rs
  5. 4
      server/src/main.rs
  6. 18
      server/src/services/metric_service.rs

86
server/Cargo.lock generated

@ -92,6 +92,18 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde 1.0.130",
]
[[package]]
name = "buf_redux"
version = "0.8.4"
@ -126,6 +138,19 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits 0.2.14",
"time",
"winapi",
]
[[package]]
name = "clap"
version = "2.33.3"
@ -182,6 +207,28 @@ dependencies = [
"libc",
]
[[package]]
name = "csv"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa",
"ryu",
"serde 1.0.130",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]]
name = "digest"
version = "0.9.0"
@ -366,7 +413,7 @@ checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
@ -740,6 +787,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits 0.2.14",
]
[[package]]
name = "num-traits"
version = "0.1.43"
@ -1035,6 +1092,12 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]]
name = "regex-syntax"
version = "0.6.25"
@ -1258,14 +1321,16 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "subbeat"
version = "0.0.7"
version = "0.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4f17db62e5423f5cf82139d2e45ec13ea10a3cab03e3bc50f6b5c3961a4e34d"
checksum = "c1293b7c5e75d456eb4a22a4eabe0327029f437250da8b0c70e1183f76a84132"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"chrono",
"clap",
"csv",
"hyper",
"hyper-tls",
"serde 1.0.130",
@ -1329,6 +1394,17 @@ dependencies = [
"syn",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]]
name = "tinyvec"
version = "1.5.0"
@ -1618,9 +1694,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "winapi"

4
server/Cargo.toml

@ -12,8 +12,8 @@ warp = "0.3"
parking_lot = "0.11.2"
serde = { version = "1.0", features = ["derive"] }
fastrand = "1.5.0"
# subbeat = { path = "../../../subbeat/subbeat/" }
subbeat = "0.0.7"
#subbeat = { path = "../../../subbeat/subbeat/" }
subbeat = "0.0.12"
config = "0.11.0"
openssl = { version = "=0.10.33", features = ["vendored"] }
rusqlite = "0.26.1"

11
server/config.example.toml

@ -1,3 +1,12 @@
port = 8000
[prometheus]
url = "http://localhost:9090"
query = "rate(go_memstats_alloc_bytes_total[5m])"
prom_url = "http://localhost:9090"
# [influx]
# url = "http://localhost:8086"
# org_id = "5abe4759f7360f1c"
# token = "sCAB2MVo8TJxhUH8UDJZIeCPwf-cykBtO0jhr207qCQSZ9d43JXObCYK_uAml2BL26JBYFauz95yIeC51kxQog=="
# query = 'from(bucket:"main-backet") |> $range |> filter(fn:(r) => r._measurement == "cpu")'

53
server/src/config.rs

@ -1,14 +1,45 @@
use std::collections::HashMap;
use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig};
pub struct Config {
pub prom_url: String,
pub query: String,
pub port: u16,
pub datasource_config: DatasourceConfig
}
fn resolve_datasource(config: &mut config::Config) -> anyhow::Result<DatasourceConfig> {
if config.get::<String>("prometheus.url").is_ok() {
return Ok(DatasourceConfig::Prometheus(PrometheusConfig {
url: config.get("prometheus.url")?,
query: config.get("prometheus.query")?,
}));
}
if config.get::<String>("influx.url").is_ok() {
return Ok(DatasourceConfig::Influx(InfluxConfig {
url: config.get("influx.url")?,
org_id: config.get("influx.org_id")?,
token: config.get("influx.token")?,
query: config.get("influx.query")?,
}));
}
return Err(anyhow::format_err!("no datasource found"));
// if config.get::<String>("prometheus.url").is_err() {
// config.set("url", "http://localhost:9090").unwrap();
// }
// if config.get::<String>("prometheus.query").is_err() {
// config
// .set("query", "rate(go_memstats_alloc_bytes_total[5m])")
// .unwrap();
// }
}
// TODO: use actual config and env variables
impl Config {
pub fn new() -> Config {
pub fn new() -> anyhow::Result<Config> {
let mut config = config::Config::default();
if std::path::Path::new("config.toml").exists() {
@ -21,19 +52,11 @@ impl Config {
if config.get::<u16>("port").is_err() {
config.set("port", "8000").unwrap();
}
if config.get::<String>("prom_url").is_err() {
config.set("prom_url", "http://localhost:9090").unwrap();
}
if config.get::<String>("query").is_err() {
config
.set("query", "rate(go_memstats_alloc_bytes_total[5m])")
.unwrap();
}
Config {
Ok(Config {
port: config.get::<u16>("port").unwrap(),
prom_url: config.get("prom_url").unwrap(),
query: config.get("query").unwrap(),
}
datasource_config: resolve_datasource(&mut config)?
})
}
}

4
server/src/main.rs

@ -6,9 +6,9 @@ use anyhow;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = hastic::config::Config::new();
let config = hastic::config::Config::new()?;
let metric_service = metric_service::MetricService::new(&config.prom_url, &config.query);
let metric_service = metric_service::MetricService::new(&config.datasource_config);
let segments_service = segments_service::SegmentsService::new()?;
let mut analytic_service =

18
server/src/services/metric_service.rs

@ -1,25 +1,33 @@
use subbeat::{datasources::prometheus::Prometheus, metric::{Metric, MetricResult}};
// TODO: use resolve function as in subbeat itself
#[derive(Clone)]
pub struct MetricService {
// url: String,
// query: String,
prom: Prometheus
datasource: Box<dyn Metric + Sync + Send>
}
impl Clone for MetricService {
fn clone(&self) -> Self {
return MetricService {
datasource: self.datasource.boxed_clone()
}
}
}
impl MetricService {
pub fn new(url: &str, query: &str) -> MetricService {
pub fn new(ds_config: &subbeat::types::DatasourceConfig) -> MetricService {
MetricService {
// url: url.to_string(),
// query: query.to_string(),
prom: Prometheus::new(&url.to_string(), &query.to_string())
datasource: subbeat::datasources::resolve(ds_config)
}
}
pub async fn query(&self, from: u64, to: u64, step: u64) -> anyhow::Result<MetricResult> {
return self.prom.query(from, to, step).await;
return self.datasource.query(from, to, step).await;
}
}

Loading…
Cancel
Save