Browse Source

threshold begin

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
9bbbbe3c9d
  1. 1
      server/Cargo.lock
  2. 1
      server/Cargo.toml
  3. 107
      server/src/services/analytic_service/analytic_service.rs
  4. 12
      server/src/services/analytic_service/analytic_unit/mod.rs
  5. 20
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  6. 40
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  7. 10
      server/src/services/analytic_service/analytic_unit/types.rs
  8. 4
      server/src/services/analytic_service/types.rs

1
server/Cargo.lock generated

@ -482,6 +482,7 @@ name = "hastic"
version = "0.0.1" version = "0.0.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"bincode", "bincode",
"chrono", "chrono",
"config", "config",

1
server/Cargo.toml

@ -27,6 +27,7 @@ linfa = "0.5.0"
linfa-svm = { version="0.5.0", features=["serde"] } linfa-svm = { version="0.5.0", features=["serde"] }
ndarray = "0.15.3" ndarray = "0.15.3"
bincode = "1.3.3" bincode = "1.3.3"
async-trait = "0.1.51"
# TODO: remove this from prod # TODO: remove this from prod
# plotlib = "0.5.1" # plotlib = "0.5.1"

107
server/src/services/analytic_service/analytic_service.rs

@ -1,8 +1,8 @@
use super::analytic_unit::types::{AnalyticUnitConfig, PatternDetectorConfig}; use super::analytic_unit::types::{AnalyticUnitConfig, PatternConfig};
use super::types::{self, DetectionRunnerConfig, LearningTrain}; use super::types::{self, DetectionRunnerConfig, LearningTrain};
use super::{ use super::{
analytic_client::AnalyticClient, analytic_client::AnalyticClient,
analytic_unit::pattern_detector::{self, LearningResults, PatternDetector}, analytic_unit::pattern_analytic_unit::{self, LearningResults, PatternAnalyticUnit},
types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType}, types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType},
}; };
@ -51,9 +51,11 @@ async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Re
pub struct AnalyticService { pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
segments_service: SegmentsService, segments_service: SegmentsService,
learning_results: Option<LearningResults>,
analytic_unit_learning_results: Option<LearningResults>,
analytic_unit_config: AnalyticUnitConfig, analytic_unit_config: AnalyticUnitConfig,
learning_status: LearningStatus, analytic_unit_learning_status: LearningStatus,
tx: mpsc::Sender<AnalyticServiceMessage>, tx: mpsc::Sender<AnalyticServiceMessage>,
rx: mpsc::Receiver<AnalyticServiceMessage>, rx: mpsc::Receiver<AnalyticServiceMessage>,
@ -82,13 +84,13 @@ impl AnalyticService {
segments_service, segments_service,
// TODO: get it from persistance // TODO: get it from persistance
learning_results: None, analytic_unit_learning_results: None,
analytic_unit_config: AnalyticUnitConfig::PatternDetector(PatternDetectorConfig { analytic_unit_config: AnalyticUnitConfig::Pattern(PatternConfig {
correlation_score: 0.95, correlation_score: 0.95,
model_score: 0.95 model_score: 0.95
}), }),
learning_status: LearningStatus::Initialization, analytic_unit_learning_status: LearningStatus::Initialization,
tx, tx,
rx, rx,
@ -111,10 +113,10 @@ impl AnalyticService {
fn run_detection_task(&self, task: DetectionTask) { fn run_detection_task(&self, task: DetectionTask) {
// TODO: save handler of the task // TODO: save handler of the task
tokio::spawn({ tokio::spawn({
let lr = self.learning_results.as_ref().unwrap().clone(); let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone();
let ms = self.metric_service.clone(); let ms = self.metric_service.clone();
async move { async move {
AnalyticService::get_pattern_detection(task.sender, lr, ms, task.from, task.to) AnalyticService::get_detections(task.sender, lr, ms, task.from, task.to)
.await; .await;
} }
}); });
@ -126,7 +128,7 @@ impl AnalyticService {
} }
// TODO: save handler of the task // TODO: save handler of the task
self.runner_handler = Some(tokio::spawn({ self.runner_handler = Some(tokio::spawn({
let lr = self.learning_results.as_ref().unwrap().clone(); let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone();
let ms = self.metric_service.clone(); let ms = self.metric_service.clone();
async move { async move {
// TODO: implement // TODO: implement
@ -142,7 +144,7 @@ impl AnalyticService {
self.learning_handler = None; self.learning_handler = None;
} }
self.learning_handler = Some(tokio::spawn({ self.learning_handler = Some(tokio::spawn({
self.learning_status = LearningStatus::Starting; self.analytic_unit_learning_status = LearningStatus::Starting;
let tx = self.tx.clone(); let tx = self.tx.clone();
let ms = self.metric_service.clone(); let ms = self.metric_service.clone();
let ss = self.segments_service.clone(); let ss = self.segments_service.clone();
@ -152,7 +154,7 @@ impl AnalyticService {
})); }));
} }
RequestType::RunDetection(task) => { RequestType::RunDetection(task) => {
if self.learning_status == LearningStatus::Initialization { if self.analytic_unit_learning_status == LearningStatus::Initialization {
match task match task
.sender .sender
.send(Err(anyhow::format_err!("Analytics in initialization"))) .send(Err(anyhow::format_err!("Analytics in initialization")))
@ -165,21 +167,21 @@ impl AnalyticService {
} }
return; return;
} }
if self.learning_status == LearningStatus::Ready { if self.analytic_unit_learning_status == LearningStatus::Ready {
self.run_detection_task(task); self.run_detection_task(task);
} else { } else {
self.learning_waiters.push(task); self.learning_waiters.push(task);
} }
} }
RequestType::GetStatus(tx) => { RequestType::GetStatus(tx) => {
tx.send(self.learning_status.clone()).unwrap(); tx.send(self.analytic_unit_learning_status.clone()).unwrap();
} }
RequestType::GetLearningTrain(tx) => { RequestType::GetLearningTrain(tx) => {
if self.learning_results.is_none() { if self.analytic_unit_learning_results.is_none() {
tx.send(LearningTrain::default()).unwrap(); tx.send(LearningTrain::default()).unwrap();
} else { } else {
tx.send( tx.send(
self.learning_results self.analytic_unit_learning_results
.as_ref() .as_ref()
.unwrap() .unwrap()
.learning_train .learning_train
@ -197,11 +199,11 @@ impl AnalyticService {
fn consume_response(&mut self, res: types::ResponseType) { fn consume_response(&mut self, res: types::ResponseType) {
match res { match res {
// TODO: handle when learning panic // TODO: handle when learning panic
ResponseType::LearningStarted => self.learning_status = LearningStatus::Learning, ResponseType::LearningStarted => self.analytic_unit_learning_status = LearningStatus::Learning,
ResponseType::LearningFinished(results) => { ResponseType::LearningFinished(results) => {
self.learning_handler = None; self.learning_handler = None;
self.learning_results = Some(results); self.analytic_unit_learning_results = Some(results);
self.learning_status = LearningStatus::Ready; self.analytic_unit_learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter // TODO: run tasks from self.learning_waiter
while self.learning_waiters.len() > 0 { while self.learning_waiters.len() > 0 {
@ -219,13 +221,13 @@ impl AnalyticService {
} }
ResponseType::LearningFinishedEmpty => { ResponseType::LearningFinishedEmpty => {
// TODO: drop all learning_waiters with empty results // TODO: drop all learning_waiters with empty results
self.learning_results = None; self.analytic_unit_learning_results = None;
self.learning_status = LearningStatus::Initialization; self.analytic_unit_learning_status = LearningStatus::Initialization;
} }
ResponseType::LearningDatasourceError => { ResponseType::LearningDatasourceError => {
// TODO: drop all learning_waiters with error // TODO: drop all learning_waiters with error
self.learning_results = None; self.analytic_unit_learning_results = None;
self.learning_status = LearningStatus::Error; self.analytic_unit_learning_status = LearningStatus::Error;
} }
} }
} }
@ -311,7 +313,7 @@ impl AnalyticService {
} }
} }
let lr = PatternDetector::learn(&learn_tss, &learn_anti_tss).await; let lr = PatternAnalyticUnit::learn(&learn_tss, &learn_anti_tss).await;
match tx match tx
.send(AnalyticServiceMessage::Response( .send(AnalyticServiceMessage::Response(
ResponseType::LearningFinished(lr), ResponseType::LearningFinished(lr),
@ -323,14 +325,14 @@ impl AnalyticService {
} }
} }
async fn get_pattern_detection( async fn get_detections(
tx: oneshot::Sender<anyhow::Result<Vec<Segment>>>, tx: oneshot::Sender<anyhow::Result<Vec<Segment>>>,
lr: LearningResults, lr: LearningResults,
ms: MetricService, ms: MetricService,
from: u64, from: u64,
to: u64, to: u64,
) { ) {
let pt = pattern_detector::PatternDetector::new(lr); let pt = pattern_analytic_unit::PatternAnalyticUnit::new(lr);
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap(); let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 { if mr.data.keys().len() == 0 {
@ -367,58 +369,5 @@ impl AnalyticService {
return; return;
} }
// TODO: move this to another analytic unit
async fn get_threshold_detections(
&self,
from: u64,
to: u64,
step: u64,
threashold: f64,
) -> anyhow::Result<Vec<Segment>> {
let mr = self.metric_service.query(from, to, step).await?;
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
let key = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[key];
let mut result = Vec::<Segment>::new();
let mut from: Option<u64> = None;
for (t, v) in ts {
if *v > threashold {
if from.is_some() {
continue;
} else {
from = Some(*t);
}
} else {
if from.is_some() {
result.push(Segment {
// TODO: persist detections together with id
id: Some(get_random_str(ID_LENGTH)),
from: from.unwrap(),
to: *t,
segment_type: SegmentType::Detection,
});
from = None;
}
}
}
// TODO: don't repeat myself
if from.is_some() {
result.push(Segment {
id: Some(get_random_str(ID_LENGTH)),
from: from.unwrap(),
to,
segment_type: SegmentType::Detection,
});
}
// TODO: decide what to do it from is Some() in the end
Ok(result)
}
} }

12
server/src/services/analytic_service/analytic_unit/mod.rs

@ -1,6 +1,12 @@
pub mod pattern_detector; pub mod pattern_analytic_unit;
pub mod threshold_analytic_unit;
pub mod types; pub mod types;
use async_trait::async_trait;
#[async_trait]
trait AnalyticUnit<C> { trait AnalyticUnit<C> {
async fn learn(reads: &Vec<Vec<(u64, f64)>>, anti_reads: &Vec<Vec<(u64, f64)>>);
} fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)>;
}

20
server/src/services/analytic_service/analytic_unit/pattern_detector.rs → server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -55,7 +55,7 @@ pub type Features = [f64; FEATURES_SIZE];
pub const SCORE_THRESHOLD: f64 = 0.95; pub const SCORE_THRESHOLD: f64 = 0.95;
#[derive(Clone)] #[derive(Clone)]
pub struct PatternDetector { pub struct PatternAnalyticUnit {
learning_results: LearningResults, learning_results: LearningResults,
} }
@ -67,9 +67,9 @@ fn nan_to_zero(n: f64) -> f64 {
} }
// TODO: move this to loginc of analytic unit // TODO: move this to loginc of analytic unit
impl PatternDetector { impl PatternAnalyticUnit {
pub fn new(learning_results: LearningResults) -> PatternDetector { pub fn new(learning_results: LearningResults) -> PatternAnalyticUnit {
PatternDetector { learning_results } PatternAnalyticUnit { learning_results }
} }
pub async fn learn( pub async fn learn(
@ -86,7 +86,7 @@ impl PatternDetector {
for r in reads { for r in reads {
let xs: Vec<f64> = r.iter().map(|e| e.1).map(nan_to_zero).collect(); let xs: Vec<f64> = r.iter().map(|e| e.1).map(nan_to_zero).collect();
let fs = PatternDetector::get_features(&xs); let fs = PatternAnalyticUnit::get_features(&xs);
records_raw.push(fs); records_raw.push(fs);
targets_raw.push(true); targets_raw.push(true);
@ -95,7 +95,7 @@ impl PatternDetector {
for r in anti_reads { for r in anti_reads {
let xs: Vec<f64> = r.iter().map(|e| e.1).map(nan_to_zero).collect(); let xs: Vec<f64> = r.iter().map(|e| e.1).map(nan_to_zero).collect();
let fs = PatternDetector::get_features(&xs); let fs = PatternAnalyticUnit::get_features(&xs);
records_raw.push(fs); records_raw.push(fs);
targets_raw.push(false); targets_raw.push(false);
anti_patterns.push(xs); anti_patterns.push(xs);
@ -168,7 +168,7 @@ impl PatternDetector {
for j in 0..p.len() { for j in 0..p.len() {
backet.push(nan_to_zero(ts[i + j].1)); backet.push(nan_to_zero(ts[i + j].1));
} }
let score = PatternDetector::corr_aligned(p, &backet); let score = PatternAnalyticUnit::corr_aligned(p, &backet);
if score > pattern_match_score { if score > pattern_match_score {
pattern_match_score = score; pattern_match_score = score;
pattern_match_len = p.len(); pattern_match_len = p.len();
@ -182,7 +182,7 @@ impl PatternDetector {
for j in 0..p.len() { for j in 0..p.len() {
backet.push(nan_to_zero(ts[i + j].1)); backet.push(nan_to_zero(ts[i + j].1));
} }
let score = PatternDetector::corr_aligned(p, &backet); let score = PatternAnalyticUnit::corr_aligned(p, &backet);
if score > anti_pattern_match_score { if score > anti_pattern_match_score {
anti_pattern_match_score = score; anti_pattern_match_score = score;
} }
@ -194,7 +194,7 @@ impl PatternDetector {
for j in 0..pattern_match_len { for j in 0..pattern_match_len {
backet.push(nan_to_zero(ts[i + j].1)); backet.push(nan_to_zero(ts[i + j].1));
} }
let fs = PatternDetector::get_features(&backet); let fs = PatternAnalyticUnit::get_features(&backet);
let detected = self let detected = self
.learning_results .learning_results
.model .model
@ -245,8 +245,6 @@ impl PatternDetector {
return 0.; return 0.;
} }
// TODO: case when denominator = 0
let result: f64 = numerator / denominator; let result: f64 = numerator / denominator;
// assert!(result.abs() <= 1.01); // assert!(result.abs() <= 1.01);

40
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -0,0 +1,40 @@
use super::types::ThresholdConfig;
struct ThresholdDetector {
config: ThresholdConfig
}
impl ThresholdDetector {
fn new(config: ThresholdConfig) -> ThresholdDetector {
ThresholdDetector{ config }
}
pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> {
let mut result = Vec::<(u64, u64)>::new();
let mut from: Option<u64> = None;
for (t, v) in ts {
if *v > self.config.threashold {
if from.is_some() {
continue;
} else {
from = Some(*t);
}
} else {
if from.is_some() {
result.push((from.unwrap(), *t));
from = None;
}
}
}
// TODO: don't repeat myself
if from.is_some() {
result.push((from.unwrap(), ts.last().unwrap().0));
}
// TODO: decide what to do it from is Some() in the end
result
}
}

10
server/src/services/analytic_service/analytic_unit/types.rs

@ -1,12 +1,18 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PatternDetectorConfig { pub struct PatternConfig {
pub correlation_score: f32, pub correlation_score: f32,
pub model_score: f32 pub model_score: f32
} }
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ThresholdConfig {
pub threashold: f64,
}
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AnalyticUnitConfig { pub enum AnalyticUnitConfig {
PatternDetector(PatternDetectorConfig) Pattern(PatternConfig),
Threshold(ThresholdConfig)
} }

4
server/src/services/analytic_service/types.rs

@ -1,6 +1,6 @@
use crate::services::segments_service::Segment; use crate::services::segments_service::Segment;
use super::analytic_unit::{pattern_detector::{self, LearningResults}, types::AnalyticUnitConfig}; use super::analytic_unit::{pattern_analytic_unit::{self, LearningResults}, types::AnalyticUnitConfig};
use anyhow::Result; use anyhow::Result;
use serde::Serialize; use serde::Serialize;
@ -18,7 +18,7 @@ pub enum LearningStatus {
// TODO: move to analytic_unit config of pattern detector // TODO: move to analytic_unit config of pattern detector
#[derive(Clone, Serialize, Debug)] #[derive(Clone, Serialize, Debug)]
pub struct LearningTrain { pub struct LearningTrain {
pub features: Vec<pattern_detector::Features>, pub features: Vec<pattern_analytic_unit::Features>,
pub target: Vec<bool>, pub target: Vec<bool>,
} }

Loading…
Cancel
Save