Browse Source

basic detector runniung

detection_runner_updade
Alexey Velikiy 2 years ago
parent
commit
926f49c168
  1. 9
      server/src/services/analytic_service/analytic_service.rs
  2. 13
      server/src/services/analytic_service/detection_runner.rs

9
server/src/services/analytic_service/analytic_service.rs

@ -35,6 +35,7 @@ pub struct AnalyticService {
analytic_unit_config: AnalyticUnitConfig, analytic_unit_config: AnalyticUnitConfig,
analytic_unit_learning_status: LearningStatus, analytic_unit_learning_status: LearningStatus,
// TODO: add comment about how it's used
tx: mpsc::Sender<AnalyticServiceMessage>, tx: mpsc::Sender<AnalyticServiceMessage>,
rx: mpsc::Receiver<AnalyticServiceMessage>, rx: mpsc::Receiver<AnalyticServiceMessage>,
@ -113,16 +114,14 @@ impl AnalyticService {
// TODO: make // TODO: make
fn run_detection_runner(&mut self, from: u64) { fn run_detection_runner(&mut self, from: u64) {
println!("run_detection_runner");
// TODO: handle case or make it impossible to run_detection_runner second time // TODO: handle case or make it impossible to run_detection_runner second time
if self.analytic_unit.is_none() {
return;
}
if self.analytic_unit_learning_status != LearningStatus::Ready { if self.analytic_unit_learning_status != LearningStatus::Ready {
let task = DetectionRunnerTask { let task = DetectionRunnerTask {
from from
}; };
println!("add learning waiter");
self.learning_waiters.push(LearningWaiter::DetectionRunner(task)); self.learning_waiters.push(LearningWaiter::DetectionRunner(task));
return; return;
} }
@ -135,7 +134,7 @@ impl AnalyticService {
let dr = DetectionRunner::new(drcfg, self.analytic_unit.as_ref().unwrap().clone()); let dr = DetectionRunner::new(drcfg, self.analytic_unit.as_ref().unwrap().clone());
self.detection_runner = Some(dr); self.detection_runner = Some(dr);
// dr.run(); self.detection_runner.as_mut().unwrap().run(from);
// TODO: create DetectionRunnerConfig from alerting // TODO: create DetectionRunnerConfig from alerting
// TODO: rerun detection runner on analytic unit change // TODO: rerun detection runner on analytic unit change

13
server/src/services/analytic_service/detection_runner.rs

@ -24,9 +24,10 @@ impl DetectionRunner {
} }
} }
pub async fn run(&mut self, from: u64) { pub fn run(&mut self, from: u64) {
// TODO: get last detection timestamp from persistance // TODO: get last detection timestamp from persistance
// TODO: set last detection from "now" // TODO: set last detection from "now"
println!("Run begn");
if self.running_handler.is_some() { if self.running_handler.is_some() {
self.running_handler.as_mut().unwrap().abort(); self.running_handler.as_mut().unwrap().abort();
} }
@ -37,8 +38,11 @@ impl DetectionRunner {
// AnalyticService::run_learning(tx, cfg, ms, ss).await; // AnalyticService::run_learning(tx, cfg, ms, ss).await;
// TODO: run detection "from" // TODO: run detection "from"
// TODO: define window for detection // TODO: define window for detection
// TODO: save last detection
loop { loop {
println!("runner detect");
// TODO: run detection periodically // TODO: run detection periodically
sleep(Duration::from_secs(cfg.interval)).await; sleep(Duration::from_secs(cfg.interval)).await;
} }
@ -46,9 +50,10 @@ impl DetectionRunner {
})); }));
} }
pub async fn set_analytic_unit( pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF,
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
) { ) {
// TODO: implement self.analytic_unit = analytic_unit;
// TODO: stop running_handler
// TODO: rerun detection with new anomaly units
} }
} }

Loading…
Cancel
Save