|
|
|
@ -2,17 +2,14 @@ use std::sync::Arc;
|
|
|
|
|
|
|
|
|
|
use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig}; |
|
|
|
|
use super::detection_runner::DetectionRunner; |
|
|
|
|
use super::types::{ |
|
|
|
|
self, AnalyticUnitRF, DetectionRunnerConfig, DetectionRunnerTask, LearningWaiter, HSR, |
|
|
|
|
}; |
|
|
|
|
use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR}; |
|
|
|
|
use super::{ |
|
|
|
|
analytic_client::AnalyticClient, |
|
|
|
|
types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType}, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
use crate::config::{AlertingConfig, AlertingType}; |
|
|
|
|
|
|
|
|
|
use crate::services::analytic_unit_service::AnalyticUnitService; |
|
|
|
|
use crate::config::AlertingConfig; |
|
|
|
|
use crate::services::analytic_service::analytic_unit::resolve; |
|
|
|
|
use crate::services::{ |
|
|
|
|
metric_service::MetricService, |
|
|
|
|
segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH}, |
|
|
|
@ -23,7 +20,6 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear
|
|
|
|
|
|
|
|
|
|
use anyhow; |
|
|
|
|
|
|
|
|
|
use chrono::{DateTime, Utc}; |
|
|
|
|
use tokio::sync::{mpsc, oneshot}; |
|
|
|
|
|
|
|
|
|
// TODO: now it's basically single analytic unit, service will operate on many AU
|
|
|
|
@ -31,7 +27,6 @@ use tokio::sync::{mpsc, oneshot};
|
|
|
|
|
pub struct AnalyticService { |
|
|
|
|
metric_service: MetricService, |
|
|
|
|
segments_service: SegmentsService, |
|
|
|
|
analytic_unit_service: AnalyticUnitService, |
|
|
|
|
|
|
|
|
|
alerting: Option<AlertingConfig>, |
|
|
|
|
|
|
|
|
@ -39,7 +34,6 @@ pub struct AnalyticService {
|
|
|
|
|
analytic_unit_config: AnalyticUnitConfig, |
|
|
|
|
analytic_unit_learning_status: LearningStatus, |
|
|
|
|
|
|
|
|
|
// TODO: add comment about how it's used
|
|
|
|
|
tx: mpsc::Sender<AnalyticServiceMessage>, |
|
|
|
|
rx: mpsc::Receiver<AnalyticServiceMessage>, |
|
|
|
|
|
|
|
|
@ -54,25 +48,21 @@ pub struct AnalyticService {
|
|
|
|
|
|
|
|
|
|
impl AnalyticService { |
|
|
|
|
pub fn new( |
|
|
|
|
analytic_unit_service: AnalyticUnitService, |
|
|
|
|
metric_service: MetricService, |
|
|
|
|
segments_service: segments_service::SegmentsService, |
|
|
|
|
alerting: Option<AlertingConfig>, |
|
|
|
|
) -> AnalyticService { |
|
|
|
|
// TODO: move buffer size to config
|
|
|
|
|
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32); |
|
|
|
|
|
|
|
|
|
let aus = analytic_unit_service.clone(); |
|
|
|
|
|
|
|
|
|
AnalyticService { |
|
|
|
|
analytic_unit_service: aus, |
|
|
|
|
metric_service, |
|
|
|
|
segments_service, |
|
|
|
|
|
|
|
|
|
alerting, |
|
|
|
|
|
|
|
|
|
// TODO: get it from persistance
|
|
|
|
|
analytic_unit: None, |
|
|
|
|
analytic_unit_config: analytic_unit_service.get_active_config().unwrap(), |
|
|
|
|
analytic_unit_config: AnalyticUnitConfig::Pattern(Default::default()), |
|
|
|
|
|
|
|
|
|
analytic_unit_learning_status: LearningStatus::Initialization, |
|
|
|
|
tx, |
|
|
|
@ -92,56 +82,28 @@ impl AnalyticService {
|
|
|
|
|
AnalyticClient::new(self.tx.clone()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn run_learning_waiter(&mut self, learning_waiter: LearningWaiter) { |
|
|
|
|
fn run_learning_waiter(&self, learning_waiter: LearningWaiter) { |
|
|
|
|
// TODO: save handler of the task
|
|
|
|
|
match learning_waiter { |
|
|
|
|
LearningWaiter::Detection(task) => { |
|
|
|
|
tokio::spawn({ |
|
|
|
|
let ms = self.metric_service.clone(); |
|
|
|
|
let au = self.analytic_unit.as_ref().unwrap().clone(); |
|
|
|
|
async move { |
|
|
|
|
tokio::spawn({ |
|
|
|
|
let ms = self.metric_service.clone(); |
|
|
|
|
let au = self.analytic_unit.as_ref().unwrap().clone(); |
|
|
|
|
async move { |
|
|
|
|
match learning_waiter { |
|
|
|
|
LearningWaiter::Detection(task) => { |
|
|
|
|
AnalyticService::get_detections(task.sender, au, ms, task.from, task.to) |
|
|
|
|
.await |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
LearningWaiter::HSR(task) => { |
|
|
|
|
tokio::spawn({ |
|
|
|
|
let ms = self.metric_service.clone(); |
|
|
|
|
let au = self.analytic_unit.as_ref().unwrap().clone(); |
|
|
|
|
async move { |
|
|
|
|
LearningWaiter::HSR(task) => { |
|
|
|
|
AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
LearningWaiter::DetectionRunner(task) => { |
|
|
|
|
self.run_detection_runner(task.from); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn run_detection_runner(&mut self, from: u64) { |
|
|
|
|
// TODO: handle case or make it impossible to run_detection_runner second time
|
|
|
|
|
|
|
|
|
|
if self.analytic_unit_learning_status != LearningStatus::Ready { |
|
|
|
|
let task = DetectionRunnerTask { from }; |
|
|
|
|
self.learning_waiters |
|
|
|
|
.push(LearningWaiter::DetectionRunner(task)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let AlertingType::Webhook(acfg) = self.alerting.as_ref().unwrap().alerting_type.clone(); |
|
|
|
|
let drcfg = DetectionRunnerConfig { |
|
|
|
|
endpoint: acfg.endpoint.clone(), |
|
|
|
|
interval: self.alerting.as_ref().unwrap().interval, |
|
|
|
|
}; |
|
|
|
|
let tx = self.tx.clone(); |
|
|
|
|
let au = self.analytic_unit.as_ref().unwrap().clone(); |
|
|
|
|
let dr = DetectionRunner::new(tx,self.metric_service.clone(), drcfg, au); |
|
|
|
|
self.detection_runner = Some(dr); |
|
|
|
|
self.detection_runner.as_mut().unwrap().run(from); |
|
|
|
|
|
|
|
|
|
// TODO: rerun detection runner on analytic unit change (by setting analytic unit)
|
|
|
|
|
fn run_detection_runner(&mut self) { |
|
|
|
|
// TODO: create DetectionRunnerConfig from alerting
|
|
|
|
|
// TODO: rerun detection runner on analytic unit change
|
|
|
|
|
// if self.runner_handler.is_some() {
|
|
|
|
|
// self.runner_handler.as_mut().unwrap().abort();
|
|
|
|
|
// }
|
|
|
|
@ -167,12 +129,11 @@ impl AnalyticService {
|
|
|
|
|
self.learning_handler = Some(tokio::spawn({ |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Starting; |
|
|
|
|
let tx = self.tx.clone(); |
|
|
|
|
let aus = self.analytic_unit_service.clone(); |
|
|
|
|
let ms = self.metric_service.clone(); |
|
|
|
|
let ss = self.segments_service.clone(); |
|
|
|
|
let cfg = self.analytic_unit_config.clone(); |
|
|
|
|
async move { |
|
|
|
|
AnalyticService::run_learning(tx, cfg, aus, ms, ss).await; |
|
|
|
|
AnalyticService::run_learning(tx, cfg, ms, ss).await; |
|
|
|
|
} |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
@ -233,132 +194,80 @@ impl AnalyticService {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO: maybe make `consume_response` async
|
|
|
|
|
fn consume_response(&mut self, res: anyhow::Result<types::ResponseType>) { |
|
|
|
|
fn consume_response(&mut self, res: types::ResponseType) { |
|
|
|
|
match res { |
|
|
|
|
Ok(response_type) => { |
|
|
|
|
match response_type { |
|
|
|
|
ResponseType::DetectionRunnerStarted(from) => { |
|
|
|
|
println!("Detection runner started from {}", from) |
|
|
|
|
} |
|
|
|
|
ResponseType::DetectionRunnerUpdate(id, timestamp) => { |
|
|
|
|
self.analytic_unit_service |
|
|
|
|
.set_last_detection(id, timestamp) |
|
|
|
|
.unwrap(); |
|
|
|
|
} |
|
|
|
|
ResponseType::DetectionRunnerDetection(from, to) => { |
|
|
|
|
println!("detection: {} {}", from, to); |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningStarted => { |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Learning |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningFinished(results) => { |
|
|
|
|
self.learning_handler = None; |
|
|
|
|
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Ready; |
|
|
|
|
|
|
|
|
|
// TODO: run tasks from self.learning_waiter
|
|
|
|
|
while self.learning_waiters.len() > 0 { |
|
|
|
|
let task = self.learning_waiters.pop().unwrap(); |
|
|
|
|
self.run_learning_waiter(task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningFinishedEmpty => { |
|
|
|
|
// TODO: drop all learning_waiters with empty results
|
|
|
|
|
self.analytic_unit = None; |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Initialization; |
|
|
|
|
} |
|
|
|
|
// TODO: handle when learning panics
|
|
|
|
|
ResponseType::LearningStarted => { |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Learning |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningFinished(results) => { |
|
|
|
|
self.learning_handler = None; |
|
|
|
|
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results))); |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Ready; |
|
|
|
|
|
|
|
|
|
// TODO: run tasks from self.learning_waiter
|
|
|
|
|
while self.learning_waiters.len() > 0 { |
|
|
|
|
let task = self.learning_waiters.pop().unwrap(); |
|
|
|
|
self.run_learning_waiter(task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// TODO: create custom DatasourceError error type
|
|
|
|
|
Err(err) => { |
|
|
|
|
ResponseType::LearningFinishedEmpty => { |
|
|
|
|
// TODO: drop all learning_waiters with empty results
|
|
|
|
|
self.analytic_unit = None; |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Error(err.to_string()); |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Initialization; |
|
|
|
|
} |
|
|
|
|
ResponseType::LearningDatasourceError => { |
|
|
|
|
// TODO: drop all learning_waiters with error
|
|
|
|
|
self.analytic_unit = None; |
|
|
|
|
self.analytic_unit_learning_status = LearningStatus::Error; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn patch_config(&mut self, patch: PatchConfig, tx: oneshot::Sender<()>) { |
|
|
|
|
let my_id = self |
|
|
|
|
.analytic_unit_service |
|
|
|
|
.get_config_id(&self.analytic_unit_config); |
|
|
|
|
|
|
|
|
|
let patch_id = patch.get_type_id(); |
|
|
|
|
|
|
|
|
|
let same_type = my_id == patch_id; |
|
|
|
|
|
|
|
|
|
// TODO: need_learning and same_type logic overlaps, there is a way to optimise this
|
|
|
|
|
let need_learning = self.analytic_unit_config.patch_needs_learning(&patch); |
|
|
|
|
|
|
|
|
|
if same_type { |
|
|
|
|
// TODO: check when learning should be started
|
|
|
|
|
let new_conf = patch.get_new_config(); |
|
|
|
|
self.analytic_unit_config = new_conf.clone(); |
|
|
|
|
self.analytic_unit_service |
|
|
|
|
.update_config_by_id(&my_id, &new_conf) |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
let (new_conf, need_learning) = self.analytic_unit_config.patch(patch); |
|
|
|
|
self.analytic_unit_config = new_conf; |
|
|
|
|
if need_learning { |
|
|
|
|
self.consume_request(RequestType::RunLearning); |
|
|
|
|
// TODO: it's not fullu correct: we need to wait when the learning starts
|
|
|
|
|
match tx.send(()) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(_e) => { |
|
|
|
|
println!("Can`t send patch config notification"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if self.analytic_unit.is_some() { |
|
|
|
|
if need_learning { |
|
|
|
|
self.consume_request(RequestType::RunLearning); |
|
|
|
|
match tx.send(()) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(e) => { |
|
|
|
|
println!("Can`t send patch config notification"); |
|
|
|
|
println!("{:?}", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} else { |
|
|
|
|
tokio::spawn({ |
|
|
|
|
let au = self.analytic_unit.clone(); |
|
|
|
|
let cfg = self.analytic_unit_config.clone(); |
|
|
|
|
async move { |
|
|
|
|
au.unwrap().write().await.set_config(cfg); |
|
|
|
|
match tx.send(()) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(e) => { |
|
|
|
|
println!("Can`t send patch config notification"); |
|
|
|
|
println!("{:?}", e); |
|
|
|
|
} |
|
|
|
|
tokio::spawn({ |
|
|
|
|
let au = self.analytic_unit.clone(); |
|
|
|
|
let cfg = self.analytic_unit_config.clone(); |
|
|
|
|
async move { |
|
|
|
|
au.unwrap().write().await.set_config(cfg); |
|
|
|
|
match tx.send(()) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(_e) => { |
|
|
|
|
println!("Can`t send patch config notification"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} else { |
|
|
|
|
// TODO: check if we need this else
|
|
|
|
|
match tx.send(()) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(e) => { |
|
|
|
|
Err(_e) => { |
|
|
|
|
println!("Can`t send patch config notification"); |
|
|
|
|
println!("{:?}", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
let new_conf = self |
|
|
|
|
.analytic_unit_service |
|
|
|
|
.get_config_by_id(&patch_id) |
|
|
|
|
.unwrap(); |
|
|
|
|
self.analytic_unit_config = new_conf.clone(); |
|
|
|
|
self.consume_request(RequestType::RunLearning); |
|
|
|
|
match tx.send(()) { |
|
|
|
|
Ok(_) => {} |
|
|
|
|
Err(e) => { |
|
|
|
|
println!("Can`t send patch config notification"); |
|
|
|
|
println!("{:?}", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub async fn serve(&mut self) { |
|
|
|
|
// TODO: remove this hack
|
|
|
|
|
self.consume_request(RequestType::RunLearning); |
|
|
|
|
// TODO: start detection runner if
|
|
|
|
|
if self.alerting.is_some() { |
|
|
|
|
// TODO: get it from persistance
|
|
|
|
|
let now: DateTime<Utc> = Utc::now(); |
|
|
|
|
let from = now.timestamp() as u64; |
|
|
|
|
self.run_detection_runner(from); |
|
|
|
|
self.run_detection_runner(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
while let Some(message) = self.rx.recv().await { |
|
|
|
@ -372,21 +281,15 @@ impl AnalyticService {
|
|
|
|
|
async fn run_learning( |
|
|
|
|
tx: mpsc::Sender<AnalyticServiceMessage>, |
|
|
|
|
aucfg: AnalyticUnitConfig, |
|
|
|
|
aus: AnalyticUnitService, |
|
|
|
|
ms: MetricService, |
|
|
|
|
ss: SegmentsService, |
|
|
|
|
) { |
|
|
|
|
let mut au = match aus.resolve(&aucfg) { |
|
|
|
|
Ok(a) => a, |
|
|
|
|
Err(e) => { |
|
|
|
|
panic!("{}", e); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let mut au = resolve(aucfg); |
|
|
|
|
|
|
|
|
|
match tx |
|
|
|
|
.send(AnalyticServiceMessage::Response(Ok( |
|
|
|
|
.send(AnalyticServiceMessage::Response( |
|
|
|
|
ResponseType::LearningStarted, |
|
|
|
|
))) |
|
|
|
|
)) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(_) => {} |
|
|
|
@ -395,11 +298,9 @@ impl AnalyticService {
|
|
|
|
|
|
|
|
|
|
// TODO: maybe to spawn_blocking here
|
|
|
|
|
let lr = match au.learn(ms, ss).await { |
|
|
|
|
Ok(res) => match res { |
|
|
|
|
LearningResult::Finished => Ok(ResponseType::LearningFinished(au)), |
|
|
|
|
LearningResult::FinishedEmpty => Ok(ResponseType::LearningFinishedEmpty), |
|
|
|
|
}, |
|
|
|
|
Err(e) => Err(e), |
|
|
|
|
LearningResult::Finished => ResponseType::LearningFinished(au), |
|
|
|
|
LearningResult::DatasourceError => ResponseType::LearningDatasourceError, |
|
|
|
|
LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
match tx.send(AnalyticServiceMessage::Response(lr)).await { |
|
|
|
|