From 1af18e1118ff9df3c26d6eb9471198dfc2429eb2 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 24 Dec 2021 12:02:08 +0300 Subject: [PATCH] rm 'same_type' in pathc config and patching continue --- .../analytic_service/analytic_service.rs | 63 ++++++------------- .../analytic_service/analytic_unit/types.rs | 38 ++++++----- server/src/services/analytic_unit_service.rs | 25 ++++++++ 3 files changed, 69 insertions(+), 57 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index b8dacdf..582e097 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -274,53 +274,30 @@ impl AnalyticService { } fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { - let (new_conf, need_learning, same_type) = self.analytic_unit_config.patch(patch); - self.analytic_unit_config = new_conf.clone(); - if need_learning { - self.consume_request(RequestType::RunLearning); - // TODO: it's not fully correct: we need to wait when the learning starts - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } - } else { - if self.analytic_unit.is_some() { - tokio::spawn({ - let au = self.analytic_unit.clone(); - let cfg = self.analytic_unit_config.clone(); - async move { - au.unwrap().write().await.set_config(cfg); - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } - } - }); - } else { - match tx.send(()) { - Ok(_) => {} - Err(_e) => { - println!("Can`t send patch config notification"); - } - } + + let my_id = self.analytic_unit_service.get_config_id(&self.analytic_unit_config); + let patch_id = patch.get_type_id(); + + println!("my id: {}", my_id); + println!("patch id: {}", patch_id); + + println!("equals: {}", my_id == patch_id); + + // TODO: update analytic_unit config if some + // TODO: save updated + // TODO: run learning when different + // TODO: run learning when it's necessary + + + match tx.send(()) { + Ok(_) => {} + Err(_e) => { + println!("Can`t send patch config notification"); } } - - if same_type { - // TODO: avoid using `unwrap` - self.analytic_unit_service.update_active_config(&new_conf).unwrap(); - } else { - // TODO: it's a hack, make it a better way - // TODO: avoid using unwrap - self.analytic_unit_service.resolve(&new_conf).unwrap(); - self.analytic_unit_service.update_active_config(&new_conf).unwrap(); - } + } pub async fn serve(&mut self) { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 95f9b06..cdbed87 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -63,23 +63,23 @@ pub enum AnalyticUnitConfig { } impl AnalyticUnitConfig { - // return true if need needs relearning and true if the config of the same type - pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool, bool) { + // return true if need needs relearning + pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) { match patch { PatchConfig::Pattern(tcfg) => match self.clone() { AnalyticUnitConfig::Pattern(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false, true); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false); } else { // TODO: it should be extraced from db - return (AnalyticUnitConfig::Pattern(Default::default()), false, true); + return (AnalyticUnitConfig::Pattern(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Pattern(Default::default()), true, false); + return (AnalyticUnitConfig::Pattern(Default::default()), true); } } }, @@ -90,16 +90,16 @@ impl AnalyticUnitConfig { let t = tcfg.as_ref().unwrap(); let mut need_learning = t.seasonality != scfg.seasonality; need_learning |= t.seasonality_iterations != scfg.seasonality_iterations; - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning, true); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), false, true); + return (AnalyticUnitConfig::Anomaly(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), true, false); + return (AnalyticUnitConfig::Anomaly(Default::default()), true); } } }, @@ -107,16 +107,16 @@ impl AnalyticUnitConfig { PatchConfig::Threshold(tcfg) => match self.clone() { AnalyticUnitConfig::Threshold(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false, true); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), false, true); + return (AnalyticUnitConfig::Threshold(Default::default()), false); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true, false); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), true, false); + return (AnalyticUnitConfig::Threshold(Default::default()), true); } } }, @@ -155,3 +155,13 @@ pub enum PatchConfig { Threshold(Option), Anomaly(Option), } + +impl PatchConfig { + pub fn get_type_id(&self) -> String { + match &self { + PatchConfig::Threshold(_) => "1".to_string(), + PatchConfig::Pattern(_) => "2".to_string(), + PatchConfig::Anomaly(_) => "3".to_string() + } + } +} \ No newline at end of file diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index bd7de52..08ac3fc 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -127,6 +127,31 @@ impl AnalyticUnitService { } } + pub fn get_config_by_id(&self) { + // TODO: implement + } + + pub fn get_config_id(&self, cfg: &AnalyticUnitConfig) -> String { + match cfg { + AnalyticUnitConfig::Threshold(_) => "1".to_string(), + AnalyticUnitConfig::Pattern(_) => "2".to_string(), + AnalyticUnitConfig::Anomaly(_) => "3".to_string(), + } + } + + pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { + let conn = self.connection.lock().unwrap(); + + let cfg_json = serde_json::to_string(&cfg)?; + + conn.execute( + "UPDATE analytic_unit SET config = ?1 WHERE id = ?2", + params![cfg_json, id] + )?; + + return Ok(()); + } + pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { let conn = self.connection.lock().unwrap();