Browse Source

Merge pull request #47 from hastic/detection-waiter

detection runner in learning waiters
detection_runner_updade
glitch4347 2 years ago committed by GitHub
parent
commit
193e7984c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 45
      server/src/services/analytic_service/analytic_service.rs
  2. 7
      server/src/services/analytic_service/types.rs

45
server/src/services/analytic_service/analytic_service.rs

@ -2,7 +2,7 @@ use std::sync::Arc;
use super::analytic_unit::types::{AnalyticUnitConfig, PatchConfig};
use super::detection_runner::DetectionRunner;
use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR};
use super::types::{self, AnalyticUnitRF, DetectionRunnerConfig, LearningWaiter, HSR, DetectionRunnerTask};
use super::{
analytic_client::AnalyticClient,
types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType},
@ -20,6 +20,7 @@ use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, Lear
use anyhow;
use chrono::{TimeZone, DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
// TODO: now it's basically single analytic unit, service will operate on many AU
@ -82,26 +83,36 @@ impl AnalyticService {
AnalyticClient::new(self.tx.clone())
}
fn run_learning_waiter(&self, learning_waiter: LearningWaiter) {
fn run_learning_waiter(&mut self, learning_waiter: LearningWaiter) {
// TODO: save handler of the task
tokio::spawn({
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
match learning_waiter {
LearningWaiter::Detection(task) => {
match learning_waiter {
LearningWaiter::Detection(task) => {
tokio::spawn({
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
AnalyticService::get_detections(task.sender, au, ms, task.from, task.to)
.await
}
LearningWaiter::HSR(task) => {
});
}
LearningWaiter::HSR(task) => {
tokio::spawn({
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
AnalyticService::get_hsr(task.sender, au, ms, task.from, task.to).await
}
}
});
}
});
LearningWaiter::DetectionRunner(task) => {
self.run_detection_runner(task.from);
}
}
}
fn run_detection_runner(&mut self) {
// TODO: make
fn run_detection_runner(&mut self, from: u64) {
// TODO: handle case or make it impossible to run_detection_runner second time
if self.analytic_unit.is_none() {
@ -109,7 +120,10 @@ impl AnalyticService {
}
if self.analytic_unit_learning_status != LearningStatus::Ready {
// TODO: add to waiter
let task = DetectionRunnerTask {
from
};
self.learning_waiters.push(LearningWaiter::DetectionRunner(task));
return;
}
@ -290,7 +304,10 @@ impl AnalyticService {
// TODO: remove this hack
self.consume_request(RequestType::RunLearning);
if self.alerting.is_some() {
self.run_detection_runner();
// TODO: get it from persistance
let now: DateTime<Utc> = Utc::now();
let from = now.timestamp() as u64;
self.run_detection_runner(from);
}
while let Some(message) = self.rx.recv().await {

7
server/src/services/analytic_service/types.rs

@ -62,6 +62,12 @@ pub struct DetectionTask {
pub to: u64,
}
#[derive(Debug)]
pub struct DetectionRunnerTask {
pub from: u64,
}
#[derive(Debug, Serialize)]
pub struct AnomalyHSRConfig {
pub timestamp: u64,
@ -87,6 +93,7 @@ pub struct HSRTask {
#[derive(Debug)]
pub enum LearningWaiter {
Detection(DetectionTask),
DetectionRunner(DetectionRunnerTask),
HSR(HSRTask),
}

Loading…
Cancel
Save