From f55d4a5a0d0a0371ef1217034903d6a3631f7b99 Mon Sep 17 00:00:00 2001 From: rozetko Date: Fri, 3 Dec 2021 17:47:34 +0300 Subject: [PATCH] PoC: improve error handling --- .../analytic_service/analytic_service.rs | 59 +++++++++++-------- .../analytic_unit/anomaly_analytic_unit.rs | 12 ++-- .../analytic_unit/pattern_analytic_unit.rs | 17 +++--- .../analytic_unit/threshold_analytic_unit.rs | 4 +- .../analytic_service/analytic_unit/types.rs | 5 +- server/src/services/analytic_service/types.rs | 5 +- 6 files changed, 55 insertions(+), 47 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 9f2592d..e8c1711 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -215,30 +215,33 @@ impl AnalyticService { } // TODO: maybe make `consume_response` async - fn consume_response(&mut self, res: types::ResponseType) { + fn consume_response(&mut self, res: anyhow::Result) { match res { - // TODO: handle when learning panics - ResponseType::LearningStarted => { - self.analytic_unit_learning_status = LearningStatus::Learning - } - ResponseType::LearningFinished(results) => { - self.learning_handler = None; - self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); - self.analytic_unit_learning_status = LearningStatus::Ready; - - // TODO: run tasks from self.learning_waiter - while self.learning_waiters.len() > 0 { - let task = self.learning_waiters.pop().unwrap(); - self.run_learning_waiter(task); + Ok(response_type) => { + match response_type { + ResponseType::LearningStarted => { + self.analytic_unit_learning_status = LearningStatus::Learning + } + ResponseType::LearningFinished(results) => { + self.learning_handler = None; + self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); + self.analytic_unit_learning_status = LearningStatus::Ready; + + // TODO: run tasks from self.learning_waiter + while self.learning_waiters.len() > 0 { + let task = self.learning_waiters.pop().unwrap(); + self.run_learning_waiter(task); + } + } + ResponseType::LearningFinishedEmpty => { + // TODO: drop all learning_waiters with empty results + self.analytic_unit = None; + self.analytic_unit_learning_status = LearningStatus::Initialization; + } } - } - ResponseType::LearningFinishedEmpty => { - // TODO: drop all learning_waiters with empty results - self.analytic_unit = None; - self.analytic_unit_learning_status = LearningStatus::Initialization; - } - ResponseType::LearningDatasourceError => { - // TODO: drop all learning_waiters with error + }, + // TODO: create custom DatasourceError error type + Err(_) => { self.analytic_unit = None; self.analytic_unit_learning_status = LearningStatus::Error; } @@ -308,7 +311,7 @@ impl AnalyticService { match tx .send(AnalyticServiceMessage::Response( - ResponseType::LearningStarted, + Ok(ResponseType::LearningStarted), )) .await { @@ -318,9 +321,13 @@ impl AnalyticService { // TODO: maybe to spawn_blocking here let lr = match au.learn(ms, ss).await { - LearningResult::Finished => ResponseType::LearningFinished(au), - LearningResult::DatasourceError => ResponseType::LearningDatasourceError, - LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty, + Ok(res) => { + match res { + LearningResult::Finished => Ok(ResponseType::LearningFinished(au)), + LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty) + } + } + Err(e) => Err(e) }; match tx.send(AnalyticServiceMessage::Response(lr)).await { diff --git a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs index 08a82c0..b02c6b2 100644 --- a/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs @@ -54,6 +54,7 @@ impl SARIMA { // TODO: ensure capacity with seasonality size let mut res_ts = Vec::<(u64, f64)>::new(); let from = ts[0].0; + // TODO: unwrap -> ? let to = ts.last().unwrap().0; let iter_steps = (self.seasonality / DETECTION_STEP) as usize; @@ -167,21 +168,22 @@ impl AnalyticUnit for AnomalyAnalyticUnit { panic!("Bad config!"); } } - async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> LearningResult { + async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> anyhow::Result { let mut sarima = SARIMA::new(self.config.seasonality, self.config.confidence, self.config.seasonality_iterations); let utc: DateTime = Utc::now(); let to = utc.timestamp() as u64; let from = to - self.config.seasonality * self.config.seasonality_iterations; - let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); + let mr = ms.query(from, to, DETECTION_STEP).await?; if mr.data.keys().len() == 0 { - return LearningResult::FinishedEmpty; + return Ok(LearningResult::FinishedEmpty); } + // TODO: unwrap -> ? let k = mr.data.keys().nth(0).unwrap(); let ts = &mr.data[k]; - sarima.learn(ts).unwrap(); + sarima.learn(ts)?; self.sarima = Some(sarima); @@ -189,7 +191,7 @@ impl AnalyticUnit for AnomalyAnalyticUnit { // TODO: load data to learning // TODO: update model to work online - return LearningResult::Finished; + return Ok(LearningResult::Finished); } async fn detect( &self, diff --git a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs index 84b5212..f8f153f 100644 --- a/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs @@ -74,6 +74,7 @@ async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Re }); } + // TODO: unwrap -> ? let k = mr.data.keys().nth(0).unwrap().clone(); let ts = mr.data.remove(&k).unwrap(); @@ -221,7 +222,7 @@ impl AnalyticUnit for PatternAnalyticUnit { } } - async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult { + async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result { // TODO: move to config let mut cfg = Config::new(); cfg.set_feature_size(FEATURES_SIZE); @@ -235,14 +236,14 @@ impl AnalyticUnit for PatternAnalyticUnit { cfg.set_training_optimization_level(2); // be careful if decide to store detections in db - let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap(); + let segments = ss.get_segments_inside(0, u64::MAX / 2)?; let has_segments_label = segments .iter() .find(|s| s.segment_type == SegmentType::Label) .is_some(); if !has_segments_label { - return LearningResult::FinishedEmpty; + return Ok(LearningResult::FinishedEmpty); } let fs = segments.iter().map(|s| segment_to_segdata(&ms, s)); @@ -253,11 +254,11 @@ impl AnalyticUnit for PatternAnalyticUnit { for r in rs { if r.is_err() { - println!("Error extracting metrics from datasource"); - return LearningResult::DatasourceError; + // TODO: custom DatasourceError error type + return Err(anyhow::format_err!("Error extracting metrics from datasource")); } - let sd = r.unwrap(); + let sd = r?; if sd.data.is_empty() { continue; } @@ -269,7 +270,7 @@ impl AnalyticUnit for PatternAnalyticUnit { } if learn_tss.len() == 0 { - return LearningResult::FinishedEmpty; + return Ok(LearningResult::FinishedEmpty); } let mut patterns = Vec::>::new(); @@ -332,7 +333,7 @@ impl AnalyticUnit for PatternAnalyticUnit { avg_pattern_length, }); - return LearningResult::Finished; + return Ok(LearningResult::Finished); } // TODO: get iterator instead of vector diff --git a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs index 849d006..e3745a4 100644 --- a/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs +++ b/server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs @@ -21,8 +21,8 @@ impl ThresholdAnalyticUnit { #[async_trait] impl AnalyticUnit for ThresholdAnalyticUnit { - async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult { - return LearningResult::Finished; + async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> anyhow::Result { + return Ok(LearningResult::Finished); } fn set_config(&mut self, config: AnalyticUnitConfig) { diff --git a/server/src/services/analytic_service/analytic_unit/types.rs b/server/src/services/analytic_service/analytic_unit/types.rs index 11f2247..cf89d3c 100644 --- a/server/src/services/analytic_service/analytic_unit/types.rs +++ b/server/src/services/analytic_service/analytic_unit/types.rs @@ -125,13 +125,12 @@ impl AnalyticUnitConfig { pub enum LearningResult { Finished, - FinishedEmpty, - DatasourceError, + FinishedEmpty } #[async_trait] pub trait AnalyticUnit { - async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult; + async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result; async fn detect( &self, ms: MetricService, diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 2b702a9..922dc46 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -45,8 +45,7 @@ impl Default for LearningTrain { pub enum ResponseType { LearningStarted, LearningFinished(Box), - LearningFinishedEmpty, - LearningDatasourceError, + LearningFinishedEmpty } impl fmt::Debug for ResponseType { @@ -117,5 +116,5 @@ pub enum RequestType { pub enum AnalyticServiceMessage { // Status, Request(RequestType), - Response(ResponseType), // Detect { from: u64, to: u64 }, + Response(anyhow::Result), // Detect { from: u64, to: u64 }, }