diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index e8c1711..ff0c5cc 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig}; use super::detection_runner::DetectionRunner; -use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR}; +use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR, DetectionRunnerTask}; use super::{ analytic_client::AnalyticClient, types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType}, @@ -20,6 +20,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear use anyhow; +use chrono::{TimeZone, DateTime, Utc}; use tokio::sync::{mpsc, oneshot}; // TODO: now it's basically single analytic unit, service will operate on many AU @@ -82,26 +83,36 @@ impl AnalyticService { AnalyticClient::new(self.tx.clone()) } - fn run_learning_waiter(&self, learning_waiter: LearningWaiter) { + fn run_learning_waiter(&mut self, learning_waiter: LearningWaiter) { // TODO: save handler of the task - tokio::spawn({ - let ms = self.metric_service.clone(); - let au = self.analytic_unit.as_ref().unwrap().clone(); - async move { - match learning_waiter { - LearningWaiter::Detection(task) => { + match learning_waiter { + LearningWaiter::Detection(task) => { + tokio::spawn({ + let ms = self.metric_service.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); + async move { AnalyticService::get_detections(task.sender, au, ms, task.from, task.to) .await } - LearningWaiter::HSR(task) => { + }); + } + LearningWaiter::HSR(task) => { + tokio::spawn({ + let ms = self.metric_service.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); + async move { AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await } - } + }); } - }); + LearningWaiter::DetectionRunner(task) => { + self.run_detection_runner(task.from); + } + } } - fn run_detection_runner(&mut self) { + // TODO: make + fn run_detection_runner(&mut self, from: u64) { // TODO: handle case or make it impossible to run_detection_runner second time if self.analytic_unit.is_none() { @@ -109,7 +120,10 @@ impl AnalyticService { } if self.analytic_unit_learning_status != LearningStatus::Ready { - // TODO: add to waiter + let task = DetectionRunnerTask { + from + }; + self.learning_waiters.push(LearningWaiter::DetectionRunner(task)); return; } @@ -290,7 +304,10 @@ impl AnalyticService { // TODO: remove this hack self.consume_request(RequestType::RunLearning); if self.alerting.is_some() { - self.run_detection_runner(); + // TODO: get it from persistance + let now: DateTime = Utc::now(); + let from = now.timestamp() as u64; + self.run_detection_runner(from); } while let Some(message) = self.rx.recv().await { diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 922dc46..767c7b1 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -62,6 +62,12 @@ pub struct DetectionTask { pub to: u64, } +#[derive(Debug)] +pub struct DetectionRunnerTask { + pub from: u64, +} + + #[derive(Debug, Serialize)] pub struct AnomalyHSRConfig { pub timestamp: u64, @@ -87,6 +93,7 @@ pub struct HSRTask { #[derive(Debug)] pub enum LearningWaiter { Detection(DetectionTask), + DetectionRunner(DetectionRunnerTask), HSR(HSRTask), }