diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 2156fa0..7c007dd 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -23,7 +23,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear use anyhow; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use tokio::sync::{mpsc, oneshot}; // TODO: now it's basically single analytic unit, service will operate on many AU @@ -59,18 +59,21 @@ impl AnalyticService { segments_service: segments_service::SegmentsService, alerting: Option, ) -> AnalyticService { + + // TODO: move buffer size to config let (tx, rx) = mpsc::channel::(32); + let aus = analytic_unit_service.clone(); + AnalyticService { - analytic_unit_service, + analytic_unit_service: aus, metric_service, segments_service, alerting, - // TODO: get it from persistance analytic_unit: None, - analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()), + analytic_unit_config: analytic_unit_service.get_active_config().unwrap(), analytic_unit_learning_status: LearningStatus::Initialization, tx, @@ -271,11 +274,12 @@ impl AnalyticService { } fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { - let (new_conf, need_learning) = self.analytic_unit_config.patch(patch); - self.analytic_unit_config = new_conf; + + let (new_conf, need_learning, same_type) = self.analytic_unit_config.patch(patch); + self.analytic_unit_config = new_conf.clone(); if need_learning { self.consume_request(RequestType::RunLearning); - // TODO: it's not fullu correct: we need to wait when the learning starts + // TODO: it's not fully correct: we need to wait when the learning starts match tx.send(()) { Ok(_) => {} Err(_e) => { @@ -306,6 +310,18 @@ impl AnalyticService { } } } + + + if same_type { + // TODO: avoid using `unwrap` + self.analytic_unit_service.update_active_config(&new_conf).unwrap(); + } else { + // TODO: it's a hack, make it a better way + // TODO: avoid using unwrap + self.analytic_unit_service.resolve(&new_conf).unwrap(); + self.analytic_unit_service.update_active_config(&new_conf).unwrap(); + } + } pub async fn serve(&mut self) { @@ -333,7 +349,7 @@ impl AnalyticService { ms: MetricService, ss: SegmentsService, ) { - let mut au = match aus.resolve(aucfg) { + let mut au = match aus.resolve(&aucfg) { Ok(a) => a, Err(e) => { panic!("{}", e); } }; diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index b591d94..2ae8221 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -63,22 +63,22 @@ pub enum AnalyticUnitConfig { } impl AnalyticUnitConfig { - // return true if patch is different type - pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) { + // return true if need needs relearning and true if the config of the same type + pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool, bool) { match patch { PatchConfig::Pattern(tcfg) => match self.clone() { AnalyticUnitConfig::Pattern(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false, true); } else { - return (AnalyticUnitConfig::Pattern(Default::default()), false); + return (AnalyticUnitConfig::Pattern(Default::default()), false, true); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true); + return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true, false); } else { - return (AnalyticUnitConfig::Pattern(Default::default()), true); + return (AnalyticUnitConfig::Pattern(Default::default()), true, false); } } }, @@ -89,16 +89,16 @@ impl AnalyticUnitConfig { let t = tcfg.as_ref().unwrap(); let mut need_learning = t.seasonality != scfg.seasonality; need_learning |= t.seasonality_iterations != scfg.seasonality_iterations; - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning, true); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), false); + return (AnalyticUnitConfig::Anomaly(Default::default()), false, true); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true); + return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true, false); } else { - return (AnalyticUnitConfig::Anomaly(Default::default()), true); + return (AnalyticUnitConfig::Anomaly(Default::default()), true, false); } } }, @@ -106,16 +106,16 @@ impl AnalyticUnitConfig { PatchConfig::Threshold(tcfg) => match self.clone() { AnalyticUnitConfig::Threshold(_) => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false, true); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), false); + return (AnalyticUnitConfig::Threshold(Default::default()), false, true); } } _ => { if tcfg.is_some() { - return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true); + return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true, false); } else { - return (AnalyticUnitConfig::Threshold(Default::default()), true); + return (AnalyticUnitConfig::Threshold(Default::default()), true, false); } } }, diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index 06532b1..bd7de52 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -1,4 +1,6 @@ use std::sync::{Arc, Mutex}; +use serde_json::{Result, Value}; +use serde::{Deserialize, Serialize}; use rusqlite::{params, Connection}; @@ -19,7 +21,10 @@ impl AnalyticUnitService { conn.execute( "CREATE TABLE IF NOT EXISTS analytic_unit ( id TEXT PRIMARY KEY, - last_detection INTEGER + last_detection INTEGER, + active BOOLEAN, + type INTEGER, + config TEXT )", [], )?; @@ -30,7 +35,7 @@ impl AnalyticUnitService { } // TODO: optional id - pub fn resolve_au(&self, cfg: AnalyticUnitConfig) -> Box { + pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box { match cfg { AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())), AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())), @@ -38,7 +43,8 @@ impl AnalyticUnitService { } } - pub fn resolve(&self, cfg: AnalyticUnitConfig) -> anyhow::Result> { + // TODO: get id of analytic_unit which be used also as it's type + pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result> { let au = self.resolve_au(cfg); let id = au.as_ref().get_id(); @@ -49,16 +55,25 @@ impl AnalyticUnitService { let res = stmt.exists(params![id])?; if res == false { + let cfg_json = serde_json::to_string(&cfg)?; conn.execute( - "INSERT INTO analytic_unit (id) VALUES (?1)", - params![id] + "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", + params![id, cfg_json] )?; } + conn.execute( + "UPDATE analytic_unit set active = FALSE where active = TRUE", + params![] + )?; + conn.execute( + "UPDATE analytic_unit set active = TRUE where id = ?1", + params![id] + )?; + return Ok(au); } - // TODO: resolve with saving by id pub fn set_last_detection(&self, id: String, last_detection: u64) -> anyhow::Result<()> { let conn = self.connection.lock().unwrap(); conn.execute( @@ -67,4 +82,61 @@ impl AnalyticUnitService { )?; Ok(()) } -} \ No newline at end of file + + pub fn get_active(&self) -> anyhow::Result> { + // TODO: return default when there is no active + let conn = self.connection.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, type, config from analytic_unit WHERE active = TRUE" + )?; + + let au = stmt.query_row([], |row| { + let c: String = row.get(2)?; + let cfg: AnalyticUnitConfig = serde_json::from_str(&c).unwrap(); + Ok(self.resolve(&cfg)) + })??; + + return Ok(au); + + } + + pub fn get_active_config(&self) -> anyhow::Result { + let exists = { + let conn = self.connection.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT config from analytic_unit WHERE active = TRUE" + )?; + stmt.exists([])? + }; + + if exists == false { + let c = AnalyticUnitConfig::Pattern(Default::default()); + self.resolve(&c)?; + return Ok(c); + } else { + let conn = self.connection.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT config from analytic_unit WHERE active = TRUE" + )?; + let acfg = stmt.query_row([], |row| { + let c: String = row.get(0)?; + let cfg = serde_json::from_str(&c).unwrap(); + Ok(cfg) + })?; + return Ok(acfg); + } + } + + pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { + let conn = self.connection.lock().unwrap(); + + let cfg_json = serde_json::to_string(&cfg)?; + + conn.execute( + "UPDATE analytic_unit SET config = ?1 WHERE active = TRUE", + params![cfg_json] + )?; + + return Ok(()); + } +}