Browse Source

alerting continue

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
de4119a64c
  1. 14
      server/src/services/analytic_service/analytic_service.rs
  2. 18
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  3. 1
      server/src/services/analytic_service/detection_runner.rs
  4. 2
      server/src/services/analytic_service/types.rs

14
server/src/services/analytic_service/analytic_service.rs

@ -27,6 +27,8 @@ pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
segments_service: SegmentsService, segments_service: SegmentsService,
alerting: Option<AlertingConfig>,
analytic_unit: Option<Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>>, analytic_unit: Option<Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>>,
analytic_unit_config: AnalyticUnitConfig, analytic_unit_config: AnalyticUnitConfig,
analytic_unit_learning_status: LearningStatus, analytic_unit_learning_status: LearningStatus,
@ -55,6 +57,8 @@ impl AnalyticService {
metric_service, metric_service,
segments_service, segments_service,
alerting,
// TODO: get it from persistance // TODO: get it from persistance
analytic_unit: None, analytic_unit: None,
analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()), analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()),
@ -96,7 +100,8 @@ impl AnalyticService {
}); });
} }
fn run_detection_runner(&mut self, task: DetectionRunnerConfig) { fn run_detection_runner(&mut self) {
// TODO: create DetectionRunnerConfig from alerting
// TODO: rerun detection runner on analytic unit change // TODO: rerun detection runner on analytic unit change
// if self.runner_handler.is_some() { // if self.runner_handler.is_some() {
// self.runner_handler.as_mut().unwrap().abort(); // self.runner_handler.as_mut().unwrap().abort();
@ -154,6 +159,8 @@ impl AnalyticService {
RequestType::GetStatus(tx) => { RequestType::GetStatus(tx) => {
tx.send(self.analytic_unit_learning_status.clone()).unwrap(); tx.send(self.analytic_unit_learning_status.clone()).unwrap();
} }
// TODO: do it in abstract way for all analytic units
// RequestType::GetLearningTrain(tx) => { // RequestType::GetLearningTrain(tx) => {
// if self.analytic_unit_learning_results.is_none() { // if self.analytic_unit_learning_results.is_none() {
// tx.send(LearningTrain::default()).unwrap(); // tx.send(LearningTrain::default()).unwrap();
@ -264,6 +271,11 @@ impl AnalyticService {
pub async fn serve(&mut self) { pub async fn serve(&mut self) {
// TODO: remove this hack // TODO: remove this hack
self.consume_request(RequestType::RunLearning); self.consume_request(RequestType::RunLearning);
// TODO: start detection runner if
if self.alerting.is_some() {
self.run_detection_runner();
}
while let Some(message) = self.rx.recv().await { while let Some(message) = self.rx.recv().await {
match message { match message {

18
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -42,20 +42,6 @@ pub struct LearningResults {
avg_pattern_length: usize, avg_pattern_length: usize,
} }
// impl Clone for LearningResults {
// fn clone(&self) -> Self {
// // TODO: it's a hack
// // https://github.com/rust-ml/linfa/issues/174
// let model_str = serde_json::to_string(&self.model).unwrap();
// let model = serde_json::from_str(&model_str).unwrap();
// return LearningResults {
// model,
// patterns: self.patterns.clone(),
// anti_patterns: self.anti_patterns.clone()
// };
// }
// }
impl fmt::Debug for LearningResults { impl fmt::Debug for LearningResults {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Point") f.debug_struct("Point")
@ -180,15 +166,13 @@ fn corr_aligned(xs: &VecDeque<f64>, ys: &Vec<f64>) -> f32 {
let numerator: f64 = n * s_xsys - s_xs * s_ys; let numerator: f64 = n * s_xsys - s_xs * s_ys;
let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt(); let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt();
// IT"s a hack // TODO: IT"s a hack, check and make it's better
if denominator < 0.01 { if denominator < 0.01 {
return 0.; return 0.;
} }
let result: f64 = numerator / denominator; let result: f64 = numerator / denominator;
// assert!(result.abs() <= 1.01);
if result.abs() > 1.1 { if result.abs() > 1.1 {
println!("{:?}", xs); println!("{:?}", xs);
println!("------------"); println!("------------");

1
server/src/services/analytic_service/detection_runner.rs

@ -2,7 +2,6 @@ use crate::services::analytic_service::analytic_unit::types::AnalyticUnit;
use std::sync::Arc; use std::sync::Arc;
use chrono::Utc; use chrono::Utc;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};

2
server/src/services/analytic_service/types.rs

@ -84,7 +84,7 @@ pub enum LearningWaiter {
} }
// TODO: review if it's needed // TODO: review if it's needed
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct DetectionRunnerConfig { pub struct DetectionRunnerConfig {
// pub sender: mpsc::Sender<Result<Vec<Segment>>>, // pub sender: mpsc::Sender<Result<Vec<Segment>>>,
pub endpoint: String, pub endpoint: String,

Loading…
Cancel
Save