From e71d87e6bea99f46b8e7e78af067df10db71b6a4 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 12 Nov 2021 06:32:11 +0300 Subject: [PATCH] begin detection runner and seasonality --- server/Cargo.lock | 4 ++- .../analytic_service/analytic_service.rs | 36 +++++++++---------- .../analytic_unit/anomaly_analytic_unit.rs | 15 +++++++- .../analytic_service/detection_runner.rs | 27 ++++++++++++++ server/src/services/analytic_service/mod.rs | 2 ++ server/src/services/analytic_service/types.rs | 2 ++ server/src/services/metric_service.rs | 12 ++++++- 7 files changed, 75 insertions(+), 23 deletions(-) create mode 100644 server/src/services/analytic_service/detection_runner.rs diff --git a/server/Cargo.lock b/server/Cargo.lock index 30c6899..42cef73 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1538,7 +1538,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" [[package]] name = "subbeat" -version = "0.0.14" +version = "0.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fbf55cd694bd61f334302faf844f8eecff37fdc731a53c9c0a547793cb9c432" dependencies = [ "anyhow", "async-trait", diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 8ab63cb..acd2607 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -21,7 +21,6 @@ use anyhow; use tokio::sync::{mpsc, oneshot, RwLock}; -use chrono::Utc; // TODO: now it's basically single analytic unit, service will operate on many AU pub struct AnalyticService { @@ -35,16 +34,14 @@ pub struct AnalyticService { tx: mpsc::Sender, rx: mpsc::Receiver, - endpoint: Option, - // handlers learning_handler: Option>, // awaiters learning_waiters: Vec, - // runner - runner_handler: Option>, + + detection_runner: Option } impl AnalyticService { @@ -67,7 +64,6 @@ impl AnalyticService { tx, rx, - endpoint, // handlers learning_handler: None, @@ -75,7 +71,7 @@ impl AnalyticService { // awaiters learning_waiters: Vec::new(), - runner_handler: None, + detection_runner: None } } @@ -102,19 +98,19 @@ impl AnalyticService { }); } - // fn run_detection_runner(&mut self, task: DetectionRunnerConfig) { - // if self.runner_handler.is_some() { - // self.runner_handler.as_mut().unwrap().abort(); - // } - // // TODO: save handler of the task - // self.runner_handler = Some(tokio::spawn({ - // let au = self.analytic_unit.unwrap(); - // let ms = self.metric_service.clone(); - // async move { - // // TODO: implement - // } - // })); - // } + fn run_detection_runner(&mut self, task: DetectionRunnerConfig) { + // if self.runner_handler.is_some() { + // self.runner_handler.as_mut().unwrap().abort(); + // } + // // TODO: save handler of the task + // self.runner_handler = Some(tokio::spawn({ + // let au = self.analytic_unit.unwrap(); + // let ms = self.metric_service.clone(); + // async move { + // // TODO: implement + // } + // })); + } // TODO: maybe make `consume_request` async fn consume_request(&mut self, req: types::RequestType) -> () { diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index bfcf210..4cbba0d 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -8,6 +8,17 @@ use subbeat::metric::MetricResult; // TODO: move to config const DETECTION_STEP: u64 = 10; +// offset from intex in timrange in seconds +fn get_value_with_offset(ts: &Vec<(u64, f64)>, index: usize, offset: u64) -> anyhow::Result { + if index == 0 { + return Err(anyhow::format_err!("index should be > 0")); + } + return Ok(0.0); + // let step = + // let index_candidate = + // let intex_candidate = +} + pub struct AnomalyAnalyticUnit { config: AnomalyConfig, } @@ -60,7 +71,7 @@ impl AnalyticUnit for AnomalyAnalyticUnit { from: u64, to: u64, ) -> anyhow::Result> { - let mr = ms.query(from - self.config.seasonality, to, DETECTION_STEP).await.unwrap(); + let mr = ms.query(from - self.config.seasonality * 5, to, DETECTION_STEP).await.unwrap(); if mr.data.keys().len() == 0 { return Ok(Vec::new()); @@ -119,4 +130,6 @@ impl AnalyticUnit for AnomalyAnalyticUnit { let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); return self.get_hsr_from_metric_result(&mr); } + + } diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs new file mode 100644 index 0000000..4f819f9 --- /dev/null +++ b/server/src/services/analytic_service/detection_runner.rs @@ -0,0 +1,27 @@ +use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit}; + +use std::sync::Arc; + +use crate::config::Config; + +use chrono::Utc; + +use tokio::sync::{mpsc, RwLock}; + + + + +struct DetectionRunner { + config: Config, + analytic_unit: Arc>>, +} + +impl DetectionRunner { + pub fn new(config: Config, analytic_unit: Arc>>) -> DetectionRunner { + DetectionRunner { config, analytic_unit } + } + + pub async fn run() { + + } +} diff --git a/server/src/services/analytic_service/mod.rs b/server/src/services/analytic_service/mod.rs index 6f83efc..d59dc4c 100644 --- a/server/src/services/analytic_service/mod.rs +++ b/server/src/services/analytic_service/mod.rs @@ -4,4 +4,6 @@ pub mod types; pub mod analytic_client; +mod detection_runner; + pub use analytic_service::AnalyticService; diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index ea4c58f..8a00d03 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -84,6 +84,8 @@ pub enum LearningWaiter { HSR(HSRTask), } + +// TODO: review if it's needed #[derive(Debug)] pub struct DetectionRunnerConfig { // pub sender: mpsc::Sender>>, diff --git a/server/src/services/metric_service.rs b/server/src/services/metric_service.rs index a3a76f4..3f6c076 100644 --- a/server/src/services/metric_service.rs +++ b/server/src/services/metric_service.rs @@ -19,6 +19,16 @@ impl MetricService { } } pub async fn query(&self, from: u64, to: u64, step: u64) -> anyhow::Result { - return self.datasource.query(from, to, step).await; + let mut mr = self.datasource.query(from, to, step).await?; + // let keys: Vec<_> = mr.data.keys().into_iter().collect(); + + if mr.data.keys().len() > 0 { + // TODO: it's a hack, should replace all metrics + let key = mr.data.keys().nth(0).unwrap().clone(); + let ts = mr.data.get_mut(&key).unwrap(); + *ts = subbeat::utils::interpolate_nans_and_gaps_with_zeros(&ts, from, to, step); + // mr.data.insert(*k, ts_interpolated); + } + return Ok(mr); } }