Browse Source

metric_service to detections runner + basic detections

pull/63/head
Alexey Velikiy 2 years ago
parent
commit
9904e4d449
  1. 2
      server/src/services/analytic_service/analytic_service.rs
  2. 29
      server/src/services/analytic_service/detection_runner.rs

2
server/src/services/analytic_service/analytic_service.rs

@ -135,7 +135,7 @@ impl AnalyticService {
}; };
let tx = self.tx.clone(); let tx = self.tx.clone();
let au = self.analytic_unit.as_ref().unwrap().clone(); let au = self.analytic_unit.as_ref().unwrap().clone();
let dr = DetectionRunner::new(tx, drcfg, au); let dr = DetectionRunner::new(self.metric_service.clone(), tx, drcfg, au);
self.detection_runner = Some(dr); self.detection_runner = Some(dr);
self.detection_runner.as_mut().unwrap().run(from); self.detection_runner.as_mut().unwrap().run(from);

29
server/src/services/analytic_service/detection_runner.rs

@ -2,6 +2,8 @@ use chrono::{Utc, DateTime};
use tokio::sync::{mpsc}; use tokio::sync::{mpsc};
use crate::services::metric_service::MetricService;
use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
@ -10,6 +12,7 @@ const DETECTION_STEP: u64 = 10;
pub struct DetectionRunner { pub struct DetectionRunner {
metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>, tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig, config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF, analytic_unit: AnalyticUnitRF,
@ -18,11 +21,13 @@ pub struct DetectionRunner {
impl DetectionRunner { impl DetectionRunner {
pub fn new( pub fn new(
metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>, tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig, config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF, analytic_unit: AnalyticUnitRF,
) -> DetectionRunner { ) -> DetectionRunner {
DetectionRunner { DetectionRunner {
metric_service,
tx, tx,
config, config,
analytic_unit, analytic_unit,
@ -37,8 +42,8 @@ impl DetectionRunner {
self.running_handler.as_mut().unwrap().abort(); self.running_handler.as_mut().unwrap().abort();
} }
self.running_handler = Some(tokio::spawn({ self.running_handler = Some(tokio::spawn({
// TODO: clone channel
let cfg = self.config.clone(); let cfg = self.config.clone();
let ms = self.metric_service.clone();
let tx = self.tx.clone(); let tx = self.tx.clone();
let au = self.analytic_unit.clone(); let au = self.analytic_unit.clone();
async move { async move {
@ -46,6 +51,9 @@ impl DetectionRunner {
// TODO: parse detections to webhooks // TODO: parse detections to webhooks
// TODO: define window for detection // TODO: define window for detection
// TODO: handle case when detection is in the end and continues after "now" // TODO: handle case when detection is in the end and continues after "now"
let window_size = au.as_ref().read().await.get_detection_window();
let mut t_from = from - window_size;
let mut t_to = from;
match tx match tx
.send(AnalyticServiceMessage::Response(Ok( .send(AnalyticServiceMessage::Response(Ok(
@ -58,18 +66,25 @@ impl DetectionRunner {
} }
loop { loop {
// TODO: don't use DateTime, but count timestamp by steps let a = au.as_ref().read().await;
let now: DateTime<Utc> = Utc::now(); let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap();
let to = now.timestamp() as u64;
for d in detections {
println!("detection: {} {}", d);
}
// TODO: run detection periodically // TODO: run detection periodically
sleep(Duration::from_secs(cfg.interval)).await; // TODO: set info about detections to tx
// TODO: update from / to based on detection step
match tx.send(AnalyticServiceMessage::Response(Ok( match tx.send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), to) ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to)
))).await { ))).await {
Ok(_) => {}, Ok(_) => {},
Err(_e) => println!("Fail to send detection runner started notification"), Err(_e) => println!("Fail to send detection runner started notification"),
} }
sleep(Duration::from_secs(cfg.interval)).await;
} }
} }
})); }));

Loading…
Cancel
Save