From 9904e4d4491afdcbe14492b86c9829707ba491a6 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Tue, 21 Dec 2021 23:14:36 +0300 Subject: [PATCH] metric_service to detections runner + basic detections --- .../analytic_service/analytic_service.rs | 2 +- .../analytic_service/detection_runner.rs | 29 ++++++++++++++----- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 81c9f22..090f108 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -135,7 +135,7 @@ impl AnalyticService { }; let tx = self.tx.clone(); let au = self.analytic_unit.as_ref().unwrap().clone(); - let dr = DetectionRunner::new(tx, drcfg, au); + let dr = DetectionRunner::new(self.metric_service.clone(), tx, drcfg, au); self.detection_runner = Some(dr); self.detection_runner.as_mut().unwrap().run(from); diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index a634b83..49f1357 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -2,6 +2,8 @@ use chrono::{Utc, DateTime}; use tokio::sync::{mpsc}; +use crate::services::metric_service::MetricService; + use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use tokio::time::{sleep, Duration}; @@ -10,6 +12,7 @@ const DETECTION_STEP: u64 = 10; pub struct DetectionRunner { + metric_service: MetricService, tx: mpsc::Sender, config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF, @@ -18,11 +21,13 @@ pub struct DetectionRunner { impl DetectionRunner { pub fn new( + metric_service: MetricService, tx: mpsc::Sender, config: DetectionRunnerConfig, analytic_unit: AnalyticUnitRF, ) -> DetectionRunner { DetectionRunner { + metric_service, tx, config, analytic_unit, @@ -37,8 +42,8 @@ impl DetectionRunner { self.running_handler.as_mut().unwrap().abort(); } self.running_handler = Some(tokio::spawn({ - // TODO: clone channel let cfg = self.config.clone(); + let ms = self.metric_service.clone(); let tx = self.tx.clone(); let au = self.analytic_unit.clone(); async move { @@ -46,6 +51,9 @@ impl DetectionRunner { // TODO: parse detections to webhooks // TODO: define window for detection // TODO: handle case when detection is in the end and continues after "now" + let window_size = au.as_ref().read().await.get_detection_window(); + let mut t_from = from - window_size; + let mut t_to = from; match tx .send(AnalyticServiceMessage::Response(Ok( @@ -58,18 +66,25 @@ impl DetectionRunner { } loop { - // TODO: don't use DateTime, but count timestamp by steps - let now: DateTime = Utc::now(); - let to = now.timestamp() as u64; + let a = au.as_ref().read().await; + let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap(); + + for d in detections { + println!("detection: {} {}", d); + } + // TODO: run detection periodically - sleep(Duration::from_secs(cfg.interval)).await; - // TODO: update from / to based on detection step + // TODO: set info about detections to tx + + match tx.send(AnalyticServiceMessage::Response(Ok( - ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), to) + ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to) ))).await { Ok(_) => {}, Err(_e) => println!("Fail to send detection runner started notification"), } + + sleep(Duration::from_secs(cfg.interval)).await; } } }));