From 8975a501fa38c07ed872a50e1b33bae0718a3d7d Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Wed, 10 Nov 2021 17:36:11 +0300 Subject: [PATCH] hsr begin --- .../analytic_service/analytic_client.rs | 16 ++++++++ .../analytic_service/analytic_service.rs | 41 +++++++++++++++---- .../analytic_unit/anomaly_analytic_unit.rs | 27 ++++++++++++ .../analytic_unit/pattern_analytic_unit.rs | 18 ++++++++ .../analytic_unit/threshold_analytic_unit.rs | 18 ++++++++ .../analytic_service/analytic_unit/types.rs | 1 + server/src/services/analytic_service/types.rs | 15 +++++++ 7 files changed, 129 insertions(+), 7 deletions(-) diff --git a/server/src/services/analytic_service/analytic_client.rs b/server/src/services/analytic_service/analytic_client.rs index 098ea9f..62d2442 100644 --- a/server/src/services/analytic_service/analytic_client.rs +++ b/server/src/services/analytic_service/analytic_client.rs @@ -7,6 +7,7 @@ use crate::services::segments_service::Segment; use super::analytic_unit::types::AnalyticUnitConfig; use super::analytic_unit::types::PatchConfig; use super::types::DetectionTask; +use super::types::HSRTask; use super::types::LearningStatus; use super::types::LearningTrain; use super::types::{AnalyticServiceMessage, RequestType}; @@ -75,4 +76,19 @@ impl AnalyticClient { Err(e) => Ok(Vec::new()), } } + + pub async fn get_hsr(&self, from: u64, to: u64) -> anyhow::Result> { + let (tx, rx) = oneshot::channel(); + let req = AnalyticServiceMessage::Request(RequestType::GetHSR(HSRTask { + sender: tx, + from, + to, + })); + self.tx.send(req).await?; + // TODO: handle second error + match rx.await? { + Ok(r) => Ok(r), + Err(e) => Ok(Vec::new()), + } + } } diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 1dabf44..cd7e762 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig, PatternConfig}; -use super::types::{self, DetectionRunnerConfig, LearningTrain}; +use super::types::{self, DetectionRunnerConfig, LearningTrain, LearningWaiter}; use super::{ analytic_client::AnalyticClient, analytic_unit::pattern_analytic_unit::{self, LearningResults, PatternAnalyticUnit}, @@ -42,7 +42,7 @@ pub struct AnalyticService { learning_handler: Option>, // awaiters - learning_waiters: Vec, + learning_waiters: Vec, // runner runner_handler: Option>, @@ -84,13 +84,17 @@ impl AnalyticService { AnalyticClient::new(self.tx.clone()) } - fn run_detection_task(&self, task: DetectionTask) { + fn run_learning_waiter(&self, learning_waiter: LearningWaiter) { // TODO: save handler of the task tokio::spawn({ let ms = self.metric_service.clone(); let au = self.analytic_unit.as_ref().unwrap().clone(); async move { - AnalyticService::get_detections(task.sender, au, ms, task.from, task.to).await; + match learning_waiter { + LearningWaiter::Detection(task) => AnalyticService::get_detections(task.sender, au, ms, task.from, task.to).await, + LearningWaiter::HSR(task) => AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await, + } + } }); } @@ -129,6 +133,7 @@ impl AnalyticService { })); } RequestType::RunDetection(task) => { + // TODO: signle source of truth: Option vs LearningStatus if self.analytic_unit_learning_status == LearningStatus::Initialization { match task .sender @@ -143,9 +148,9 @@ impl AnalyticService { return; } if self.analytic_unit_learning_status == LearningStatus::Ready { - self.run_detection_task(task); + self.run_learning_waiter(LearningWaiter::Detection(task)); } else { - self.learning_waiters.push(task); + self.learning_waiters.push(LearningWaiter::Detection(task)); } } RequestType::GetStatus(tx) => { @@ -172,6 +177,10 @@ impl AnalyticService { self.patch_config(patch_obj, tx); // tx.send(()).unwrap(); } + RequestType::GetHSR(task) => { + // self.analytic_unit. + // TODO: implement + } }; } @@ -190,7 +199,7 @@ impl AnalyticService { // TODO: run tasks from self.learning_waiter while self.learning_waiters.len() > 0 { let task = self.learning_waiters.pop().unwrap(); - self.run_detection_task(task); + self.run_learning_waiter(task); } // TODO: fix this @@ -329,4 +338,22 @@ impl AnalyticService { } return; } + + async fn get_hsr( + tx: oneshot::Sender>>, + analytic_unit: Arc>>, + ms: MetricService, + from: u64, + to: u64, + ) { + let hsr = analytic_unit.read().await.get_hsr(ms, from, to).await.unwrap(); + + match tx.send(Ok(hsr)) { + Ok(_) => {} + Err(_e) => { + println!("failed to send results"); + } + } + + } } diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index 66c6401..02d9509 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -46,9 +46,36 @@ impl AnalyticUnit for AnomalyAnalyticUnit { let k = mr.data.keys().nth(0).unwrap(); let ts = &mr.data[k]; + if ts.len() == 0 { + return Ok(Vec::new()); + } + + let ct = ts[0]; + + // TODO: implement // TODO: decide what to do it from is Some() in the end Ok(Default::default()) } + + async fn get_hsr( + &self, + ms: MetricService, + from: u64, + to: u64, + ) -> anyhow::Result> { + + // TODO: implement + let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); + + if mr.data.keys().len() == 0 { + return Ok(Vec::new()); + } + + let k = mr.data.keys().nth(0).unwrap(); + let ts = mr.data[k].clone(); + + Ok(ts) + } } diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index e7f30a0..d028b2a 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -398,4 +398,22 @@ impl AnalyticUnit for PatternAnalyticUnit { Ok(results) } + + async fn get_hsr( + &self, + ms: MetricService, + from: u64, + to: u64, + ) -> anyhow::Result> { + let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); + + if mr.data.keys().len() == 0 { + return Ok(Vec::new()); + } + + let k = mr.data.keys().nth(0).unwrap(); + let ts = mr.data[k].clone(); + + Ok(ts) + } } diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index ba95b58..5c46908 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -74,4 +74,22 @@ impl AnalyticUnit for ThresholdAnalyticUnit { Ok(result) } + + async fn get_hsr( + &self, + ms: MetricService, + from: u64, + to: u64, + ) -> anyhow::Result> { + let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); + + if mr.data.keys().len() == 0 { + return Ok(Vec::new()); + } + + let k = mr.data.keys().nth(0).unwrap(); + let ts = mr.data[k].clone(); + + Ok(ts) + } } diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 6345510..0f63507 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -128,6 +128,7 @@ pub trait AnalyticUnit { ) -> anyhow::Result>; fn set_config(&mut self, c: AnalyticUnitConfig); + async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result> ; } #[derive(Deserialize, Serialize, Debug)] diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 9654e65..fd39126 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -61,6 +61,20 @@ pub struct DetectionTask { pub to: u64, } +#[derive(Debug)] +pub struct HSRTask { + // TODO: make enum for HSR which is different for different Analytic Types + pub sender: oneshot::Sender>>, + pub from: u64, + pub to: u64, +} + +#[derive(Debug)] +pub enum LearningWaiter { + Detection(DetectionTask), + HSR(HSRTask) +} + #[derive(Debug)] pub struct DetectionRunnerConfig { // pub sender: mpsc::Sender>>, @@ -72,6 +86,7 @@ pub struct DetectionRunnerConfig { pub enum RequestType { // TODO: convert to result RunLearning(anyhow::Result<()>) RunLearning, + GetHSR(HSRTask), RunDetection(DetectionTask), GetStatus(oneshot::Sender), // TODO: make type of Value