Browse Source

AnalyticUnitService++

detection_runner_updade
Alexey Velikiy 2 years ago
parent
commit
9745083799
  1. 4
      server/src/main.rs
  2. 12
      server/src/services/analytic_service/analytic_service.rs
  3. 8
      server/src/services/analytic_service/analytic_unit/mod.rs
  4. 18
      server/src/services/analytic_unit_service.rs
  5. 1
      server/src/services/mod.rs

4
server/src/main.rs

@ -1,6 +1,6 @@
mod api; mod api;
use hastic::services::{analytic_service, metric_service, segments_service}; use hastic::services::{analytic_service, metric_service, segments_service, analytic_unit_service};
use anyhow; use anyhow;
@ -9,10 +9,12 @@ async fn main() -> anyhow::Result<()> {
let config = hastic::config::Config::new()?; let config = hastic::config::Config::new()?;
let cfg_clone = config.clone(); let cfg_clone = config.clone();
let analytic_unit_service = analytic_unit_service::AnalyticUnitService::new()?;
let metric_service = metric_service::MetricService::new(&config.datasource_config); let metric_service = metric_service::MetricService::new(&config.datasource_config);
let segments_service = segments_service::SegmentsService::new()?; let segments_service = segments_service::SegmentsService::new()?;
let mut analytic_service = analytic_service::AnalyticService::new( let mut analytic_service = analytic_service::AnalyticService::new(
analytic_unit_service.clone(),
metric_service.clone(), metric_service.clone(),
segments_service.clone(), segments_service.clone(),
config.alerting, config.alerting,

12
server/src/services/analytic_service/analytic_service.rs

@ -11,7 +11,8 @@ use super::{
}; };
use crate::config::{AlertingConfig, AlertingType}; use crate::config::{AlertingConfig, AlertingType};
use crate::services::analytic_service::analytic_unit::resolve;
use crate::services::analytic_unit_service::AnalyticUnitService;
use crate::services::{ use crate::services::{
metric_service::MetricService, metric_service::MetricService,
segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH}, segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH},
@ -30,6 +31,7 @@ use tokio::sync::{mpsc, oneshot};
pub struct AnalyticService { pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
segments_service: SegmentsService, segments_service: SegmentsService,
analytic_unit_service: AnalyticUnitService,
alerting: Option<AlertingConfig>, alerting: Option<AlertingConfig>,
@ -52,6 +54,7 @@ pub struct AnalyticService {
impl AnalyticService { impl AnalyticService {
pub fn new( pub fn new(
analytic_unit_service: AnalyticUnitService,
metric_service: MetricService, metric_service: MetricService,
segments_service: segments_service::SegmentsService, segments_service: segments_service::SegmentsService,
alerting: Option<AlertingConfig>, alerting: Option<AlertingConfig>,
@ -59,6 +62,7 @@ impl AnalyticService {
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32); let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
AnalyticService { AnalyticService {
analytic_unit_service,
metric_service, metric_service,
segments_service, segments_service,
@ -161,11 +165,12 @@ impl AnalyticService {
self.learning_handler = Some(tokio::spawn({ self.learning_handler = Some(tokio::spawn({
self.analytic_unit_learning_status = LearningStatus::Starting; self.analytic_unit_learning_status = LearningStatus::Starting;
let tx = self.tx.clone(); let tx = self.tx.clone();
let aus = self.analytic_unit_service.clone();
let ms = self.metric_service.clone(); let ms = self.metric_service.clone();
let ss = self.segments_service.clone(); let ss = self.segments_service.clone();
let cfg = self.analytic_unit_config.clone(); let cfg = self.analytic_unit_config.clone();
async move { async move {
AnalyticService::run_learning(tx, cfg, ms, ss).await; AnalyticService::run_learning(tx, cfg, aus, ms, ss).await;
} }
})); }));
} }
@ -321,10 +326,11 @@ impl AnalyticService {
async fn run_learning( async fn run_learning(
tx: mpsc::Sender<AnalyticServiceMessage>, tx: mpsc::Sender<AnalyticServiceMessage>,
aucfg: AnalyticUnitConfig, aucfg: AnalyticUnitConfig,
aus: AnalyticUnitService,
ms: MetricService, ms: MetricService,
ss: SegmentsService, ss: SegmentsService,
) { ) {
let mut au = resolve(aucfg); let mut au = aus.resolve(aucfg);
match tx match tx
.send(AnalyticServiceMessage::Response(Ok( .send(AnalyticServiceMessage::Response(Ok(

8
server/src/services/analytic_service/analytic_unit/mod.rs

@ -8,10 +8,4 @@ use self::{
threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig, threshold_analytic_unit::ThresholdAnalyticUnit, types::AnalyticUnitConfig,
}; };
pub fn resolve(cfg: AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg {
AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new(c.clone())),
AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new(c.clone())),
AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new(c.clone())),
}
}

18
server/src/services/analytic_unit_service.rs

@ -1,13 +1,17 @@
use std::sync::{Arc, Mutex};
use crate::utils::get_random_str; use crate::utils::get_random_str;
use rusqlite::{params, Connection, Row}; use rusqlite::{params, Connection, Row};
use super::analytic_service::analytic_unit::{types::{AnalyticUnitConfig, self}, threshold_analytic_unit::ThresholdAnalyticUnit, pattern_analytic_unit::PatternAnalyticUnit, anomaly_analytic_unit::AnomalyAnalyticUnit};
#[derive(Clone)]
pub struct AnalyticUnitService { pub struct AnalyticUnitService {
// TODO: resolve by setting id for 3 types // TODO: resolve by setting id for 3 types
// TODO: create database // TODO: create database
// TODO: update detection // TODO: update detection
connection: Arc<Mutex<Connection>>
} }
impl AnalyticUnitService { impl AnalyticUnitService {
@ -20,7 +24,7 @@ impl AnalyticUnitService {
conn.execute( conn.execute(
"CREATE TABLE IF NOT EXISTS analytic_unit ( "CREATE TABLE IF NOT EXISTS analytic_unit (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
last_detection INTEGER NOT NULL, last_detection INTEGER NOT NULL
)", )",
[], [],
)?; )?;
@ -29,4 +33,12 @@ impl AnalyticUnitService {
connection: Arc::new(Mutex::new(conn)), connection: Arc::new(Mutex::new(conn)),
}) })
} }
pub fn resolve(&self, cfg: AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg {
AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new(c.clone())),
AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new(c.clone())),
AnalyticUnitConfig::Anomaly(c) => Box::new(AnomalyAnalyticUnit::new(c.clone())),
}
}
} }

1
server/src/services/mod.rs

@ -2,3 +2,4 @@ pub mod analytic_service;
pub mod metric_service; pub mod metric_service;
pub mod segments_service; pub mod segments_service;
pub mod user_service; pub mod user_service;
pub mod analytic_unit_service;

Loading…
Cancel
Save