From 9473781c06e88b7898cca8d85a421a58675e470e Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 13 Dec 2021 21:34:41 +0300 Subject: [PATCH 1/2] dete4ction runner begin --- .../src/services/analytic_service/analytic_service.rs | 10 ++++++++-- server/src/services/analytic_service/types.rs | 7 +++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 9f2592d..66e8c14 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig}; use super::detection_runner::DetectionRunner; -use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR}; +use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR, DetectionRunnerTask}; use super::{ analytic_client::AnalyticClient, types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType}, @@ -20,6 +20,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear use anyhow; +use chrono::{TimeZone, DateTime, Utc}; use tokio::sync::{mpsc, oneshot}; // TODO: now it's basically single analytic unit, service will operate on many AU @@ -109,7 +110,12 @@ impl AnalyticService { } if self.analytic_unit_learning_status != LearningStatus::Ready { - // TODO: add to waiter + let now: DateTime = Utc::now(); + let from = now.timestamp() as u64; + let task = DetectionRunnerTask { + from: from + } + self.learning_waiters.push(LearningWaiter::DetectionRunner(task)); return; } diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 2b702a9..880d634 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -63,6 +63,12 @@ pub struct DetectionTask { pub to: u64, } +#[derive(Debug)] +pub struct DetectionRunnerTask { + pub from: u64, +} + + #[derive(Debug, Serialize)] pub struct AnomalyHSRConfig { pub timestamp: u64, @@ -88,6 +94,7 @@ pub struct HSRTask { #[derive(Debug)] pub enum LearningWaiter { Detection(DetectionTask), + DetectionRunner(DetectionRunnerTask), HSR(HSRTask), } From e527c54eed93fcdd6b88391461f6b7f373afb396 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 13 Dec 2021 22:14:57 +0300 Subject: [PATCH 2/2] run_detection_runner --- .../analytic_service/analytic_service.rs | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 66e8c14..b63ca32 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -83,26 +83,36 @@ impl AnalyticService { AnalyticClient::new(self.tx.clone()) } - fn run_learning_waiter(&self, learning_waiter: LearningWaiter) { + fn run_learning_waiter(&mut self, learning_waiter: LearningWaiter) { // TODO: save handler of the task - tokio::spawn({ - let ms = self.metric_service.clone(); - let au = self.analytic_unit.as_ref().unwrap().clone(); - async move { - match learning_waiter { - LearningWaiter::Detection(task) => { + match learning_waiter { + LearningWaiter::Detection(task) => { + tokio::spawn({ + let ms = self.metric_service.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); + async move { AnalyticService::get_detections(task.sender, au, ms, task.from, task.to) .await } - LearningWaiter::HSR(task) => { + }); + } + LearningWaiter::HSR(task) => { + tokio::spawn({ + let ms = self.metric_service.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); + async move { AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await } - } + }); + } + LearningWaiter::DetectionRunner(task) => { + self.run_detection_runner(task.from); } - }); + } } - fn run_detection_runner(&mut self) { + // TODO: make + fn run_detection_runner(&mut self, from: u64) { // TODO: handle case or make it impossible to run_detection_runner second time if self.analytic_unit.is_none() { @@ -110,11 +120,9 @@ impl AnalyticService { } if self.analytic_unit_learning_status != LearningStatus::Ready { - let now: DateTime = Utc::now(); - let from = now.timestamp() as u64; let task = DetectionRunnerTask { - from: from - } + from + }; self.learning_waiters.push(LearningWaiter::DetectionRunner(task)); return; } @@ -293,7 +301,10 @@ impl AnalyticService { // TODO: remove this hack self.consume_request(RequestType::RunLearning); if self.alerting.is_some() { - self.run_detection_runner(); + // TODO: get it from persistance + let now: DateTime = Utc::now(); + let from = now.timestamp() as u64; + self.run_detection_runner(from); } while let Some(message) = self.rx.recv().await {