From aba6034441ea5bb062981f60fec5fbbe938870c4 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 5 Nov 2021 01:14:07 +0300 Subject: [PATCH] webhook continue --- server/Cargo.lock | 5 +-- server/Cargo.toml | 3 +- server/config.example.toml | 3 ++ server/src/api/mod.rs | 2 +- server/src/config.rs | 17 ++++++++++ server/src/main.rs | 10 ++++-- .../analytic_service/analytic_service.rs | 31 ++++++++++++++++--- server/src/services/analytic_service/types.rs | 4 +-- 8 files changed, 62 insertions(+), 13 deletions(-) diff --git a/server/Cargo.lock b/server/Cargo.lock index f1c1c59..f354c76 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -458,6 +458,7 @@ name = "hastic" version = "0.0.1" dependencies = [ "anyhow", + "chrono", "config", "fastrand", "futures", @@ -1321,9 +1322,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" [[package]] name = "subbeat" -version = "0.0.13" +version = "0.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e935262d66772027700000fb1d8c81bb4bc7db8ef884880ce658686406e5416b" +checksum = "6c9f8b9c73d37420622f933fabdf8a783a98fd24804b7def4c2fc140820105d8" dependencies = [ "anyhow", "async-trait", diff --git a/server/Cargo.toml b/server/Cargo.toml index c7ad0ad..afdbef5 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,10 +13,11 @@ parking_lot = "0.11.2" serde = { version = "1.0", features = ["derive"] } fastrand = "1.5.0" # subbeat = { path = "../../../subbeat/subbeat/" } -subbeat = "0.0.13" +subbeat = "0.0.14" config = "0.11.0" openssl = { version = "=0.10.33", features = ["vendored"] } rusqlite = "0.26.1" # https://github.com/rusqlite/rusqlite/issues/914 libsqlite3-sys = { version = "*", features = ["bundled"] } futures = "0.3.17" +chrono = "0.4.19" diff --git a/server/config.example.toml b/server/config.example.toml index 834c667..d442724 100644 --- a/server/config.example.toml +++ b/server/config.example.toml @@ -16,3 +16,6 @@ query = "rate(go_memstats_alloc_bytes_total[5m])" # |> yield(name: "mean") # """ +[webhook] +endpoint = "http://localhost:8086" + diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index 42866bc..7ed1909 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -96,7 +96,7 @@ impl API<'_> { .or(not_found); warp::serve(routes) - .run(([127, 0, 0, 1], self.config.port)) + .run(([0, 0, 0, 0], self.config.port)) .await; } } diff --git a/server/src/config.rs b/server/src/config.rs index 6c78b46..da97c5b 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -3,6 +3,17 @@ use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig}; pub struct Config { pub port: u16, pub datasource_config: DatasourceConfig, + pub endpoint: Option, +} + +impl Clone for Config { + fn clone(&self) -> Self { + return Config { + port: self.port, + datasource_config: self.datasource_config.clone(), + endpoint: self.endpoint.clone(), + }; + } } fn resolve_datasource(config: &mut config::Config) -> anyhow::Result { @@ -41,9 +52,15 @@ impl Config { config.set("port", "8000").unwrap(); } + let mut endpoint = None; + if config.get::("webhook.endpoint").is_ok() { + endpoint = Some(config.get("webhook.endpoint").unwrap()); + } + Ok(Config { port: config.get::("port").unwrap(), datasource_config: resolve_datasource(&mut config)?, + endpoint, }) } } diff --git a/server/src/main.rs b/server/src/main.rs index a9464af..b710c88 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,15 +7,19 @@ use anyhow; #[tokio::main] async fn main() -> anyhow::Result<()> { let config = hastic::config::Config::new()?; + let cfg_clone = config.clone(); let metric_service = metric_service::MetricService::new(&config.datasource_config); let segments_service = segments_service::SegmentsService::new()?; - let mut analytic_service = - analytic_service::AnalyticService::new(metric_service.clone(), segments_service.clone()); + let mut analytic_service = analytic_service::AnalyticService::new( + metric_service.clone(), + segments_service.clone(), + config.endpoint, + ); let api = api::API::new( - &config, + &cfg_clone, metric_service.clone(), segments_service.clone(), analytic_service.get_client(), diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index e141d05..179d795 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -17,6 +17,8 @@ use tokio::sync::{mpsc, oneshot}; use futures::future; +use chrono::Utc; + // TODO: get this from pattern detector const DETECTION_STEP: u64 = 10; @@ -29,17 +31,23 @@ pub struct AnalyticService { tx: mpsc::Sender, rx: mpsc::Receiver, + endpoint: Option, + // handlers learning_handler: Option>, // awaiters learning_waiters: Vec, + + // runner + runner_handler: Option>, } impl AnalyticService { pub fn new( metric_service: MetricService, segments_service: segments_service::SegmentsService, + endpoint: Option, ) -> AnalyticService { let (tx, rx) = mpsc::channel::(32); @@ -52,11 +60,15 @@ impl AnalyticService { tx, rx, + endpoint, + // handlers learning_handler: None, // awaiters learning_waiters: Vec::new(), + + runner_handler: None, } } @@ -76,15 +88,18 @@ impl AnalyticService { }); } - fn run_detection_runner(&self, task: DetectionRunnerConfig) { + fn run_detection_runner(&mut self, task: DetectionRunnerConfig) { + if self.runner_handler.is_some() { + self.runner_handler.as_mut().unwrap().abort(); + } // TODO: save handler of the task - tokio::spawn({ + self.runner_handler = Some(tokio::spawn({ let lr = self.learning_results.as_ref().unwrap().clone(); let ms = self.metric_service.clone(); async move { - + // TODO: implement } - }); + })); } fn consume_request(&mut self, req: types::RequestType) -> () { @@ -144,6 +159,14 @@ impl AnalyticService { let task = self.learning_waiters.pop().unwrap(); self.run_detection_task(task); } + + // TODO: fix this + if self.endpoint.is_some() { + self.run_detection_runner(DetectionRunnerConfig { + endpoint: self.endpoint.as_ref().unwrap().clone(), + from: Utc::now().timestamp() as u64, + }); + } } ResponseType::LearningFinishedEmpty => { // TODO: drop all learning_waiters with empty results diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 50e3586..fdcbb7e 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -32,9 +32,9 @@ pub struct DetectionTask { #[derive(Debug)] pub struct DetectionRunnerConfig { - pub sender: mpsc::Sender>>, + // pub sender: mpsc::Sender>>, pub endpoint: String, - pub from: u64 + pub from: u64, } #[derive(Debug)]