Browse Source

PoC: improve error handling

pull/45/head
rozetko 2 years ago
parent
commit
f55d4a5a0d
  1. 59
      server/src/services/analytic_service/analytic_service.rs
  2. 12
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  3. 17
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  4. 4
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  5. 5
      server/src/services/analytic_service/analytic_unit/types.rs
  6. 5
      server/src/services/analytic_service/types.rs

59
server/src/services/analytic_service/analytic_service.rs

@ -215,30 +215,33 @@ impl AnalyticService {
}
// TODO: maybe make `consume_response` async
fn consume_response(&mut self, res: types::ResponseType) {
fn consume_response(&mut self, res: anyhow::Result<types::ResponseType>) {
match res {
// TODO: handle when learning panics
ResponseType::LearningStarted => {
self.analytic_unit_learning_status = LearningStatus::Learning
}
ResponseType::LearningFinished(results) => {
self.learning_handler = None;
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results)));
self.analytic_unit_learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter
while self.learning_waiters.len() > 0 {
let task = self.learning_waiters.pop().unwrap();
self.run_learning_waiter(task);
Ok(response_type) => {
match response_type {
ResponseType::LearningStarted => {
self.analytic_unit_learning_status = LearningStatus::Learning
}
ResponseType::LearningFinished(results) => {
self.learning_handler = None;
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results)));
self.analytic_unit_learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter
while self.learning_waiters.len() > 0 {
let task = self.learning_waiters.pop().unwrap();
self.run_learning_waiter(task);
}
}
ResponseType::LearningFinishedEmpty => {
// TODO: drop all learning_waiters with empty results
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Initialization;
}
}
}
ResponseType::LearningFinishedEmpty => {
// TODO: drop all learning_waiters with empty results
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Initialization;
}
ResponseType::LearningDatasourceError => {
// TODO: drop all learning_waiters with error
},
// TODO: create custom DatasourceError error type
Err(_) => {
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Error;
}
@ -308,7 +311,7 @@ impl AnalyticService {
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningStarted,
Ok(ResponseType::LearningStarted),
))
.await
{
@ -318,9 +321,13 @@ impl AnalyticService {
// TODO: maybe to spawn_blocking here
let lr = match au.learn(ms, ss).await {
LearningResult::Finished => ResponseType::LearningFinished(au),
LearningResult::DatasourceError => ResponseType::LearningDatasourceError,
LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty,
Ok(res) => {
match res {
LearningResult::Finished => Ok(ResponseType::LearningFinished(au)),
LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty)
}
}
Err(e) => Err(e)
};
match tx.send(AnalyticServiceMessage::Response(lr)).await {

12
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -54,6 +54,7 @@ impl SARIMA {
// TODO: ensure capacity with seasonality size
let mut res_ts = Vec::<(u64, f64)>::new();
let from = ts[0].0;
// TODO: unwrap -> ?
let to = ts.last().unwrap().0;
let iter_steps = (self.seasonality / DETECTION_STEP) as usize;
@ -167,21 +168,22 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
panic!("Bad config!");
}
}
async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> LearningResult {
async fn learn(&mut self, ms: MetricService, _ss: SegmentsService) -> anyhow::Result<LearningResult> {
let mut sarima = SARIMA::new(self.config.seasonality, self.config.confidence, self.config.seasonality_iterations);
let utc: DateTime<Utc> = Utc::now();
let to = utc.timestamp() as u64;
let from = to - self.config.seasonality * self.config.seasonality_iterations;
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
let mr = ms.query(from, to, DETECTION_STEP).await?;
if mr.data.keys().len() == 0 {
return LearningResult::FinishedEmpty;
return Ok(LearningResult::FinishedEmpty);
}
// TODO: unwrap -> ?
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
sarima.learn(ts).unwrap();
sarima.learn(ts)?;
self.sarima = Some(sarima);
@ -189,7 +191,7 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
// TODO: load data to learning
// TODO: update model to work online
return LearningResult::Finished;
return Ok(LearningResult::Finished);
}
async fn detect(
&self,

17
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -74,6 +74,7 @@ async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Re
});
}
// TODO: unwrap -> ?
let k = mr.data.keys().nth(0).unwrap().clone();
let ts = mr.data.remove(&k).unwrap();
@ -221,7 +222,7 @@ impl AnalyticUnit for PatternAnalyticUnit {
}
}
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult {
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result<LearningResult> {
// TODO: move to config
let mut cfg = Config::new();
cfg.set_feature_size(FEATURES_SIZE);
@ -235,14 +236,14 @@ impl AnalyticUnit for PatternAnalyticUnit {
cfg.set_training_optimization_level(2);
// be careful if decide to store detections in db
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
let segments = ss.get_segments_inside(0, u64::MAX / 2)?;
let has_segments_label = segments
.iter()
.find(|s| s.segment_type == SegmentType::Label)
.is_some();
if !has_segments_label {
return LearningResult::FinishedEmpty;
return Ok(LearningResult::FinishedEmpty);
}
let fs = segments.iter().map(|s| segment_to_segdata(&ms, s));
@ -253,11 +254,11 @@ impl AnalyticUnit for PatternAnalyticUnit {
for r in rs {
if r.is_err() {
println!("Error extracting metrics from datasource");
return LearningResult::DatasourceError;
// TODO: custom DatasourceError error type
return Err(anyhow::format_err!("Error extracting metrics from datasource"));
}
let sd = r.unwrap();
let sd = r?;
if sd.data.is_empty() {
continue;
}
@ -269,7 +270,7 @@ impl AnalyticUnit for PatternAnalyticUnit {
}
if learn_tss.len() == 0 {
return LearningResult::FinishedEmpty;
return Ok(LearningResult::FinishedEmpty);
}
let mut patterns = Vec::<Vec<f64>>::new();
@ -332,7 +333,7 @@ impl AnalyticUnit for PatternAnalyticUnit {
avg_pattern_length,
});
return LearningResult::Finished;
return Ok(LearningResult::Finished);
}
// TODO: get iterator instead of vector

4
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -21,8 +21,8 @@ impl ThresholdAnalyticUnit {
#[async_trait]
impl AnalyticUnit for ThresholdAnalyticUnit {
async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult {
return LearningResult::Finished;
async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> anyhow::Result<LearningResult> {
return Ok(LearningResult::Finished);
}
fn set_config(&mut self, config: AnalyticUnitConfig) {

5
server/src/services/analytic_service/analytic_unit/types.rs

@ -125,13 +125,12 @@ impl AnalyticUnitConfig {
pub enum LearningResult {
Finished,
FinishedEmpty,
DatasourceError,
FinishedEmpty
}
#[async_trait]
pub trait AnalyticUnit {
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult;
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> anyhow::Result<LearningResult>;
async fn detect(
&self,
ms: MetricService,

5
server/src/services/analytic_service/types.rs

@ -45,8 +45,7 @@ impl Default for LearningTrain {
pub enum ResponseType {
LearningStarted,
LearningFinished(Box<dyn AnalyticUnit + Send + Sync>),
LearningFinishedEmpty,
LearningDatasourceError,
LearningFinishedEmpty
}
impl fmt::Debug for ResponseType {
@ -117,5 +116,5 @@ pub enum RequestType {
pub enum AnalyticServiceMessage {
// Status,
Request(RequestType),
Response(ResponseType), // Detect { from: u64, to: u64 },
Response(anyhow::Result<ResponseType>), // Detect { from: u64, to: u64 },
}

Loading…
Cancel
Save