From 0761e4d9b2c53d87912a90537d8beba19e5ff190 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 21 Dec 2021 22:51:34 +0300 Subject: [PATCH 1/3] detections window begin --- .../analytic_unit/anomaly_analytic_unit.rs | 4 ++++ .../analytic_unit/pattern_analytic_unit.rs | 4 ++++ .../analytic_unit/threshold_analytic_unit.rs | 3 +++ .../src/services/analytic_service/analytic_unit/types.rs | 1 + server/src/services/analytic_service/detection_runner.rs | 8 ++++++-- 5 files changed, 18 insertions(+), 2 deletions(-) diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index 15ad5a3..cf23f1d 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -164,6 +164,10 @@ impl AnalyticUnit for AnomalyAnalyticUnit { fn get_id(&self) -> String { return self.id.to_owned(); } + fn get_detection_window(&self) -> u64 { + // TODO: return window based on real petterns info + return DETECTION_STEP; + } fn set_config(&mut self, config: AnalyticUnitConfig) { if let AnalyticUnitConfig::Anomaly(cfg) = config { self.config = cfg; diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index 0619526..3beb203 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -219,6 +219,10 @@ impl AnalyticUnit for PatternAnalyticUnit { fn get_id(&self) -> String { return self.id.to_owned(); } + fn get_detection_window(&self) -> u64 { + // TODO: return window based on real petterns info + return DETECTION_STEP; + } fn set_config(&mut self, config: AnalyticUnitConfig) { if let AnalyticUnitConfig::Pattern(cfg) = config { self.config = cfg; diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index 579c3ab..d65ed70 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -25,6 +25,9 @@ impl AnalyticUnit for ThresholdAnalyticUnit { fn get_id(&self) -> String { return self.id.to_owned(); } + fn get_detection_window(&self) -> u64 { + return DETECTION_STEP; + } async fn learn( &mut self, _ms: MetricService, diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index d7deaa6..b591d94 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -131,6 +131,7 @@ pub enum LearningResult { #[async_trait] pub trait AnalyticUnit { fn get_id(&self) -> String; + fn get_detection_window(&self) -> u64; async fn learn( &mut self, ms: MetricService, diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index e881483..a634b83 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -1,10 +1,14 @@ use chrono::{Utc, DateTime}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc}; use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use tokio::time::{sleep, Duration}; + +const DETECTION_STEP: u64 = 10; + + pub struct DetectionRunner { tx: mpsc::Sender, config: DetectionRunnerConfig, @@ -41,7 +45,6 @@ impl DetectionRunner { // TODO: run detection "from" for big timespan // TODO: parse detections to webhooks // TODO: define window for detection - // TODO: save last detection // TODO: handle case when detection is in the end and continues after "now" match tx @@ -60,6 +63,7 @@ impl DetectionRunner { let to = now.timestamp() as u64; // TODO: run detection periodically sleep(Duration::from_secs(cfg.interval)).await; + // TODO: update from / to based on detection step match tx.send(AnalyticServiceMessage::Response(Ok( ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), to) ))).await { From 9904e4d4491afdcbe14492b86c9829707ba491a6 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 21 Dec 2021 23:14:36 +0300 Subject: [PATCH 2/3] metric_service to detections runner + basic detections --- .../analytic_service/analytic_service.rs | 2 +- .../analytic_service/detection_runner.rs | 29 ++++++++++++++----- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 81c9f22..090f108 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -135,7 +135,7 @@ impl AnalyticService { }; let tx = self.tx.clone(); let au = self.analytic_unit.as_ref().unwrap().clone(); - let dr = DetectionRunner::new(tx, drcfg, au); + let dr = DetectionRunner::new(self.metric_service.clone(), tx, drcfg, au); self.detection_runner = Some(dr); self.detection_runner.as_mut().unwrap().run(from); diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index a634b83..49f1357 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -2,6 +2,8 @@ use chrono::{Utc, DateTime}; use tokio::sync::{mpsc}; +use crate::services::metric_service::MetricService; + use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use tokio::time::{sleep, Duration}; @@ -10,6 +12,7 @@ const DETECTION_STEP: u64 = 10; pub struct DetectionRunner { + metric_service: MetricService, tx: mpsc::Sender, config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF, @@ -18,11 +21,13 @@ pub struct DetectionRunner { impl DetectionRunner { pub fn new( + metric_service: MetricService, tx: mpsc::Sender, config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF, ) -> DetectionRunner { DetectionRunner { + metric_service, tx, config, analytic_unit, @@ -37,8 +42,8 @@ impl DetectionRunner { self.running_handler.as_mut().unwrap().abort(); } self.running_handler = Some(tokio::spawn({ - // TODO: clone channel let cfg = self.config.clone(); + let ms = self.metric_service.clone(); let tx = self.tx.clone(); let au = self.analytic_unit.clone(); async move { @@ -46,6 +51,9 @@ impl DetectionRunner { // TODO: parse detections to webhooks // TODO: define window for detection // TODO: handle case when detection is in the end and continues after "now" + let window_size = au.as_ref().read().await.get_detection_window(); + let mut t_from = from - window_size; + let mut t_to = from; match tx .send(AnalyticServiceMessage::Response(Ok( @@ -58,18 +66,25 @@ impl DetectionRunner { } loop { - // TODO: don't use DateTime, but count timestamp by steps - let now: DateTime = Utc::now(); - let to = now.timestamp() as u64; + let a = au.as_ref().read().await; + let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap(); + + for d in detections { + println!("detection: {} {}", d); + } + // TODO: run detection periodically - sleep(Duration::from_secs(cfg.interval)).await; - // TODO: update from / to based on detection step + // TODO: set info about detections to tx + + match tx.send(AnalyticServiceMessage::Response(Ok( - ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), to) + ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to) ))).await { Ok(_) => {}, Err(_e) => println!("Fail to send detection runner started notification"), } + + sleep(Duration::from_secs(cfg.interval)).await; } } })); From a338b3339ba39b92cbb979bdac45812977c36d99 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 21 Dec 2021 23:15:16 +0300 Subject: [PATCH 3/3] detectoin println fix --- server/src/services/analytic_service/detection_runner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index 49f1357..6ec3e8e 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -70,7 +70,7 @@ impl DetectionRunner { let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap(); for d in detections { - println!("detection: {} {}", d); + println!("detection: {} {}", d.0, d.1); } // TODO: run detection periodically