Browse Source

Merge pull request #63 from hastic/basic-periodic-update-of-detection-runner-#62

detections window begin
pull/69/head
glitch4347 2 years ago committed by GitHub
parent
commit
0eff1204f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      server/src/services/analytic_service/analytic_service.rs
  2. 4
      server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs
  3. 4
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  4. 3
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  5. 1
      server/src/services/analytic_service/analytic_unit/types.rs
  6. 35
      server/src/services/analytic_service/detection_runner.rs

2
server/src/services/analytic_service/analytic_service.rs

@ -135,7 +135,7 @@ impl AnalyticService {
};
let tx = self.tx.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
let dr = DetectionRunner::new(tx, drcfg, au);
let dr = DetectionRunner::new(self.metric_service.clone(), tx, drcfg, au);
self.detection_runner = Some(dr);
self.detection_runner.as_mut().unwrap().run(from);

4
server/src/services/analytic_service/analytic_unit/anomaly_analytic_unit.rs

@ -164,6 +164,10 @@ impl AnalyticUnit for AnomalyAnalyticUnit {
fn get_id(&self) -> String {
return self.id.to_owned();
}
fn get_detection_window(&self) -> u64 {
// TODO: return window based on real petterns info
return DETECTION_STEP;
}
fn set_config(&mut self, config: AnalyticUnitConfig) {
if let AnalyticUnitConfig::Anomaly(cfg) = config {
self.config = cfg;

4
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -219,6 +219,10 @@ impl AnalyticUnit for PatternAnalyticUnit {
fn get_id(&self) -> String {
return self.id.to_owned();
}
fn get_detection_window(&self) -> u64 {
// TODO: return window based on real petterns info
return DETECTION_STEP;
}
fn set_config(&mut self, config: AnalyticUnitConfig) {
if let AnalyticUnitConfig::Pattern(cfg) = config {
self.config = cfg;

3
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -25,6 +25,9 @@ impl AnalyticUnit for ThresholdAnalyticUnit {
fn get_id(&self) -> String {
return self.id.to_owned();
}
fn get_detection_window(&self) -> u64 {
return DETECTION_STEP;
}
async fn learn(
&mut self,
_ms: MetricService,

1
server/src/services/analytic_service/analytic_unit/types.rs

@ -131,6 +131,7 @@ pub enum LearningResult {
#[async_trait]
pub trait AnalyticUnit {
fn get_id(&self) -> String;
fn get_detection_window(&self) -> u64;
async fn learn(
&mut self,
ms: MetricService,

35
server/src/services/analytic_service/detection_runner.rs

@ -1,11 +1,18 @@
use chrono::{Utc, DateTime};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::{mpsc};
use crate::services::metric_service::MetricService;
use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType};
use tokio::time::{sleep, Duration};
const DETECTION_STEP: u64 = 10;
pub struct DetectionRunner {
metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
@ -14,11 +21,13 @@ pub struct DetectionRunner {
impl DetectionRunner {
pub fn new(
metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
) -> DetectionRunner {
DetectionRunner {
metric_service,
tx,
config,
analytic_unit,
@ -33,16 +42,18 @@ impl DetectionRunner {
self.running_handler.as_mut().unwrap().abort();
}
self.running_handler = Some(tokio::spawn({
// TODO: clone channel
let cfg = self.config.clone();
let ms = self.metric_service.clone();
let tx = self.tx.clone();
let au = self.analytic_unit.clone();
async move {
// TODO: run detection "from" for big timespan
// TODO: parse detections to webhooks
// TODO: define window for detection
// TODO: save last detection
// TODO: handle case when detection is in the end and continues after "now"
let window_size = au.as_ref().read().await.get_detection_window();
let mut t_from = from - window_size;
let mut t_to = from;
match tx
.send(AnalyticServiceMessage::Response(Ok(
@ -55,17 +66,25 @@ impl DetectionRunner {
}
loop {
// TODO: don't use DateTime, but count timestamp by steps
let now: DateTime<Utc> = Utc::now();
let to = now.timestamp() as u64;
let a = au.as_ref().read().await;
let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap();
for d in detections {
println!("detection: {} {}", d.0, d.1);
}
// TODO: run detection periodically
sleep(Duration::from_secs(cfg.interval)).await;
// TODO: set info about detections to tx
match tx.send(AnalyticServiceMessage::Response(Ok(
ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), to)
ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), t_to)
))).await {
Ok(_) => {},
Err(_e) => println!("Fail to send detection runner started notification"),
}
sleep(Duration::from_secs(cfg.interval)).await;
}
}
}));

Loading…
Cancel
Save