Hastic standalone https://hastic.io
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

105 lines
3.6 KiB

use chrono::{DateTime, Utc};
use tokio::sync::mpsc;
use crate::services::metric_service::MetricService;
use super::types::{AnalyticServiceMessage, AnalyticUnitRF, DetectionRunnerConfig, ResponseType};
use tokio::time::{sleep, Duration};
/// Step used by the detection runner.
///
/// NOTE(review): the use site is not visible in this view. Given the
/// `tokio::time::{sleep, Duration}` import, this is presumably the pause between
/// detection iterations (likely seconds) — TODO confirm unit and usage.
const DETECTION_STEP: u64 = 10;
/// Owns everything needed to run detection for one analytic unit inside a
/// spawned tokio task, plus the handle of that task.
pub struct DetectionRunner {
/// Metric service handle; `run` clones it into the task and passes it to `detect`.
metric_service: MetricService,
/// Channel sender for `AnalyticServiceMessage`s; `run` clones it into the task
/// to report back to the receiver.
tx: mpsc::Sender<AnalyticServiceMessage>,
/// Runner configuration; `run` clones it into the task.
config: DetectionRunnerConfig,
/// Analytic unit whose detection window and `detect` method the task uses.
analytic_unit: AnalyticUnitRF,
/// Join handle of the task spawned by `run`; `None` until `run` is called.
running_handler: Option<tokio::task::JoinHandle<()>>,
impl DetectionRunner {
/// Creates a runner from its collaborators. The task handle starts as `None`,
/// so nothing is running until `run` is called.
///
/// NOTE(review): only the `running_handler` initializer is visible in this
/// view; the remaining fields are presumably filled from the parameters of the
/// same name — TODO confirm.
pub fn new(
metric_service: MetricService,
tx: mpsc::Sender<AnalyticServiceMessage>,
config: DetectionRunnerConfig,
analytic_unit: AnalyticUnitRF,
) -> DetectionRunner {
DetectionRunner {
running_handler: None,
/// Spawns a tokio task that repeatedly runs detection on the analytic unit and
/// stores the task's join handle in `running_handler`.
///
/// `from` is the right edge of the first detection interval; the left edge is
/// `from` minus the analytic unit's detection window.
///
/// NOTE(review): units of `from` are not established here (presumably a
/// timestamp in the same unit as the detection window) — TODO confirm.
pub fn run(&mut self, from: u64) {
// TODO: get last detection timestamp from persistence
// TODO: set last detection from "now"
// A task handle already exists from an earlier call.
// NOTE(review): the body of this branch is not visible in this view;
// presumably it aborts the previous task before a new one is spawned — TODO confirm.
if self.running_handler.is_some() {
// Spawn the detection task and remember its handle.
self.running_handler = Some(tokio::spawn({
// Clone everything the task needs so the `async move` block owns its own copies.
// NOTE(review): no use of `cfg` is visible in this view.
let cfg = self.config.clone();
let ms = self.metric_service.clone();
let tx = self.tx.clone();
let au = self.analytic_unit.clone();
async move {
// TODO: run detection "from" for big timespan
// TODO: parse detections to webhooks
// TODO: define window for detection
// TODO: handle case when detection is in the end and continues after "now"
// TODO: update t_from / t_to
// Await read access to the analytic unit just long enough to get its window.
let window_size = au.as_ref().read().await.get_detection_window();
// NOTE(review): unsigned subtraction — if `window_size` is greater than `from`
// this overflows (panic in debug builds, wrap-around in release builds);
// `from.saturating_sub(window_size)` would clamp at zero instead.
let mut t_from = from - window_size;
let mut t_to = from;
// Notify the receiver over `tx`; a failed send is only logged to stdout.
// NOTE(review): the message expression being matched is not visible in this view.
match tx
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner started notification"),
// Detection loop.
// NOTE(review): no update of `t_from` / `t_to` and no pause between iterations
// is visible in this view (see the TODO above) — TODO confirm.
loop {
// Read access to the analytic unit is held in `a` while `detect` is awaited.
let a = au.as_ref().read().await;
// NOTE(review): `unwrap` panics this task if `detect` does not succeed.
let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap();
for d in detections {
// Each detection is printed as its two tuple elements.
println!("detection: {} {}", d.0, d.1);
// TODO: set info about detections to tx
// Second notification over `tx`; a failed send is only logged to stdout.
// NOTE(review): the error text below is identical to the "started" notification
// message above, which looks like a copy-paste — TODO confirm intended wording.
match tx
Ok(_) => {}
Err(_e) => println!("Fail to send detection runner started notification"),
// Disabled draft of a setter that swaps the analytic unit and stops the running task.
// NOTE(review): the draft refers to `runner_handler`, but the struct field is
// named `running_handler`; it would not compile if re-enabled as written.
// pub async fn set_analytic_unit(&mut self, analytic_unit: AnalyticUnitRF,
// ) {
// self.analytic_unit = analytic_unit;
// // TODO: stop running_handler
// // TODO: rerun detection with new anomaly units
// if self.runner_handler.is_some() {
// self.runner_handler.as_mut().unwrap().abort();
// }
// }