From df3be270b8083d0c55c6d770f1ed6f2ccc029f8a Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 24 Dec 2021 11:28:45 +0300 Subject: [PATCH 1/6] better patch begin --- server/src/services/analytic_service/analytic_service.rs | 1 - server/src/services/analytic_service/analytic_unit/types.rs | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 7c007dd..b8dacdf 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -274,7 +274,6 @@ impl AnalyticService { } fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { - let (new_conf, need_learning, same_type) = self.analytic_unit_config.patch(patch); self.analytic_unit_config = new_conf.clone(); if need_learning { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 2ae8221..95f9b06 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -71,6 +71,7 @@ impl AnalyticUnitConfig { if tcfg.is_some() { return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false, true); } else { + // TODO: it should be extraced from db return (AnalyticUnitConfig::Pattern(Default::default()), false, true); } } From 1af18e1118ff9df3c26d6eb9471198dfc2429eb2 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 24 Dec 2021 12:02:08 +0300 Subject: [PATCH 2/6] rm 'same_type' in pathc config and patching continue --- .../analytic_service/analytic_service.rs | 63 ++++++------------- .../analytic_service/analytic_unit/types.rs | 38 ++++++----- server/src/services/analytic_unit_service.rs | 25 ++++++++ 3 files changed, 69 insertions(+), 57 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index b8dacdf..582e097 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -274,53 +274,30 @@ impl AnalyticService { } fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { - let (new_conf, need_learning, same_type) = self.analytic_unit_config.patch(patch); - self.analytic_unit_config = new_conf.clone(); - if need_learning { - self.consume_request(RequestType::RunLearning); - // TODO: it's not fully correct: we need to wait when the learning starts - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } - } else { - if self.analytic_unit.is_some() { - tokio::spawn({ - let au = self.analytic_unit.clone(); - let cfg = self.analytic_unit_config.clone(); - async move { - au.unwrap().write().await.set_config(cfg); - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } - } - }); - } else { - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } + + let my_id = self.analytic_unit_service.get_config_id(&self.analytic_unit_config); + let patch_id = patch.get_type_id(); + + println!("my id: {}", my_id); + println!("patch id: {}", patch_id); + + println!("equals: {}", my_id == patch_id); + + // TODO: update analytic_unit config if some + // TODO: save updated + // TODO: run learning when different + // TODO: run learning when it's necessary + + + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); } } - - if same_type { - // TODO: avoid using `unwrap` - self.analytic_unit_service.update_active_config(&new_conf).unwrap(); - } else { - // TODO: it's a hack, make it a better way - // TODO: avoid using unwrap - self.analytic_unit_service.resolve(&new_conf).unwrap(); - self.analytic_unit_service.update_active_config(&new_conf).unwrap(); - } + } pub async fn serve(&mut self) { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 95f9b06..cdbed87 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -63,23 +63,23 @@ pub enum AnalyticUnitConfig { } impl AnalyticUnitConfig { - // return true if need needs relearning and true if the config of the same type - pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool, bool) { + // return true if need needs relearning + pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) { match patch { PatchConfig::Pattern(tcfg) => match self.clone() { AnalyticUnitConfig::Pattern(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false, true); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false); } else { // TODO: it should be extraced from db - return (AnalyticUnitConfig::Pattern(Default::default()), false, true); + return (AnalyticUnitConfig::Pattern(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Pattern(Default::default()), true, false); + return (AnalyticUnitConfig::Pattern(Default::default()), true); } } }, @@ -90,16 +90,16 @@ impl AnalyticUnitConfig { let t = tcfg.as_ref().unwrap(); let mut need_learning = t.seasonality != scfg.seasonality; need_learning |= t.seasonality_iterations != scfg.seasonality_iterations; - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning, true); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), false, true); + return (AnalyticUnitConfig::Anomaly(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), true, false); + return (AnalyticUnitConfig::Anomaly(Default::default()), true); } } }, @@ -107,16 +107,16 @@ impl AnalyticUnitConfig { PatchConfig::Threshold(tcfg) => match self.clone() { AnalyticUnitConfig::Threshold(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false, true); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), false, true); + return (AnalyticUnitConfig::Threshold(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), true, false); + return (AnalyticUnitConfig::Threshold(Default::default()), true); } } }, @@ -155,3 +155,13 @@ pub enum PatchConfig { Threshold(Option), Anomaly(Option), } + +impl PatchConfig { + pub fn get_type_id(&self) -> String { + match &self { + PatchConfig::Threshold(_) => "1".to_string(), + PatchConfig::Pattern(_) => "2".to_string(), + PatchConfig::Anomaly(_) => "3".to_string() + } + } +} \ No newline at end of file diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index bd7de52..08ac3fc 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -127,6 +127,31 @@ impl AnalyticUnitService { } } + pub fn get_config_by_id(&self) { + // TODO: implement + } + + pub fn get_config_id(&self, cfg: &AnalyticUnitConfig) -> String { + match cfg { + AnalyticUnitConfig::Threshold(_) => "1".to_string(), + AnalyticUnitConfig::Pattern(_) => "2".to_string(), + AnalyticUnitConfig::Anomaly(_) => "3".to_string(), + } + } + + pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { + let conn = self.connection.lock().unwrap(); + + let cfg_json = serde_json::to_string(&cfg)?; + + conn.execute( + "UPDATE analytic_unit SET config = ?1 WHERE id = ?2", + params![cfg_json, id] + )?; + + return Ok(()); + } + pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { let conn = self.connection.lock().unwrap(); From d7b283f1ff6a9d2d2bc3ab3ff12aadac7520f2d9 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 24 Dec 2021 12:33:29 +0300 Subject: [PATCH 3/6] basic save of analytic_unit --- .../analytic_service/analytic_service.rs | 42 ++++++++++++++++--- .../analytic_service/analytic_unit/types.rs | 8 ++++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 582e097..9f023c2 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -283,18 +283,48 @@ impl AnalyticService { println!("equals: {}", my_id == patch_id); + let same_type = my_id == patch_id; + + if same_type { + let new_conf = patch.get_new_config(); + self.analytic_unit_config = new_conf.clone(); + self.analytic_unit_service.update_config_by_id(&my_id, &new_conf).unwrap(); + if self.analytic_unit.is_some() { + tokio::spawn({ + let au = self.analytic_unit.clone(); + let cfg = self.analytic_unit_config.clone(); + async move { + au.unwrap().write().await.set_config(cfg); + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); + } + } + } + }); + } else { + // TODO: implement + } + } else { + // TODO: extracdt from db + + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); + } + } + } + + // TODO: update analytic_unit config if some // TODO: save updated // TODO: run learning when different // TODO: run learning when it's necessary - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } + diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index cdbed87..4f3a9fb 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -164,4 +164,12 @@ impl PatchConfig { PatchConfig::Anomaly(_) => "3".to_string() } } + + pub fn get_new_config(&self) -> AnalyticUnitConfig { + match &self { + PatchConfig::Threshold(cfg) => AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone()), + PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()), + PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()) + } + } } \ No newline at end of file From a200e3d4501c095d78fd9e262e76f934313c9b00 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 24 Dec 2021 12:55:07 +0300 Subject: [PATCH 4/6] basic seving of new analytic unit config --- .../analytic_service/analytic_service.rs | 18 +++++++----- .../analytic_service/analytic_unit/types.rs | 12 +++++++- server/src/services/analytic_unit_service.rs | 29 +++++++++++++++++-- 3 files changed, 49 insertions(+), 10 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 9f023c2..1c482ba 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -278,14 +278,10 @@ impl AnalyticService { let my_id = self.analytic_unit_service.get_config_id(&self.analytic_unit_config); let patch_id = patch.get_type_id(); - println!("my id: {}", my_id); - println!("patch id: {}", patch_id); - - println!("equals: {}", my_id == patch_id); - let same_type = my_id == patch_id; if same_type { + // TODO: check when learning should be started let new_conf = patch.get_new_config(); self.analytic_unit_config = new_conf.clone(); self.analytic_unit_service.update_config_by_id(&my_id, &new_conf).unwrap(); @@ -304,11 +300,19 @@ impl AnalyticService { } }); } else { - // TODO: implement + // TODO: check if we need this else + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); + } + } } } else { // TODO: extracdt from db - + let new_conf = self.analytic_unit_service.get_config_by_id(&patch_id).unwrap(); + self.analytic_unit_config = new_conf.clone(); + self.consume_request(RequestType::RunLearning); match tx.send(()) { Ok(_) => {} Err(_e) => { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 4f3a9fb..8b84a97 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -57,12 +57,22 @@ impl Default for ThresholdConfig { #[derive(Debug, Serialize, Deserialize, Clone)] pub enum AnalyticUnitConfig { - Pattern(PatternConfig), Threshold(ThresholdConfig), + Pattern(PatternConfig), Anomaly(AnomalyConfig), } impl AnalyticUnitConfig { + + pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig { + let iid = id.as_str(); + match iid { + "1" => AnalyticUnitConfig::Threshold(Default::default()), + "2" => AnalyticUnitConfig::Pattern(Default::default()), + "3" => AnalyticUnitConfig::Anomaly(Default::default()), + _ => panic!("bad id for getting get_default_by_id") + } + } // return true if need needs relearning pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) { match patch { diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index 08ac3fc..f769e46 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -127,8 +127,31 @@ impl AnalyticUnitService { } } - pub fn get_config_by_id(&self) { - // TODO: implement + pub fn get_config_by_id(&self, id: &String) -> anyhow::Result { + let exists = { + let conn = self.connection.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT config from analytic_unit WHERE id = ?1" + )?; + stmt.exists([id])? + }; + + if exists == false { + let c = AnalyticUnitConfig::get_default_by_id(id); + self.resolve(&c)?; + return Ok(c); + } else { + let conn = self.connection.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT config from analytic_unit WHERE id = ?1" + )?; + let acfg = stmt.query_row([id], |row| { + let c: String = row.get(0)?; + let cfg = serde_json::from_str(&c).unwrap(); + Ok(cfg) + })?; + return Ok(acfg); + } } pub fn get_config_id(&self, cfg: &AnalyticUnitConfig) -> String { @@ -140,6 +163,8 @@ impl AnalyticUnitService { } pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { + + // TODO: it's possble that config doesn't exist, but we trying to update it let conn = self.connection.lock().unwrap(); let cfg_json = serde_json::to_string(&cfg)?; From 21f4b6eaaa0a1568763b2ecbf27ef9690ffcfe80 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 24 Dec 2021 13:33:21 +0300 Subject: [PATCH 5/6] need_learning on patch case --- .../analytic_service/analytic_service.rs | 52 ++++++++++--------- .../analytic_service/analytic_unit/types.rs | 41 +++++++++++++++ 2 files changed, 69 insertions(+), 24 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 1c482ba..ea3df6f 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -276,29 +276,46 @@ impl AnalyticService { fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { let my_id = self.analytic_unit_service.get_config_id(&self.analytic_unit_config); + let patch_id = patch.get_type_id(); let same_type = my_id == patch_id; + + // TODO: need_learning and same_type logic overlaps, there is a way to optimise this + let need_learning = self.analytic_unit_config.patch_needs_learning(&patch); + if same_type { // TODO: check when learning should be started let new_conf = patch.get_new_config(); self.analytic_unit_config = new_conf.clone(); self.analytic_unit_service.update_config_by_id(&my_id, &new_conf).unwrap(); + if self.analytic_unit.is_some() { - tokio::spawn({ - let au = self.analytic_unit.clone(); - let cfg = self.analytic_unit_config.clone(); - async move { - au.unwrap().write().await.set_config(cfg); - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } + if need_learning { + self.consume_request(RequestType::RunLearning); + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); } } - }); + return; + } else { + tokio::spawn({ + let au = self.analytic_unit.clone(); + let cfg = self.analytic_unit_config.clone(); + async move { + au.unwrap().write().await.set_config(cfg); + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); + } + } + } + }); + } } else { // TODO: check if we need this else match tx.send(()) { @@ -309,7 +326,6 @@ impl AnalyticService { } } } else { - // TODO: extracdt from db let new_conf = self.analytic_unit_service.get_config_by_id(&patch_id).unwrap(); self.analytic_unit_config = new_conf.clone(); self.consume_request(RequestType::RunLearning); @@ -320,18 +336,6 @@ impl AnalyticService { } } } - - - // TODO: update analytic_unit config if some - // TODO: save updated - // TODO: run learning when different - // TODO: run learning when it's necessary - - - - - - } pub async fn serve(&mut self) { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 8b84a97..f42d40a 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -73,6 +73,47 @@ impl AnalyticUnitConfig { _ => panic!("bad id for getting get_default_by_id") } } + + pub fn patch_needs_learning(&self, patch: &PatchConfig) -> bool { + // TODO: maybe use type id's to optimise code + match patch { + PatchConfig::Pattern(tcfg) => match self.clone() { + AnalyticUnitConfig::Pattern(_) => { + return false; + } + _ => { + return true + } + }, + + PatchConfig::Anomaly(tcfg) => match self.clone() { + AnalyticUnitConfig::Anomaly(scfg) => { + if tcfg.is_some() { + let t = tcfg.as_ref().unwrap(); + let mut need_learning = t.seasonality != scfg.seasonality; + need_learning |= t.seasonality_iterations != scfg.seasonality_iterations; + return need_learning; + } else { + return false; + } + } + _ => { + return true; + } + }, + + PatchConfig::Threshold(tcfg) => match self.clone() { + AnalyticUnitConfig::Threshold(_) => { + return false; + } + _ => { + return true; + } + }, + } + } + + // TODO: maybe this method depricated // return true if need needs relearning pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) { match patch { From 81069e9782b5593d9086baca52bb91360c429264 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 24 Dec 2021 13:39:34 +0300 Subject: [PATCH 6/6] code format --- server/src/main.rs | 2 +- .../analytic_service/analytic_service.rs | 24 ++++-- .../analytic_unit/anomaly_analytic_unit.rs | 5 +- .../analytic_service/analytic_unit/mod.rs | 1 - .../analytic_service/analytic_unit/types.rs | 19 +++-- .../analytic_service/detection_runner.rs | 21 ++--- server/src/services/analytic_unit_service.rs | 76 ++++++++++--------- server/src/services/mod.rs | 2 +- 8 files changed, 83 insertions(+), 67 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 03b5b47..a9ed0cd 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,6 @@ mod api; -use hastic::services::{analytic_service, metric_service, segments_service, analytic_unit_service}; +use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service}; use anyhow; diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index ea3df6f..94fc5e6 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -59,7 +59,6 @@ impl AnalyticService { segments_service: segments_service::SegmentsService, alerting: Option, ) -> AnalyticService { - // TODO: move buffer size to config let (tx, rx) = mpsc::channel::(32); @@ -242,7 +241,9 @@ impl AnalyticService { println!("Detection runner started from {}", from) } ResponseType::DetectionRunnerUpdate(id, timestamp) => { - self.analytic_unit_service.set_last_detection(id, timestamp).unwrap(); + self.analytic_unit_service + .set_last_detection(id, timestamp) + .unwrap(); } ResponseType::LearningStarted => { self.analytic_unit_learning_status = LearningStatus::Learning @@ -274,14 +275,14 @@ impl AnalyticService { } fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { + let my_id = self + .analytic_unit_service + .get_config_id(&self.analytic_unit_config); - let my_id = self.analytic_unit_service.get_config_id(&self.analytic_unit_config); - let patch_id = patch.get_type_id(); let same_type = my_id == patch_id; - // TODO: need_learning and same_type logic overlaps, there is a way to optimise this let need_learning = self.analytic_unit_config.patch_needs_learning(&patch); @@ -289,7 +290,9 @@ impl AnalyticService { // TODO: check when learning should be started let new_conf = patch.get_new_config(); self.analytic_unit_config = new_conf.clone(); - self.analytic_unit_service.update_config_by_id(&my_id, &new_conf).unwrap(); + self.analytic_unit_service + .update_config_by_id(&my_id, &new_conf) + .unwrap(); if self.analytic_unit.is_some() { if need_learning { @@ -326,7 +329,10 @@ impl AnalyticService { } } } else { - let new_conf = self.analytic_unit_service.get_config_by_id(&patch_id).unwrap(); + let new_conf = self + .analytic_unit_service + .get_config_by_id(&patch_id) + .unwrap(); self.analytic_unit_config = new_conf.clone(); self.consume_request(RequestType::RunLearning); match tx.send(()) { @@ -365,7 +371,9 @@ impl AnalyticService { ) { let mut au = match aus.resolve(&aucfg) { Ok(a) => a, - Err(e) => { panic!("{}", e); } + Err(e) => { + panic!("{}", e); + } }; match tx diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index a33202d..98a3373 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -50,7 +50,10 @@ impl SARIMA { // TODO: trend detection if ts.len() < 2 { - return Err(anyhow::format_err!("too short timeserie to learn from, timeserie length: {}", ts.len())); + return Err(anyhow::format_err!( + "too short timeserie to learn from, timeserie length: {}", + ts.len() + )); } // TODO: ensure capacity with seasonality size let mut res_ts = Vec::<(u64, f64)>::new(); diff --git a/server/src/services/analytic_service/analytic_unit/mod.rs b/server/src/services/analytic_service/analytic_unit/mod.rs index 865bc99..ebb73c9 100644 --- a/server/src/services/analytic_service/analytic_unit/mod.rs +++ b/server/src/services/analytic_service/analytic_unit/mod.rs @@ -7,4 +7,3 @@ use self::{ anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig, }; - diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index f42d40a..0a043f4 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -63,14 +63,13 @@ pub enum AnalyticUnitConfig { } impl AnalyticUnitConfig { - pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig { let iid = id.as_str(); match iid { "1" => AnalyticUnitConfig::Threshold(Default::default()), "2" => AnalyticUnitConfig::Pattern(Default::default()), "3" => AnalyticUnitConfig::Anomaly(Default::default()), - _ => panic!("bad id for getting get_default_by_id") + _ => panic!("bad id for getting get_default_by_id"), } } @@ -81,9 +80,7 @@ impl AnalyticUnitConfig { AnalyticUnitConfig::Pattern(_) => { return false; } - _ => { - return true - } + _ => return true, }, PatchConfig::Anomaly(tcfg) => match self.clone() { @@ -212,15 +209,17 @@ impl PatchConfig { match &self { PatchConfig::Threshold(_) => "1".to_string(), PatchConfig::Pattern(_) => "2".to_string(), - PatchConfig::Anomaly(_) => "3".to_string() + PatchConfig::Anomaly(_) => "3".to_string(), } } pub fn get_new_config(&self) -> AnalyticUnitConfig { match &self { - PatchConfig::Threshold(cfg) => AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone()), + PatchConfig::Threshold(cfg) => { + AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone()) + } PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()), - PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()) + PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()), } - } -} \ No newline at end of file + } +} diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index 6ec3e8e..66ff8a4 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -1,16 +1,14 @@ -use chrono::{Utc, DateTime}; +use chrono::{DateTime, Utc}; -use tokio::sync::{mpsc}; +use tokio::sync::mpsc; use crate::services::metric_service::MetricService; use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use tokio::time::{sleep, Duration}; - const DETECTION_STEP: u64 = 10; - pub struct DetectionRunner { metric_service: MetricService, tx: mpsc::Sender, @@ -76,11 +74,16 @@ impl DetectionRunner { // TODO: run detection periodically // TODO: set info about detections to tx - - match tx.send(AnalyticServiceMessage::Response(Ok( - ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to) - ))).await { - Ok(_) => {}, + match tx + .send(AnalyticServiceMessage::Response(Ok( + ResponseType::DetectionRunnerUpdate( + au.as_ref().read().await.get_id(), + t_to, + ), + ))) + .await + { + Ok(_) => {} Err(_e) => println!("Fail to send detection runner started notification"), } diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index f769e46..cdcd84b 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -1,14 +1,19 @@ -use std::sync::{Arc, Mutex}; -use serde_json::{Result, Value}; use serde::{Deserialize, Serialize}; +use serde_json::{Result, Value}; +use std::sync::{Arc, Mutex}; use rusqlite::{params, Connection}; -use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit}; +use super::analytic_service::analytic_unit::{ + anomaly_analytic_unit::AnomalyAnalyticUnit, + pattern_analytic_unit::PatternAnalyticUnit, + threshold_analytic_unit::ThresholdAnalyticUnit, + types::{self, AnalyticUnitConfig}, +}; #[derive(Clone)] pub struct AnalyticUnitService { - connection: Arc> + connection: Arc>, } impl AnalyticUnitService { @@ -35,40 +40,50 @@ impl AnalyticUnitService { } // TODO: optional id - pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box { + pub fn resolve_au( + &self, + cfg: &AnalyticUnitConfig, + ) -> Box { match cfg { - AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())), - AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())), - AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())), + AnalyticUnitConfig::Threshold(c) => { + Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())) + } + AnalyticUnitConfig::Pattern(c) => { + Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())) + } + AnalyticUnitConfig::Anomaly(c) => { + Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())) + } } } // TODO: get id of analytic_unit which be used also as it's type - pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result> { + pub fn resolve( + &self, + cfg: &AnalyticUnitConfig, + ) -> anyhow::Result> { let au = self.resolve_au(cfg); let id = au.as_ref().get_id(); let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT id from analytic_unit WHERE id = ?1", - )?; + let mut stmt = conn.prepare("SELECT id from analytic_unit WHERE id = ?1")?; let res = stmt.exists(params![id])?; if res == false { let cfg_json = serde_json::to_string(&cfg)?; conn.execute( - "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", - params![id, cfg_json] + "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", + params![id, cfg_json], )?; } conn.execute( "UPDATE analytic_unit set active = FALSE where active = TRUE", - params![] + params![], )?; conn.execute( "UPDATE analytic_unit set active = TRUE where id = ?1", - params![id] + params![id], )?; return Ok(au); @@ -78,7 +93,7 @@ impl AnalyticUnitService { let conn = self.connection.lock().unwrap(); conn.execute( "UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2", - params![last_detection, id] + params![last_detection, id], )?; Ok(()) } @@ -86,9 +101,8 @@ impl AnalyticUnitService { pub fn get_active(&self) -> anyhow::Result> { // TODO: return default when there is no active let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT id, type, config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = + conn.prepare("SELECT id, type, config from analytic_unit WHERE active = TRUE")?; let au = stmt.query_row([], |row| { let c: String = row.get(2)?; @@ -97,15 +111,12 @@ impl AnalyticUnitService { })??; return Ok(au); - } pub fn get_active_config(&self) -> anyhow::Result { let exists = { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; stmt.exists([])? }; @@ -115,9 +126,7 @@ impl AnalyticUnitService { return Ok(c); } else { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE active = TRUE" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?; let acfg = stmt.query_row([], |row| { let c: String = row.get(0)?; let cfg = serde_json::from_str(&c).unwrap(); @@ -130,9 +139,7 @@ impl AnalyticUnitService { pub fn get_config_by_id(&self, id: &String) -> anyhow::Result { let exists = { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE id = ?1" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?; stmt.exists([id])? }; @@ -142,9 +149,7 @@ impl AnalyticUnitService { return Ok(c); } else { let conn = self.connection.lock().unwrap(); - let mut stmt = conn.prepare( - "SELECT config from analytic_unit WHERE id = ?1" - )?; + let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?; let acfg = stmt.query_row([id], |row| { let c: String = row.get(0)?; let cfg = serde_json::from_str(&c).unwrap(); @@ -163,7 +168,6 @@ impl AnalyticUnitService { } pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { - // TODO: it's possble that config doesn't exist, but we trying to update it let conn = self.connection.lock().unwrap(); @@ -171,7 +175,7 @@ impl AnalyticUnitService { conn.execute( "UPDATE analytic_unit SET config = ?1 WHERE id = ?2", - params![cfg_json, id] + params![cfg_json, id], )?; return Ok(()); @@ -184,7 +188,7 @@ impl AnalyticUnitService { conn.execute( "UPDATE analytic_unit SET config = ?1 WHERE active = TRUE", - params![cfg_json] + params![cfg_json], )?; return Ok(()); diff --git a/server/src/services/mod.rs b/server/src/services/mod.rs index ea17ea9..2f122bf 100644 --- a/server/src/services/mod.rs +++ b/server/src/services/mod.rs @@ -1,5 +1,5 @@ pub mod analytic_service; +pub mod analytic_unit_service; pub mod metric_service; pub mod segments_service; pub mod user_service; -pub mod analytic_unit_service;