use std::sync::{Arc, Mutex}; use serde_json::{Result, Value}; use serde::{Deserialize, Serialize}; use rusqlite::{params, Connection}; use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit}; #[derive(Clone)] pub struct AnalyticUnitService { connection: Arc> } impl AnalyticUnitService { pub fn new() -> anyhow::Result { // TODO: remove repetitoin with segment_service std::fs::create_dir_all("./data").unwrap(); let conn = Connection::open("./data/analytic_units.db")?; // TODO: add learning results field conn.execute( "CREATE TABLE IF NOT EXISTS analytic_unit ( id TEXT PRIMARY KEY, last_detection INTEGER active BOOLEAN type INTEGER config TEXT )", [], )?; Ok(AnalyticUnitService { connection: Arc::new(Mutex::new(conn)), }) } // TODO: optional id pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box { match cfg { AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())), AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())), AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())), } } // TODO: get id of analytic_unit which be used also as it's type pub fn resolve(&self, cfg: AnalyticUnitConfig) -> anyhow::Result> { let au = self.resolve_au(&cfg); let id = au.as_ref().get_id(); let conn = self.connection.lock().unwrap(); let mut stmt = conn.prepare( "SELECT id from analytic_unit WHERE id = ?1", )?; let res = stmt.exists(params![id])?; if res == false { let cfg_json = serde_json::to_string(&cfg)?; conn.execute( "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", params![id, cfg_json] )?; } conn.execute( "UPDATE analytic_unit set active = FALSE where active = TRUE", params![] )?; conn.execute( "UPDATE analytic_unit set active = TRUE where id = ?1", params![id] )?; return Ok(au); } // TODO: resolve with saving by id pub fn set_last_detection(&self, id: String, last_detection: u64) -> anyhow::Result<()> { let conn = self.connection.lock().unwrap(); conn.execute( "UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2", params![last_detection, id] )?; Ok(()) } pub fn get_active(&self) -> anyhow::Result> { let conn = self.connection.lock().unwrap(); let mut stmt = conn.prepare( "SELECT id, type, config from analytic_unit WHERE active = TRUE" )?; let au = stmt.query_row([], |row| { let c: String = row.get(2)?; let cfg = serde_json::from_str(&c).unwrap(); Ok(self.resolve(cfg)) })?; return Ok(au); } }