Browse Source

active config++

active-analytic-unit-#66
Alexey Velikiy 2 years ago
parent
commit
090b9bf42e
  1. 13
      server/src/services/analytic_service/analytic_service.rs
  2. 38
      server/src/services/analytic_unit_service.rs

13
server/src/services/analytic_service/analytic_service.rs

@ -23,7 +23,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear
use anyhow;
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
// TODO: now it's basically single analytic unit, service will operate on many AU
@ -59,10 +59,14 @@ impl AnalyticService {
segments_service: segments_service::SegmentsService,
alerting: Option<AlertingConfig>,
) -> AnalyticService {
// TODO: move buffer size to config
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
let aus = analytic_unit_service.clone();
AnalyticService {
analytic_unit_service,
analytic_unit_service: aus,
metric_service,
segments_service,
@ -71,7 +75,7 @@ impl AnalyticService {
// TODO: get it from persistance
analytic_unit: None,
// TODO: get pattern from saved in analytic_unit_service
analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()),
analytic_unit_config: analytic_unit_service.get_active_config().unwrap(),
analytic_unit_learning_status: LearningStatus::Initialization,
tx,
@ -272,6 +276,7 @@ impl AnalyticService {
}
fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) {
// TODO: update config in db
let (new_conf, need_learning) = self.analytic_unit_config.patch(patch);
self.analytic_unit_config = new_conf;
if need_learning {
@ -334,7 +339,7 @@ impl AnalyticService {
ms: MetricService,
ss: SegmentsService,
) {
let mut au = match aus.resolve(aucfg) {
let mut au = match aus.resolve(&aucfg) {
Ok(a) => a,
Err(e) => { panic!("{}", e); }
};

38
server/src/services/analytic_unit_service.rs

@ -21,9 +21,9 @@ impl AnalyticUnitService {
conn.execute(
"CREATE TABLE IF NOT EXISTS analytic_unit (
id TEXT PRIMARY KEY,
last_detection INTEGER
active BOOLEAN
type INTEGER
last_detection INTEGER,
active BOOLEAN,
type INTEGER,
config TEXT
)",
[],
@ -44,8 +44,8 @@ impl AnalyticUnitService {
}
// TODO: get id of analytic_unit which be used also as it's type
pub fn resolve(&self, cfg: AnalyticUnitConfig) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
let au = self.resolve_au(&cfg);
pub fn resolve(&self, cfg: &AnalyticUnitConfig) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
let au = self.resolve_au(cfg);
let id = au.as_ref().get_id();
let conn = self.connection.lock().unwrap();
@ -74,7 +74,6 @@ impl AnalyticUnitService {
return Ok(au);
}
// TODO: resolve with saving by id
pub fn set_last_detection(&self, id: String, last_detection: u64) -> anyhow::Result<()> {
let conn = self.connection.lock().unwrap();
conn.execute(
@ -85,6 +84,7 @@ impl AnalyticUnitService {
}
pub fn get_active(&self) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
// TODO: return default when there is no active
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, type, config from analytic_unit WHERE active = TRUE"
@ -92,11 +92,31 @@ impl AnalyticUnitService {
let au = stmt.query_row([], |row| {
let c: String = row.get(2)?;
let cfg = serde_json::from_str(&c).unwrap();
Ok(self.resolve(cfg))
})?;
let cfg: AnalyticUnitConfig = serde_json::from_str(&c).unwrap();
Ok(self.resolve(&cfg))
})??;
return Ok(au);
}
pub fn get_active_config(&self) -> anyhow::Result<AnalyticUnitConfig> {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT config from analytic_unit WHERE active = TRUE"
)?;
if stmt.exists([])? == false {
let c = AnalyticUnitConfig::Pattern(Default::default());
self.resolve(&c)?;
return Ok(c);
} else {
let acfg = stmt.query_row([], |row| {
let c: String = row.get(0)?;
let cfg = serde_json::from_str(&c).unwrap();
Ok(cfg)
})?;
return Ok(acfg);
}
}
}
Loading…
Cancel
Save