Browse Source

save analytic unit into db on resolve

detection_runner_updade
Alexey Velikiy 2 years ago
parent
commit
c61352422c
  1. 4
      server/Cargo.lock
  2. 2
      server/Cargo.toml
  3. 5
      server/src/services/analytic_service/analytic_service.rs
  4. 1
      server/src/services/analytic_service/detection_runner.rs
  5. 31
      server/src/services/analytic_unit_service.rs

4
server/Cargo.lock generated

@ -1333,9 +1333,9 @@ dependencies = [
[[package]] [[package]]
name = "rusqlite" name = "rusqlite"
version = "0.26.1" version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a82b0b91fad72160c56bf8da7a549b25d7c31109f52cc1437eac4c0ad2550a7" checksum = "4ba4d3462c8b2e4d7f4fcfcf2b296dc6b65404fbbc7b63daa37fd485c149daf7"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"fallible-iterator", "fallible-iterator",

2
server/Cargo.toml

@ -17,7 +17,7 @@ fastrand = "1.5.0"
subbeat = "0.0.15" subbeat = "0.0.15"
config = "0.11.0" config = "0.11.0"
openssl = { version = "=0.10.33", features = ["vendored"] } openssl = { version = "=0.10.33", features = ["vendored"] }
rusqlite = "0.26.1" rusqlite = "0.26.2"
# https://github.com/rusqlite/rusqlite/issues/914 # https://github.com/rusqlite/rusqlite/issues/914
libsqlite3-sys = { version = "*", features = ["bundled"] } libsqlite3-sys = { version = "*", features = ["bundled"] }
futures = "0.3.17" futures = "0.3.17"

5
server/src/services/analytic_service/analytic_service.rs

@ -330,7 +330,10 @@ impl AnalyticService {
ms: MetricService, ms: MetricService,
ss: SegmentsService, ss: SegmentsService,
) { ) {
let mut au = aus.resolve(aucfg); let mut au = match aus.resolve(aucfg) {
Ok(a) => a,
Err(e) => { panic!("{}", e); }
};
match tx match tx
.send(AnalyticServiceMessage::Response(Ok( .send(AnalyticServiceMessage::Response(Ok(

1
server/src/services/analytic_service/detection_runner.rs

@ -56,6 +56,7 @@ impl DetectionRunner {
loop { loop {
// TODO: run detection periodically // TODO: run detection periodically
sleep(Duration::from_secs(cfg.interval)).await; sleep(Duration::from_secs(cfg.interval)).await;
// TODO: use tx senf detection update
} }
} }
})); }));

31
server/src/services/analytic_unit_service.rs

@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
use crate::utils::get_random_str; use crate::utils::get_random_str;
use rusqlite::{params, Connection, Row}; use rusqlite::{params, Connection, Row};
use warp::hyper::rt::Executor;
use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit}; use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit};
@ -24,7 +25,7 @@ impl AnalyticUnitService {
conn.execute( conn.execute(
"CREATE TABLE IF NOT EXISTS analytic_unit ( "CREATE TABLE IF NOT EXISTS analytic_unit (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
last_detection INTEGER NOT NULL last_detection INTEGER
)", )",
[], [],
)?; )?;
@ -34,11 +35,37 @@ impl AnalyticUnitService {
}) })
} }
pub fn resolve(&self, cfg: AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> { // TODO: optional id
pub fn resolve_au(&self, cfg: AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg { match cfg {
AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())), AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new("1".to_string(), c.clone())),
AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())), AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new("2".to_string(), c.clone())),
AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())), AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new("3".to_string(), c.clone())),
} }
} }
pub fn resolve(&self, cfg: AnalyticUnitConfig) -> anyhow::Result<Box<dyn types::AnalyticUnit + Send + Sync>> {
let au = self.resolve_au(cfg);
let id = au.as_ref().get_id();
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id from analytic_unit WHERE id = ?1",
)?;
let res = stmt.exists(params![id])?;
if res == false {
conn.execute(
"INSERT INTO analytic_unit (id) VALUES (?1)",
params![id]
)?;
}
return Ok(au);
}
// TODO: resolve with saving by id
pub fn set_last_detection(id: String, last_detection: u64) -> anyhow::Result<()> {
Ok(())
}
} }
Loading…
Cancel
Save