From 926f49c168ae6ed32f2fc7fb320bc95b16a033ff Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 19 Dec 2021 19:13:16 +0300 Subject: [PATCH] basic detector runniung --- .../services/analytic_service/analytic_service.rs | 9 ++++----- .../services/analytic_service/detection_runner.rs | 13 +++++++++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index ff0c5cc..bd5756a 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -35,6 +35,7 @@ pub struct AnalyticService { analytic_unit_config: AnalyticUnitConfig, analytic_unit_learning_status: LearningStatus, + // TODO: add comment about how it's used tx: mpsc::Sender, rx: mpsc::Receiver, @@ -113,16 +114,14 @@ impl AnalyticService { // TODO: make fn run_detection_runner(&mut self, from: u64) { + println!("run_detection_runner"); // TODO: handle case or make it impossible to run_detection_runner second time - if self.analytic_unit.is_none() { - return; - } - if self.analytic_unit_learning_status != LearningStatus::Ready { let task = DetectionRunnerTask { from }; + println!("add learning waiter"); self.learning_waiters.push(LearningWaiter::DetectionRunner(task)); return; } @@ -135,7 +134,7 @@ impl AnalyticService { let dr = DetectionRunner::new(drcfg, self.analytic_unit.as_ref().unwrap().clone()); self.detection_runner = Some(dr); - // dr.run(); + self.detection_runner.as_mut().unwrap().run(from); // TODO: create DetectionRunnerConfig from alerting // TODO: rerun detection runner on analytic unit change diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index a25801d..a00a3b5 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -24,9 +24,10 @@ impl DetectionRunner { } } - pub async fn run(&mut self, from: u64) { + pub fn run(&mut self, from: u64) { // TODO: get last detection timestamp from persistance // TODO: set last detection from "now" + println!("Run begn"); if self.running_handler.is_some() { self.running_handler.as_mut().unwrap().abort(); } @@ -37,8 +38,11 @@ impl DetectionRunner { // AnalyticService::run_learning(tx, cfg, ms, ss).await; // TODO: run detection "from" // TODO: define window for detection + // TODO: save last detection + loop { + println!("runner detect"); // TODO: run detection periodically sleep(Duration::from_secs(cfg.interval)).await; } @@ -46,9 +50,10 @@ impl DetectionRunner { })); } - pub async fn set_analytic_unit( - analytic_unit: Arc>>, + pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF, ) { - // TODO: implement + self.analytic_unit = analytic_unit; + // TODO: stop running_handler + // TODO: rerun detection with new anomaly units } }