|
|
|
@ -215,30 +215,33 @@ impl AnalyticService {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO: maybe make `consume_response` async
|
|
|
|
|
fn consume_response(&mut self, res: types::ResponseType) { |
|
|
|
|
fn consume_response(&mut self, res: anyhow::Result<types::ResponseType>) { |
|
|
|
|
match res { |
|
|
|
|
// TODO: handle when learning panics
|
|
|
|
|
ResponseType::LearningStarted => { |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Learning |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningFinished(results) => { |
|
|
|
|
self.learning_handler = None; |
|
|
|
|
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Ready; |
|
|
|
|
|
|
|
|
|
// TODO: run tasks from self.learning_waiter
|
|
|
|
|
while self.learning_waiters.len() > 0 { |
|
|
|
|
let task = self.learning_waiters.pop().unwrap(); |
|
|
|
|
self.run_learning_waiter(task); |
|
|
|
|
Ok(response_type) => { |
|
|
|
|
match response_type { |
|
|
|
|
ResponseType::LearningStarted => { |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Learning |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningFinished(results) => { |
|
|
|
|
self.learning_handler = None; |
|
|
|
|
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Ready; |
|
|
|
|
|
|
|
|
|
// TODO: run tasks from self.learning_waiter
|
|
|
|
|
while self.learning_waiters.len() > 0 { |
|
|
|
|
let task = self.learning_waiters.pop().unwrap(); |
|
|
|
|
self.run_learning_waiter(task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningFinishedEmpty => { |
|
|
|
|
// TODO: drop all learning_waiters with empty results
|
|
|
|
|
self.analytic_unit = None; |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Initialization; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningFinishedEmpty => { |
|
|
|
|
// TODO: drop all learning_waiters with empty results
|
|
|
|
|
self.analytic_unit = None; |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Initialization; |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningDatasourceError => { |
|
|
|
|
// TODO: drop all learning_waiters with error
|
|
|
|
|
}, |
|
|
|
|
// TODO: create custom DatasourceError error type
|
|
|
|
|
Err(_) => { |
|
|
|
|
self.analytic_unit = None; |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Error; |
|
|
|
|
} |
|
|
|
@ -308,7 +311,7 @@ impl AnalyticService {
|
|
|
|
|
|
|
|
|
|
match tx |
|
|
|
|
.send(AnalyticServiceMessage::Response( |
|
|
|
|
ResponseType::LearningStarted, |
|
|
|
|
Ok(ResponseType::LearningStarted), |
|
|
|
|
)) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
@ -318,9 +321,13 @@ impl AnalyticService {
|
|
|
|
|
|
|
|
|
|
// TODO: maybe to spawn_blocking here
|
|
|
|
|
let lr = match au.learn(ms, ss).await { |
|
|
|
|
LearningResult::Finished => ResponseType::LearningFinished(au), |
|
|
|
|
LearningResult::DatasourceError => ResponseType::LearningDatasourceError, |
|
|
|
|
LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty, |
|
|
|
|
Ok(res) => { |
|
|
|
|
match res { |
|
|
|
|
LearningResult::Finished => Ok(ResponseType::LearningFinished(au)), |
|
|
|
|
LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(e) => Err(e) |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
match tx.send(AnalyticServiceMessage::Response(lr)).await { |
|
|
|
|