Browse Source

hsr begin

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
8975a501fa
  1. 16
      server/src/services/analytic_service/analytic_client.rs
  2. 41
      server/src/services/analytic_service/analytic_service.rs
  3. 27
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  4. 18
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  5. 18
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  6. 1
      server/src/services/analytic_service/analytic_unit/types.rs
  7. 15
      server/src/services/analytic_service/types.rs

16
server/src/services/analytic_service/analytic_client.rs

@ -7,6 +7,7 @@ use crate::services::segments_service::Segment;
use super::analytic_unit::types::AnalyticUnitConfig;
use super::analytic_unit::types::PatchConfig;
use super::types::DetectionTask;
use super::types::HSRTask;
use super::types::LearningStatus;
use super::types::LearningTrain;
use super::types::{AnalyticServiceMessage, RequestType};
@ -75,4 +76,19 @@ impl AnalyticClient {
Err(e) => Ok(Vec::new()),
}
}
pub async fn get_hsr(&self, from: u64, to: u64) -> anyhow::Result<Vec<(u64, f64)>> {
let (tx, rx) = oneshot::channel();
let req = AnalyticServiceMessage::Request(RequestType::GetHSR(HSRTask {
sender: tx,
from,
to,
}));
self.tx.send(req).await?;
// TODO: handle second error
match rx.await? {
Ok(r) => Ok(r),
Err(e) => Ok(Vec::new()),
}
}
}

41
server/src/services/analytic_service/analytic_service.rs

@ -1,7 +1,7 @@
use std::sync::Arc;
use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig, PatternConfig};
use super::types::{self, DetectionRunnerConfig, LearningTrain};
use super::types::{self, DetectionRunnerConfig, LearningTrain, LearningWaiter};
use super::{
analytic_client::AnalyticClient,
analytic_unit::pattern_analytic_unit::{self, LearningResults, PatternAnalyticUnit},
@ -42,7 +42,7 @@ pub struct AnalyticService {
learning_handler: Option<tokio::task::JoinHandle<()>>,
// awaiters
learning_waiters: Vec<DetectionTask>,
learning_waiters: Vec<LearningWaiter>,
// runner
runner_handler: Option<tokio::task::JoinHandle<()>>,
@ -84,13 +84,17 @@ impl AnalyticService {
AnalyticClient::new(self.tx.clone())
}
fn run_detection_task(&self, task: DetectionTask) {
fn run_learning_waiter(&self, learning_waiter: LearningWaiter) {
// TODO: save handler of the task
tokio::spawn({
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
AnalyticService::get_detections(task.sender, au, ms, task.from, task.to).await;
match learning_waiter {
LearningWaiter::Detection(task) => AnalyticService::get_detections(task.sender, au, ms, task.from, task.to).await,
LearningWaiter::HSR(task) => AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await,
}
}
});
}
@ -129,6 +133,7 @@ impl AnalyticService {
}));
}
RequestType::RunDetection(task) => {
// TODO: signle source of truth: Option<AnalyticUnit> vs LearningStatus
if self.analytic_unit_learning_status == LearningStatus::Initialization {
match task
.sender
@ -143,9 +148,9 @@ impl AnalyticService {
return;
}
if self.analytic_unit_learning_status == LearningStatus::Ready {
self.run_detection_task(task);
self.run_learning_waiter(LearningWaiter::Detection(task));
} else {
self.learning_waiters.push(task);
self.learning_waiters.push(LearningWaiter::Detection(task));
}
}
RequestType::GetStatus(tx) => {
@ -172,6 +177,10 @@ impl AnalyticService {
self.patch_config(patch_obj, tx);
// tx.send(()).unwrap();
}
RequestType::GetHSR(task) => {
// self.analytic_unit.
// TODO: implement
}
};
}
@ -190,7 +199,7 @@ impl AnalyticService {
// TODO: run tasks from self.learning_waiter
while self.learning_waiters.len() > 0 {
let task = self.learning_waiters.pop().unwrap();
self.run_detection_task(task);
self.run_learning_waiter(task);
}
// TODO: fix this
@ -329,4 +338,22 @@ impl AnalyticService {
}
return;
}
async fn get_hsr(
tx: oneshot::Sender<anyhow::Result<Vec<(u64, f64)>>>,
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
ms: MetricService,
from: u64,
to: u64,
) {
let hsr = analytic_unit.read().await.get_hsr(ms, from, to).await.unwrap();
match tx.send(Ok(hsr)) {
Ok(_) => {}
Err(_e) => {
println!("failed to send results");
}
}
}
}

27
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -46,9 +46,36 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
if ts.len() == 0 {
return Ok(Vec::new());
}
let ct = ts[0];
// TODO: implement
// TODO: decide what to do it from is Some() in the end
Ok(Default::default())
}
async fn get_hsr(
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<Vec<(u64, f64)>> {
// TODO: implement
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
let k = mr.data.keys().nth(0).unwrap();
let ts = mr.data[k].clone();
Ok(ts)
}
}

18
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -398,4 +398,22 @@ impl AnalyticUnit for PatternAnalyticUnit {
Ok(results)
}
async fn get_hsr(
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<Vec<(u64, f64)>> {
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
let k = mr.data.keys().nth(0).unwrap();
let ts = mr.data[k].clone();
Ok(ts)
}
}

18
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -74,4 +74,22 @@ impl AnalyticUnit for ThresholdAnalyticUnit {
Ok(result)
}
async fn get_hsr(
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<Vec<(u64, f64)>> {
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
let k = mr.data.keys().nth(0).unwrap();
let ts = mr.data[k].clone();
Ok(ts)
}
}

1
server/src/services/analytic_service/analytic_unit/types.rs

@ -128,6 +128,7 @@ pub trait AnalyticUnit {
) -> anyhow::Result<Vec<(u64, u64)>>;
fn set_config(&mut self, c: AnalyticUnitConfig);
async fn get_hsr(&self, ms: MetricService, from: u64, to: u64) -> anyhow::Result<Vec<(u64, f64)>> ;
}
#[derive(Deserialize, Serialize, Debug)]

15
server/src/services/analytic_service/types.rs

@ -61,6 +61,20 @@ pub struct DetectionTask {
pub to: u64,
}
#[derive(Debug)]
pub struct HSRTask {
// TODO: make enum for HSR which is different for different Analytic Types
pub sender: oneshot::Sender<Result<Vec<(u64, f64)>>>,
pub from: u64,
pub to: u64,
}
#[derive(Debug)]
pub enum LearningWaiter {
Detection(DetectionTask),
HSR(HSRTask)
}
#[derive(Debug)]
pub struct DetectionRunnerConfig {
// pub sender: mpsc::Sender<Result<Vec<Segment>>>,
@ -72,6 +86,7 @@ pub struct DetectionRunnerConfig {
pub enum RequestType {
// TODO: convert to result RunLearning(anyhow::Result<()>)
RunLearning,
GetHSR(HSRTask),
RunDetection(DetectionTask),
GetStatus(oneshot::Sender<LearningStatus>),
// TODO: make type of Value

Loading…
Cancel
Save