diff --git a/README.md b/README.md index c135a43..50c8647 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,10 @@ instance for getting metrics. ## Build from source (Linux) ### Prerequirements -1. [Install cargo](https://doc.rust-lang.org/cargo/getting-started/installation.html) (required version: >=1.49) -2. Install [node.js >=10.x](https://nodejs.org/en/download/) -3. Install [yarn](https://classic.yarnpkg.com/lang/en/docs/install) -4. Install x86_64-unknown-linux-musl: `rustup target add x86_64-unknown-linux-musl` -5. musl-tools: `sudo apt install musl-tools` +1. [Install cargo](https://doc.rust-lang.org/cargo/getting-started/installation.html) +2. Install x86_64-unknown-linux-musl: `rustup target add x86_64-unknown-linux-musl` +3. musl-tools: `sudo apt install musl-tools` + ### Build ``` diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index e8c1711..b63ca32 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig}; use super::detection_runner::DetectionRunner; -use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR}; +use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR, DetectionRunnerTask}; use super::{ analytic_client::AnalyticClient, types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType}, @@ -20,6 +20,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear use anyhow; +use chrono::{TimeZone, DateTime, Utc}; use tokio::sync::{mpsc, oneshot}; // TODO: now it's basically single analytic unit, service will operate on many AU @@ -82,26 +83,36 @@ impl AnalyticService { AnalyticClient::new(self.tx.clone()) } - fn run_learning_waiter(&self, learning_waiter: LearningWaiter) { + fn run_learning_waiter(&mut self, learning_waiter: LearningWaiter) { // TODO: save handler of the task - tokio::spawn({ - let ms = self.metric_service.clone(); - let au = self.analytic_unit.as_ref().unwrap().clone(); - async move { - match learning_waiter { - LearningWaiter::Detection(task) => { + match learning_waiter { + LearningWaiter::Detection(task) => { + tokio::spawn({ + let ms = self.metric_service.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); + async move { AnalyticService::get_detections(task.sender, au, ms, task.from, task.to) .await } - LearningWaiter::HSR(task) => { + }); + } + LearningWaiter::HSR(task) => { + tokio::spawn({ + let ms = self.metric_service.clone(); + let au = self.analytic_unit.as_ref().unwrap().clone(); + async move { AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await } - } + }); + } + LearningWaiter::DetectionRunner(task) => { + self.run_detection_runner(task.from); } - }); + } } - fn run_detection_runner(&mut self) { + // TODO: make + fn run_detection_runner(&mut self, from: u64) { // TODO: handle case or make it impossible to run_detection_runner second time if self.analytic_unit.is_none() { @@ -109,7 +120,10 @@ impl AnalyticService { } if self.analytic_unit_learning_status != LearningStatus::Ready { - // TODO: add to waiter + let task = DetectionRunnerTask { + from + }; + self.learning_waiters.push(LearningWaiter::DetectionRunner(task)); return; } @@ -215,33 +229,30 @@ impl AnalyticService { } // TODO: maybe make `consume_response` async - fn consume_response(&mut self, res: anyhow::Result) { + fn consume_response(&mut self, res: types::ResponseType) { match res { - Ok(response_type) => { - match response_type { - ResponseType::LearningStarted => { - self.analytic_unit_learning_status = LearningStatus::Learning - } - ResponseType::LearningFinished(results) => { - self.learning_handler = None; - self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); - self.analytic_unit_learning_status = LearningStatus::Ready; - - // TODO: run tasks from self.learning_waiter - while self.learning_waiters.len() > 0 { - let task = self.learning_waiters.pop().unwrap(); - self.run_learning_waiter(task); - } - } - ResponseType::LearningFinishedEmpty => { - // TODO: drop all learning_waiters with empty results - self.analytic_unit = None; - self.analytic_unit_learning_status = LearningStatus::Initialization; - } + // TODO: handle when learning panics + ResponseType::LearningStarted => { + self.analytic_unit_learning_status = LearningStatus::Learning + } + ResponseType::LearningFinished(results) => { + self.learning_handler = None; + self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); + self.analytic_unit_learning_status = LearningStatus::Ready; + + // TODO: run tasks from self.learning_waiter + while self.learning_waiters.len() > 0 { + let task = self.learning_waiters.pop().unwrap(); + self.run_learning_waiter(task); } - }, - // TODO: create custom DatasourceError error type - Err(_) => { + } + ResponseType::LearningFinishedEmpty => { + // TODO: drop all learning_waiters with empty results + self.analytic_unit = None; + self.analytic_unit_learning_status = LearningStatus::Initialization; + } + ResponseType::LearningDatasourceError => { + // TODO: drop all learning_waiters with error self.analytic_unit = None; self.analytic_unit_learning_status = LearningStatus::Error; } @@ -290,7 +301,10 @@ impl AnalyticService { // TODO: remove this hack self.consume_request(RequestType::RunLearning); if self.alerting.is_some() { - self.run_detection_runner(); + // TODO: get it from persistance + let now: DateTime = Utc::now(); + let from = now.timestamp() as u64; + self.run_detection_runner(from); } while let Some(message) = self.rx.recv().await { @@ -311,7 +325,7 @@ impl AnalyticService { match tx .send(AnalyticServiceMessage::Response( - Ok(ResponseType::LearningStarted), + ResponseType::LearningStarted, )) .await { @@ -321,13 +335,9 @@ impl AnalyticService { // TODO: maybe to spawn_blocking here let lr = match au.learn(ms, ss).await { - Ok(res) => { - match res { - LearningResult::Finished => Ok(ResponseType::LearningFinished(au)), - LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty) - } - } - Err(e) => Err(e) + LearningResult::Finished => ResponseType::LearningFinished(au), + LearningResult::DatasourceError => ResponseType::LearningDatasourceError, + LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty, }; match tx.send(AnalyticServiceMessage::Response(lr)).await { diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index b02c6b2..08a82c0 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -54,7 +54,6 @@ impl SARIMA { // TODO: ensure capacity with seasonality size let mut res_ts = Vec::<(u64, f64)>::new(); let from = ts[0].0; - // TODO: unwrap -> ? let to = ts.last().unwrap().0; let iter_steps = (self.seasonality / DETECTION_STEP) as usize; @@ -168,22 +167,21 @@ impl AnalyticUnit for AnomalyAnalyticUnit { panic!("Bad config!"); } } - async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> anyhow::Result { + async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> LearningResult { let mut sarima = SARIMA::new(self.config.seasonality, self.config.confidence, self.config.seasonality_iterations); let utc: DateTime = Utc::now(); let to = utc.timestamp() as u64; let from = to - self.config.seasonality * self.config.seasonality_iterations; - let mr = ms.query(from, to, DETECTION_STEP).await?; + let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); if mr.data.keys().len() == 0 { - return Ok(LearningResult::FinishedEmpty); + return LearningResult::FinishedEmpty; } - // TODO: unwrap -> ? let k = mr.data.keys().nth(0).unwrap(); let ts = &mr.data[k]; - sarima.learn(ts)?; + sarima.learn(ts).unwrap(); self.sarima = Some(sarima); @@ -191,7 +189,7 @@ impl AnalyticUnit for AnomalyAnalyticUnit { // TODO: load data to learning // TODO: update model to work online - return Ok(LearningResult::Finished); + return LearningResult::Finished; } async fn detect( &self, diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index f8f153f..84b5212 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -74,7 +74,6 @@ async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Re }); } - // TODO: unwrap -> ? let k = mr.data.keys().nth(0).unwrap().clone(); let ts = mr.data.remove(&k).unwrap(); @@ -222,7 +221,7 @@ impl AnalyticUnit for PatternAnalyticUnit { } } - async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result { + async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult { // TODO: move to config let mut cfg = Config::new(); cfg.set_feature_size(FEATURES_SIZE); @@ -236,14 +235,14 @@ impl AnalyticUnit for PatternAnalyticUnit { cfg.set_training_optimization_level(2); // be careful if decide to store detections in db - let segments = ss.get_segments_inside(0, u64::MAX / 2)?; + let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); let has_segments_label = segments .iter() .find(|s| s.segment_type == SegmentType::Label) .is_some(); if !has_segments_label { - return Ok(LearningResult::FinishedEmpty); + return LearningResult::FinishedEmpty; } let fs = segments.iter().map(|s| segment_to_segdata(&ms, s)); @@ -254,11 +253,11 @@ impl AnalyticUnit for PatternAnalyticUnit { for r in rs { if r.is_err() { - // TODO: custom DatasourceError error type - return Err(anyhow::format_err!("Error extracting metrics from datasource")); + println!("Error extracting metrics from datasource"); + return LearningResult::DatasourceError; } - let sd = r?; + let sd = r.unwrap(); if sd.data.is_empty() { continue; } @@ -270,7 +269,7 @@ impl AnalyticUnit for PatternAnalyticUnit { } if learn_tss.len() == 0 { - return Ok(LearningResult::FinishedEmpty); + return LearningResult::FinishedEmpty; } let mut patterns = Vec::>::new(); @@ -333,7 +332,7 @@ impl AnalyticUnit for PatternAnalyticUnit { avg_pattern_length, }); - return Ok(LearningResult::Finished); + return LearningResult::Finished; } // TODO: get iterator instead of vector diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index e3745a4..849d006 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -21,8 +21,8 @@ impl ThresholdAnalyticUnit { #[async_trait] impl AnalyticUnit for ThresholdAnalyticUnit { - async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> anyhow::Result { - return Ok(LearningResult::Finished); + async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult { + return LearningResult::Finished; } fn set_config(&mut self, config: AnalyticUnitConfig) { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index cf89d3c..11f2247 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -125,12 +125,13 @@ impl AnalyticUnitConfig { pub enum LearningResult { Finished, - FinishedEmpty + FinishedEmpty, + DatasourceError, } #[async_trait] pub trait AnalyticUnit { - async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result; + async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult; async fn detect( &self, ms: MetricService, diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 922dc46..880d634 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -45,7 +45,8 @@ impl Default for LearningTrain { pub enum ResponseType { LearningStarted, LearningFinished(Box), - LearningFinishedEmpty + LearningFinishedEmpty, + LearningDatasourceError, } impl fmt::Debug for ResponseType { @@ -62,6 +63,12 @@ pub struct DetectionTask { pub to: u64, } +#[derive(Debug)] +pub struct DetectionRunnerTask { + pub from: u64, +} + + #[derive(Debug, Serialize)] pub struct AnomalyHSRConfig { pub timestamp: u64, @@ -87,6 +94,7 @@ pub struct HSRTask { #[derive(Debug)] pub enum LearningWaiter { Detection(DetectionTask), + DetectionRunner(DetectionRunnerTask), HSR(HSRTask), } @@ -116,5 +124,5 @@ pub enum RequestType { pub enum AnalyticServiceMessage { // Status, Request(RequestType), - Response(anyhow::Result), // Detect { from: u64, to: u64 }, + Response(ResponseType), // Detect { from: u64, to: u64 }, }