From 5fa99d14214b31ca940e7ce8bf6dd30d5f5ca147 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 20 Dec 2021 22:32:03 +0300 Subject: [PATCH] update detections --- .../services/analytic_service/analytic_service.rs | 3 +++ .../services/analytic_service/detection_runner.rs | 13 +++++++++++-- server/src/services/analytic_service/types.rs | 1 + server/src/services/analytic_unit_service.rs | 7 ++++++- 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/server/src/services/analytic_service/analytic_service.rs b/server/src/services/analytic_service/analytic_service.rs index 8c79628..81c9f22 100644 --- a/server/src/services/analytic_service/analytic_service.rs +++ b/server/src/services/analytic_service/analytic_service.rs @@ -238,6 +238,9 @@ impl AnalyticService { ResponseType::DetectionRunnerStarted(from) => { println!("Detection runner started from {}", from) } + ResponseType::DetectionRunnerUpdate(id, timestamp) => { + self.analytic_unit_service.set_last_detection(id, timestamp).unwrap(); + } ResponseType::LearningStarted => { self.analytic_unit_learning_status = LearningStatus::Learning } diff --git a/server/src/services/analytic_service/detection_runner.rs b/server/src/services/analytic_service/detection_runner.rs index 6135b9f..e881483 100644 --- a/server/src/services/analytic_service/detection_runner.rs +++ b/server/src/services/analytic_service/detection_runner.rs @@ -1,4 +1,4 @@ -use chrono::Utc; +use chrono::{Utc, DateTime}; use tokio::sync::{mpsc, RwLock}; @@ -36,6 +36,7 @@ impl DetectionRunner { // TODO: clone channel let cfg = self.config.clone(); let tx = self.tx.clone(); + let au = self.analytic_unit.clone(); async move { // TODO: run detection "from" for big timespan // TODO: parse detections to webhooks @@ -54,9 +55,17 @@ impl DetectionRunner { } loop { + // TODO: don't use DateTime, but count timestamp by steps + let now: DateTime = Utc::now(); + let to = now.timestamp() as u64; // TODO: run detection periodically sleep(Duration::from_secs(cfg.interval)).await; - // TODO: use tx senf detection update + match tx.send(AnalyticServiceMessage::Response(Ok( + ResponseType::DetectionRunnerUpdate(au.as_ref().read().await.get_id(), to) + ))).await { + Ok(_) => {}, + Err(_e) => println!("Fail to send detection runner started notification"), + } } } })); diff --git a/server/src/services/analytic_service/types.rs b/server/src/services/analytic_service/types.rs index 3ce085e..6d1efbe 100644 --- a/server/src/services/analytic_service/types.rs +++ b/server/src/services/analytic_service/types.rs @@ -44,6 +44,7 @@ impl Default for LearningTrain { pub enum ResponseType { DetectionRunnerStarted(u64), + DetectionRunnerUpdate(String, u64), // analytic_unit id and timestamp LearningStarted, LearningFinished(Box), LearningFinishedEmpty, diff --git a/server/src/services/analytic_unit_service.rs b/server/src/services/analytic_unit_service.rs index 1a164c6..608bf9e 100644 --- a/server/src/services/analytic_unit_service.rs +++ b/server/src/services/analytic_unit_service.rs @@ -65,7 +65,12 @@ impl AnalyticUnitService { } // TODO: resolve with saving by id - pub fn set_last_detection(id: String, last_detection: u64) -> anyhow::Result<()> { + pub fn set_last_detection(&self, id: String, last_detection: u64) -> anyhow::Result<()> { + let conn = self.connection.lock().unwrap(); + conn.execute( + "UPDATE analytic_unit SET last_detection = ?1 WHERE id = ?2", + params![last_detection, id] + )?; Ok(()) } } \ No newline at end of file