diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 81c9f22..090f108 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -135,7 +135,7 @@ impl AnalyticService { }; let tx = self.tx.clone(); let au = self.analytic_unit.as_ref().unwrap().clone(); - let dr = DetectionRunner::new(tx, drcfg, au); + let dr = DetectionRunner::new(self.metric_service.clone(), tx, drcfg, au); self.detection_runner = Some(dr); self.detection_runner.as_mut().unwrap().run(from); diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index 15ad5a3..cf23f1d 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -164,6 +164,10 @@ impl AnalyticUnit for AnomalyAnalyticUnit { fn get_id(&self) -> String { return self.id.to_owned(); } + fn get_detection_window(&self) -> u64 { + // TODO: return window based on real petterns info + return DETECTION_STEP; + } fn set_config(&mut self, config: AnalyticUnitConfig) { if let AnalyticUnitConfig::Anomaly(cfg) = config { self.config = cfg; diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index 0619526..3beb203 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -219,6 +219,10 @@ impl AnalyticUnit for PatternAnalyticUnit { fn get_id(&self) -> String { return self.id.to_owned(); } + fn get_detection_window(&self) -> u64 { + // TODO: return window based on real petterns info + return DETECTION_STEP; + } fn set_config(&mut self, config: AnalyticUnitConfig) { if let AnalyticUnitConfig::Pattern(cfg) = config { self.config = cfg; diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index 579c3ab..d65ed70 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -25,6 +25,9 @@ impl AnalyticUnit for ThresholdAnalyticUnit { fn get_id(&self) -> String { return self.id.to_owned(); } + fn get_detection_window(&self) -> u64 { + return DETECTION_STEP; + } async fn learn( &mut self, _ms: MetricService, diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index d7deaa6..b591d94 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -131,6 +131,7 @@ pub enum LearningResult { #[async_trait] pub trait AnalyticUnit { fn get_id(&self) -> String; + fn get_detection_window(&self) -> u64; async fn learn( &mut self, ms: MetricService, diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index e881483..6ec3e8e 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -1,11 +1,18 @@ use chrono::{Utc, DateTime}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc}; + +use crate::services::metric_service::MetricService; use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use tokio::time::{sleep, Duration}; + +const DETECTION_STEP: u64 = 10; + + pub struct DetectionRunner { + metric_service: MetricService, tx: mpsc::Sender, config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF, @@ -14,11 +21,13 @@ pub struct DetectionRunner { impl DetectionRunner { pub fn new( + metric_service: MetricService, tx: mpsc::Sender, config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF, ) -> DetectionRunner { DetectionRunner { + metric_service, tx, config, analytic_unit, @@ -33,16 +42,18 @@ impl DetectionRunner { self.running_handler.as_mut().unwrap().abort(); } self.running_handler = Some(tokio::spawn({ - // TODO: clone channel let cfg = self.config.clone(); + let ms = self.metric_service.clone(); let tx = self.tx.clone(); let au = self.analytic_unit.clone(); async move { // TODO: run detection "from" for big timespan // TODO: parse detections to webhooks // TODO: define window for detection - // TODO: save last detection // TODO: handle case when detection is in the end and continues after "now" + let window_size = au.as_ref().read().await.get_detection_window(); + let mut t_from = from - window_size; + let mut t_to = from; match tx .send(AnalyticServiceMessage::Response(Ok( @@ -55,17 +66,25 @@ impl DetectionRunner { } loop { - // TODO: don't use DateTime, but count timestamp by steps - let now: DateTime = Utc::now(); - let to = now.timestamp() as u64; + let a = au.as_ref().read().await; + let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap(); + + for d in detections { + println!("detection: {} {}", d.0, d.1); + } + // TODO: run detection periodically - sleep(Duration::from_secs(cfg.interval)).await; + // TODO: set info about detections to tx + + match tx.send(AnalyticServiceMessage::Response(Ok( - ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), to) + ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to) ))).await { Ok(_) => {}, Err(_e) => println!("Fail to send detection runner started notification"), } + + sleep(Duration::from_secs(cfg.interval)).await; } } }));