Browse Source

Merge pull request #67 from hastic/active-analytic-unit-#66

active analytic unit
env-variables-configuration-#55
glitch4347 2 years ago committed by GitHub
parent
commit
f1d31b9637
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      server/src/services/analytic_service/analytic_service.rs
  2. 28
      server/src/services/analytic_service/analytic_unit/types.rs
  3. 86
      server/src/services/analytic_unit_service.rs

32
server/src/services/analytic_service/analytic_service.rs

@ -23,7 +23,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear
use anyhow;
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
// TODO: now it's basically single analytic unit, service will operate on many AU
@ -59,18 +59,21 @@ impl AnalyticService {
segments_service: segments_service::SegmentsService,
alerting: Option<AlertingConfig>,
) -> AnalyticService {
// TODO: move buffer size to config
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
let aus = analytic_unit_service.clone();
AnalyticService {
analytic_unit_service,
analytic_unit_service: aus,
metric_service,
segments_service,
alerting,
// TODO: get it from persistance
analytic_unit: None,
analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()),
analytic_unit_config: analytic_unit_service.get_active_config().unwrap(),
analytic_unit_learning_status: LearningStatus::Initialization,
tx,
@ -271,11 +274,12 @@ impl AnalyticService {
}
fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) {
let (new_conf, need_learning) = self.analytic_unit_config.patch(patch);
self.analytic_unit_config = new_conf;
let (new_conf, need_learning, same_type) = self.analytic_unit_config.patch(patch);
self.analytic_unit_config = new_conf.clone();
if need_learning {
self.consume_request(RequestType::RunLearning);
// TODO: it's not fullu correct: we need to wait when the learning starts
// TODO: it's not fully correct: we need to wait when the learning starts
match tx.send(()) {
Ok(_) => {}
Err(_e) => {
@ -306,6 +310,18 @@ impl AnalyticService {
}
}
}
if same_type {
// TODO: avoid using `unwrap`
self.analytic_unit_service.update_active_config(&new_conf).unwrap();
} else {
// TODO: it's a hack, make it a better way
// TODO: avoid using unwrap
self.analytic_unit_service.resolve(&new_conf).unwrap();
self.analytic_unit_service.update_active_config(&new_conf).unwrap();
}
}
pub async fn serve(&mut self) {
@ -333,7 +349,7 @@ impl AnalyticService {
ms: MetricService,
ss: SegmentsService,
) {
let mut au = match aus.resolve(aucfg) {
let mut au = match aus.resolve(&aucfg) {
Ok(a) => a,
Err(e) => { panic!("{}", e); }
};

28
server/src/services/analytic_service/analytic_unit/types.rs

@ -63,22 +63,22 @@ pub enum AnalyticUnitConfig {
}
impl AnalyticUnitConfig {
// return true if patch is different type
pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool) {
// return true if need needs relearning and true if the config of the same type
pub fn patch(&self, patch: PatchConfig) -> (AnalyticUnitConfig, bool, bool) {
match patch {
PatchConfig::Pattern(tcfg) => match self.clone() {
AnalyticUnitConfig::Pattern(_) => {
if tcfg.is_some() {
return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false);
return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), false, true);
} else {
return (AnalyticUnitConfig::Pattern(Default::default()), false);
return (AnalyticUnitConfig::Pattern(Default::default()), false, true);
}
}
_ => {
if tcfg.is_some() {
return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true);
return (AnalyticUnitConfig::Pattern(tcfg.unwrap()), true, false);
} else {
return (AnalyticUnitConfig::Pattern(Default::default()), true);
return (AnalyticUnitConfig::Pattern(Default::default()), true, false);
}
}
},
@ -89,16 +89,16 @@ impl AnalyticUnitConfig {
let t = tcfg.as_ref().unwrap();
let mut need_learning = t.seasonality != scfg.seasonality;
need_learning |= t.seasonality_iterations != scfg.seasonality_iterations;
return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning);
return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), need_learning, true);
} else {
return (AnalyticUnitConfig::Anomaly(Default::default()), false);
return (AnalyticUnitConfig::Anomaly(Default::default()), false, true);
}
}
_ => {
if tcfg.is_some() {
return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true);
return (AnalyticUnitConfig::Anomaly(tcfg.unwrap()), true, false);
} else {
return (AnalyticUnitConfig::Anomaly(Default::default()), true);
return (AnalyticUnitConfig::Anomaly(Default::default()), true, false);
}
}
},
@ -106,16 +106,16 @@ impl AnalyticUnitConfig {
PatchConfig::Threshold(tcfg) => match self.clone() {
AnalyticUnitConfig::Threshold(_) => {
if tcfg.is_some() {
return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false);
return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), false, true);
} else {
return (AnalyticUnitConfig::Threshold(Default::default()), false);
return (AnalyticUnitConfig::Threshold(Default::default()), false, true);
}
}
_ => {
if tcfg.is_some() {
return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true);
return (AnalyticUnitConfig::Threshold(tcfg.unwrap()), true, false);
} else {
return (AnalyticUnitConfig::Threshold(Default::default()), true);
return (AnalyticUnitConfig::Threshold(Default::default()), true, false);
}
}
},

86
server/src/services/analytic_unit_service.rs

@ -1,4 +1,6 @@
use std::sync::{Arc, Mutex};
use serde_json::{Result, Value};
use serde::{Deserialize, Serialize};
use rusqlite::{params, Connection};
@ -19,7 +21,10 @@ impl AnalyticUnitService {
conn.execute(
"CREATE TABLE IF NOT EXISTS analytic_unit (
id TEXT PRIMARY KEY,
last_detection INTEGER
last_detection INTEGER,
active BOOLEAN,
type INTEGER,
config TEXT
)",
[],
)?;
@ -30,7 +35,7 @@ impl AnalyticUnitService {
}
// TODO: optional id
pub fn resolve_au(&self, cfg: AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg {
AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())),
AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())),
@ -38,7 +43,8 @@ impl AnalyticUnitService {
}
}
pub fn resolve(&self, cfg: AnalyticUnitConfig) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
// TODO: get id of analytic_unit which be used also as it's type
pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
let au = self.resolve_au(cfg);
let id = au.as_ref().get_id();
@ -49,16 +55,25 @@ impl AnalyticUnitService {
let res = stmt.exists(params![id])?;
if res == false {
let cfg_json = serde_json::to_string(&cfg)?;
conn.execute(
"INSERT INTO analytic_unit (id) VALUES (?1)",
params![id]
"INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)",
params![id, cfg_json]
)?;
}
conn.execute(
"UPDATE analytic_unit set active = FALSE where active = TRUE",
params![]
)?;
conn.execute(
"UPDATE analytic_unit set active = TRUE where id = ?1",
params![id]
)?;
return Ok(au);
}
// TODO: resolve with saving by id
pub fn set_last_detection(&self, id: String, last_detection: u64) -> anyhow::Result<()> {
let conn = self.connection.lock().unwrap();
conn.execute(
@ -67,4 +82,61 @@ impl AnalyticUnitService {
)?;
Ok(())
}
}
pub fn get_active(&self) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
// TODO: return default when there is no active
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, type, config from analytic_unit WHERE active = TRUE"
)?;
let au = stmt.query_row([], |row| {
let c: String = row.get(2)?;
let cfg: AnalyticUnitConfig = serde_json::from_str(&c).unwrap();
Ok(self.resolve(&cfg))
})??;
return Ok(au);
}
pub fn get_active_config(&self) -> anyhow::Result<AnalyticUnitConfig> {
let exists = {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT config from analytic_unit WHERE active = TRUE"
)?;
stmt.exists([])?
};
if exists == false {
let c = AnalyticUnitConfig::Pattern(Default::default());
self.resolve(&c)?;
return Ok(c);
} else {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT config from analytic_unit WHERE active = TRUE"
)?;
let acfg = stmt.query_row([], |row| {
let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap();
Ok(cfg)
})?;
return Ok(acfg);
}
}
pub fn update_active_config(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> {
let conn = self.connection.lock().unwrap();
let cfg_json = serde_json::to_string(&cfg)?;
conn.execute(
"UPDATE analytic_unit SET config = ?1 WHERE active = TRUE",
params![cfg_json]
)?;
return Ok(());
}
}

Loading…
Cancel
Save