Browse Source

code format

analytic-unit-type-and-meta-in-db-#65
Alexey Velikiy 2 years ago
parent
commit
81069e9782
  1. 2
      server/src/main.rs
  2. 24
      server/src/services/analytic_service/analytic_service.rs
  3. 5
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  4. 1
      server/src/services/analytic_service/analytic_unit/mod.rs
  5. 19
      server/src/services/analytic_service/analytic_unit/types.rs
  6. 21
      server/src/services/analytic_service/detection_runner.rs
  7. 76
      server/src/services/analytic_unit_service.rs
  8. 2
      server/src/services/mod.rs

2
server/src/main.rs

@ -1,6 +1,6 @@
mod api; mod api;
use hastic::services::{analytic_service, metric_service, segments_service, analytic_unit_service}; use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service};
use anyhow; use anyhow;

24
server/src/services/analytic_service/analytic_service.rs

@ -59,7 +59,6 @@ impl AnalyticService {
segments_service: segments_service::SegmentsService, segments_service: segments_service::SegmentsService,
alerting: Option<AlertingConfig>, alerting: Option<AlertingConfig>,
) -> AnalyticService { ) -> AnalyticService {
// TODO: move buffer size to config // TODO: move buffer size to config
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32); let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
@ -242,7 +241,9 @@ impl AnalyticService {
println!("Detection runner started from {}", from) println!("Detection runner started from {}", from)
} }
ResponseType::DetectionRunnerUpdate(id, timestamp) => { ResponseType::DetectionRunnerUpdate(id, timestamp) => {
self.analytic_unit_service.set_last_detection(id, timestamp).unwrap(); self.analytic_unit_service
.set_last_detection(id, timestamp)
.unwrap();
} }
ResponseType::LearningStarted => { ResponseType::LearningStarted => {
self.analytic_unit_learning_status = LearningStatus::Learning self.analytic_unit_learning_status = LearningStatus::Learning
@ -274,14 +275,14 @@ impl AnalyticService {
} }
fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) {
let my_id = self
.analytic_unit_service
.get_config_id(&self.analytic_unit_config);
let my_id = self.analytic_unit_service.get_config_id(&self.analytic_unit_config);
let patch_id = patch.get_type_id(); let patch_id = patch.get_type_id();
let same_type = my_id == patch_id; let same_type = my_id == patch_id;
// TODO: need_learning and same_type logic overlaps, there is a way to optimise this // TODO: need_learning and same_type logic overlaps, there is a way to optimise this
let need_learning = self.analytic_unit_config.patch_needs_learning(&patch); let need_learning = self.analytic_unit_config.patch_needs_learning(&patch);
@ -289,7 +290,9 @@ impl AnalyticService {
// TODO: check when learning should be started // TODO: check when learning should be started
let new_conf = patch.get_new_config(); let new_conf = patch.get_new_config();
self.analytic_unit_config = new_conf.clone(); self.analytic_unit_config = new_conf.clone();
self.analytic_unit_service.update_config_by_id(&my_id, &new_conf).unwrap(); self.analytic_unit_service
.update_config_by_id(&my_id, &new_conf)
.unwrap();
if self.analytic_unit.is_some() { if self.analytic_unit.is_some() {
if need_learning { if need_learning {
@ -326,7 +329,10 @@ impl AnalyticService {
} }
} }
} else { } else {
let new_conf = self.analytic_unit_service.get_config_by_id(&patch_id).unwrap(); let new_conf = self
.analytic_unit_service
.get_config_by_id(&patch_id)
.unwrap();
self.analytic_unit_config = new_conf.clone(); self.analytic_unit_config = new_conf.clone();
self.consume_request(RequestType::RunLearning); self.consume_request(RequestType::RunLearning);
match tx.send(()) { match tx.send(()) {
@ -365,7 +371,9 @@ impl AnalyticService {
) { ) {
let mut au = match aus.resolve(&aucfg) { let mut au = match aus.resolve(&aucfg) {
Ok(a) => a, Ok(a) => a,
Err(e) => { panic!("{}", e); } Err(e) => {
panic!("{}", e);
}
}; };
match tx match tx

5
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -50,7 +50,10 @@ impl SARIMA {
// TODO: trend detection // TODO: trend detection
if ts.len() < 2 { if ts.len() < 2 {
return Err(anyhow::format_err!("too short timeserie to learn from, timeserie length: {}", ts.len())); return Err(anyhow::format_err!(
"too short timeserie to learn from, timeserie length: {}",
ts.len()
));
} }
// TODO: ensure capacity with seasonality size // TODO: ensure capacity with seasonality size
let mut res_ts = Vec::<(u64, f64)>::new(); let mut res_ts = Vec::<(u64, f64)>::new();

1
server/src/services/analytic_service/analytic_unit/mod.rs

@ -7,4 +7,3 @@ use self::{
anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit,
threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig, threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig,
}; };

19
server/src/services/analytic_service/analytic_unit/types.rs

@ -63,14 +63,13 @@ pub enum AnalyticUnitConfig {
} }
impl AnalyticUnitConfig { impl AnalyticUnitConfig {
pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig { pub fn get_default_by_id(id: &String) -> AnalyticUnitConfig {
let iid = id.as_str(); let iid = id.as_str();
match iid { match iid {
"1" => AnalyticUnitConfig::Threshold(Default::default()), "1" => AnalyticUnitConfig::Threshold(Default::default()),
"2" => AnalyticUnitConfig::Pattern(Default::default()), "2" => AnalyticUnitConfig::Pattern(Default::default()),
"3" => AnalyticUnitConfig::Anomaly(Default::default()), "3" => AnalyticUnitConfig::Anomaly(Default::default()),
_ => panic!("bad id for getting get_default_by_id") _ => panic!("bad id for getting get_default_by_id"),
} }
} }
@ -81,9 +80,7 @@ impl AnalyticUnitConfig {
AnalyticUnitConfig::Pattern(_) => { AnalyticUnitConfig::Pattern(_) => {
return false; return false;
} }
_ => { _ => return true,
return true
}
}, },
PatchConfig::Anomaly(tcfg) => match self.clone() { PatchConfig::Anomaly(tcfg) => match self.clone() {
@ -212,15 +209,17 @@ impl PatchConfig {
match &self { match &self {
PatchConfig::Threshold(_) => "1".to_string(), PatchConfig::Threshold(_) => "1".to_string(),
PatchConfig::Pattern(_) => "2".to_string(), PatchConfig::Pattern(_) => "2".to_string(),
PatchConfig::Anomaly(_) => "3".to_string() PatchConfig::Anomaly(_) => "3".to_string(),
} }
} }
pub fn get_new_config(&self) -> AnalyticUnitConfig { pub fn get_new_config(&self) -> AnalyticUnitConfig {
match &self { match &self {
PatchConfig::Threshold(cfg) => AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone()), PatchConfig::Threshold(cfg) => {
AnalyticUnitConfig::Threshold(cfg.as_ref().unwrap().clone())
}
PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()), PatchConfig::Pattern(cfg) => AnalyticUnitConfig::Pattern(cfg.as_ref().unwrap().clone()),
PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()) PatchConfig::Anomaly(cfg) => AnalyticUnitConfig::Anomaly(cfg.as_ref().unwrap().clone()),
} }
} }
} }

21
server/src/services/analytic_service/detection_runner.rs

@ -1,16 +1,14 @@
use chrono::{Utc, DateTime}; use chrono::{DateTime, Utc};
use tokio::sync::{mpsc}; use tokio::sync::mpsc;
use crate::services::metric_service::MetricService; use crate::services::metric_service::MetricService;
use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType}; use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
const DETECTION_STEP: u64 = 10; const DETECTION_STEP: u64 = 10;
pub struct DetectionRunner { pub struct DetectionRunner {
metric_service: MetricService, metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>, tx: mpsc::Sender<AnalyticServiceMessage>,
@ -76,11 +74,16 @@ impl DetectionRunner {
// TODO: run detection periodically // TODO: run detection periodically
// TODO: set info about detections to tx // TODO: set info about detections to tx
match tx
match tx.send(AnalyticServiceMessage::Response(Ok( .send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to) ResponseType::DetectionRunnerUpdate(
))).await { au.as_ref().read().await.get_id(),
Ok(_) => {}, t_to,
),
)))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner started notification"), Err(_e) => println!("Fail to send detection runner started notification"),
} }

76
server/src/services/analytic_unit_service.rs

@ -1,14 +1,19 @@
use std::sync::{Arc, Mutex};
use serde_json::{Result, Value};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{Result, Value};
use std::sync::{Arc, Mutex};
use rusqlite::{params, Connection}; use rusqlite::{params, Connection};
use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit}; use super::analytic_service::analytic_unit::{
anomaly_analytic_unit::AnomalyAnalyticUnit,
pattern_analytic_unit::PatternAnalyticUnit,
threshold_analytic_unit::ThresholdAnalyticUnit,
types::{self, AnalyticUnitConfig},
};
#[derive(Clone)] #[derive(Clone)]
pub struct AnalyticUnitService { pub struct AnalyticUnitService {
connection: Arc<Mutex<Connection>> connection: Arc<Mutex<Connection>>,
} }
impl AnalyticUnitService { impl AnalyticUnitService {
@ -35,40 +40,50 @@ impl AnalyticUnitService {
} }
// TODO: optional id // TODO: optional id
pub fn resolve_au(&self, cfg: &AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> { pub fn resolve_au(
&self,
cfg: &AnalyticUnitConfig,
) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg { match cfg {
AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())), AnalyticUnitConfig::Threshold(c) => {
AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())), Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone()))
AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())), }
AnalyticUnitConfig::Pattern(c) => {
Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone()))
}
AnalyticUnitConfig::Anomaly(c) => {
Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone()))
}
} }
} }
// TODO: get id of analytic_unit which be used also as it's type // TODO: get id of analytic_unit which be used also as it's type
pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> { pub fn resolve(
&self,
cfg: &AnalyticUnitConfig,
) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
let au = self.resolve_au(cfg); let au = self.resolve_au(cfg);
let id = au.as_ref().get_id(); let id = au.as_ref().get_id();
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt = conn.prepare("SELECT id from analytic_unit WHERE id = ?1")?;
"SELECT id from analytic_unit WHERE id = ?1",
)?;
let res = stmt.exists(params![id])?; let res = stmt.exists(params![id])?;
if res == false { if res == false {
let cfg_json = serde_json::to_string(&cfg)?; let cfg_json = serde_json::to_string(&cfg)?;
conn.execute( conn.execute(
"INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)", "INSERT INTO analytic_unit (id, type, config) VALUES (?1, ?1, ?2)",
params![id, cfg_json] params![id, cfg_json],
)?; )?;
} }
conn.execute( conn.execute(
"UPDATE analytic_unit set active = FALSE where active = TRUE", "UPDATE analytic_unit set active = FALSE where active = TRUE",
params![] params![],
)?; )?;
conn.execute( conn.execute(
"UPDATE analytic_unit set active = TRUE where id = ?1", "UPDATE analytic_unit set active = TRUE where id = ?1",
params![id] params![id],
)?; )?;
return Ok(au); return Ok(au);
@ -78,7 +93,7 @@ impl AnalyticUnitService {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
conn.execute( conn.execute(
"UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2", "UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2",
params![last_detection, id] params![last_detection, id],
)?; )?;
Ok(()) Ok(())
} }
@ -86,9 +101,8 @@ impl AnalyticUnitService {
pub fn get_active(&self) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> { pub fn get_active(&self) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
// TODO: return default when there is no active // TODO: return default when there is no active
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt =
"SELECT id, type, config from analytic_unit WHERE active = TRUE" conn.prepare("SELECT id, type, config from analytic_unit WHERE active = TRUE")?;
)?;
let au = stmt.query_row([], |row| { let au = stmt.query_row([], |row| {
let c: String = row.get(2)?; let c: String = row.get(2)?;
@ -97,15 +111,12 @@ impl AnalyticUnitService {
})??; })??;
return Ok(au); return Ok(au);
} }
pub fn get_active_config(&self) -> anyhow::Result<AnalyticUnitConfig> { pub fn get_active_config(&self) -> anyhow::Result<AnalyticUnitConfig> {
let exists = { let exists = {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?;
"SELECT config from analytic_unit WHERE active = TRUE"
)?;
stmt.exists([])? stmt.exists([])?
}; };
@ -115,9 +126,7 @@ impl AnalyticUnitService {
return Ok(c); return Ok(c);
} else { } else {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE active = TRUE")?;
"SELECT config from analytic_unit WHERE active = TRUE"
)?;
let acfg = stmt.query_row([], |row| { let acfg = stmt.query_row([], |row| {
let c: String = row.get(0)?; let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap(); let cfg = serde_json::from_str(&c).unwrap();
@ -130,9 +139,7 @@ impl AnalyticUnitService {
pub fn get_config_by_id(&self, id: &String) -> anyhow::Result<AnalyticUnitConfig> { pub fn get_config_by_id(&self, id: &String) -> anyhow::Result<AnalyticUnitConfig> {
let exists = { let exists = {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?;
"SELECT config from analytic_unit WHERE id = ?1"
)?;
stmt.exists([id])? stmt.exists([id])?
}; };
@ -142,9 +149,7 @@ impl AnalyticUnitService {
return Ok(c); return Ok(c);
} else { } else {
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare( let mut stmt = conn.prepare("SELECT config from analytic_unit WHERE id = ?1")?;
"SELECT config from analytic_unit WHERE id = ?1"
)?;
let acfg = stmt.query_row([id], |row| { let acfg = stmt.query_row([id], |row| {
let c: String = row.get(0)?; let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap(); let cfg = serde_json::from_str(&c).unwrap();
@ -163,7 +168,6 @@ impl AnalyticUnitService {
} }
pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> { pub fn update_config_by_id(&self, id: &String, cfg: &AnalyticUnitConfig) -> anyhow::Result<()> {
// TODO: it's possble that config doesn't exist, but we trying to update it // TODO: it's possble that config doesn't exist, but we trying to update it
let conn = self.connection.lock().unwrap(); let conn = self.connection.lock().unwrap();
@ -171,7 +175,7 @@ impl AnalyticUnitService {
conn.execute( conn.execute(
"UPDATE analytic_unit SET config = ?1 WHERE id = ?2", "UPDATE analytic_unit SET config = ?1 WHERE id = ?2",
params![cfg_json, id] params![cfg_json, id],
)?; )?;
return Ok(()); return Ok(());
@ -184,7 +188,7 @@ impl AnalyticUnitService {
conn.execute( conn.execute(
"UPDATE analytic_unit SET config = ?1 WHERE active = TRUE", "UPDATE analytic_unit SET config = ?1 WHERE active = TRUE",
params![cfg_json] params![cfg_json],
)?; )?;
return Ok(()); return Ok(());

2
server/src/services/mod.rs

@ -1,5 +1,5 @@
pub mod analytic_service; pub mod analytic_service;
pub mod analytic_unit_service;
pub mod metric_service; pub mod metric_service;
pub mod segments_service; pub mod segments_service;
pub mod user_service; pub mod user_service;
pub mod analytic_unit_service;

Loading…
Cancel
Save