Browse Source

analytic unit

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
6197641bf6
  1. 36
      server/src/api/analytics.rs
  2. 14
      server/src/services/analytic_service/analytic_client.rs
  3. 225
      server/src/services/analytic_service/analytic_service.rs
  4. 15
      server/src/services/analytic_service/analytic_unit/mod.rs
  5. 299
      server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs
  6. 46
      server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs
  7. 29
      server/src/services/analytic_service/analytic_unit/types.rs
  8. 2
      server/src/services/analytic_service/mod.rs
  9. 21
      server/src/services/analytic_service/types.rs

36
server/src/api/analytics.rs

@ -10,7 +10,7 @@ pub mod filters {
list(client.clone())
.or(status(client.clone()))
.or(get_config(client.clone()))
.or(list_train(client.clone()))
// .or(list_train(client.clone()))
// .or(create(db.clone()))
// // .or(update(db.clone()))
// .or(delete(db.clone()))
@ -48,14 +48,14 @@ pub mod filters {
}
/// GET /analytics/model
pub fn list_train(
client: Client,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("analytics" / "model")
.and(warp::get())
.and(with_client(client))
.and_then(handlers::list_train)
}
// pub fn list_train(
// client: Client,
// ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
// warp::path!("analytics" / "model")
// .and(warp::get())
// .and(with_client(client))
// .and_then(handlers::list_train)
// }
fn with_client(
client: Client,
@ -103,15 +103,15 @@ mod handlers {
}
}
pub async fn list_train(client: Client) -> Result<impl warp::Reply, warp::Rejection> {
match client.get_train().await {
Ok(lt) => Ok(API::json(&lt)),
Err(e) => {
println!("{:?}", e);
Err(warp::reject::custom(BadQuery))
}
}
}
// pub async fn list_train(client: Client) -> Result<impl warp::Reply, warp::Rejection> {
// match client.get_train().await {
// Ok(lt) => Ok(API::json(&lt)),
// Err(e) => {
// println!("{:?}", e);
// Err(warp::reject::custom(BadQuery))
// }
// }
// }
}
mod models {

14
server/src/services/analytic_service/analytic_client.rs

@ -43,13 +43,13 @@ impl AnalyticClient {
Ok(r)
}
pub async fn get_train(&self) -> anyhow::Result<LearningTrain> {
let (tx, rx) = oneshot::channel();
let req = AnalyticServiceMessage::Request(RequestType::GetLearningTrain(tx));
self.tx.send(req).await?;
let r = rx.await?;
Ok(r)
}
// pub async fn get_train(&self) -> anyhow::Result<LearningTrain> {
// let (tx, rx) = oneshot::channel();
// let req = AnalyticServiceMessage::Request(RequestType::GetLearningTrain(tx));
// self.tx.send(req).await?;
// let r = rx.await?;
// Ok(r)
// }
pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
let (tx, rx) = oneshot::channel();

225
server/src/services/analytic_service/analytic_service.rs

@ -1,3 +1,5 @@
use std::sync::Arc;
use super::analytic_unit::types::{AnalyticUnitConfig, PatternConfig};
use super::types::{self, DetectionRunnerConfig, LearningTrain};
use super::{
@ -6,53 +8,27 @@ use super::{
types::{AnalyticServiceMessage, DetectionTask, LearningStatus, RequestType, ResponseType},
};
use crate::services::analytic_service::analytic_unit::resolve;
use crate::services::{
metric_service::MetricService,
segments_service::{self, Segment, SegmentType, SegmentsService, ID_LENGTH},
};
use crate::utils::{self, get_random_str};
use anyhow;
use crate::services::analytic_service::analytic_unit::types::{AnalyticUnit, LearningResult};
use tokio::sync::{mpsc, oneshot};
use anyhow;
use futures::future;
use tokio::sync::{mpsc, oneshot, RwLock};
use chrono::Utc;
// TODO: get this from pattern detector
const DETECTION_STEP: u64 = 10;
struct SegData {
label: bool,
data: Vec<(u64, f64)>,
}
async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Result<SegData> {
let mut mr = ms.query(segment.from, segment.to, DETECTION_STEP).await?;
if mr.data.keys().len() == 0 {
return Ok(SegData {
label: segment.segment_type == SegmentType::Label,
data: Default::default(),
});
}
let k = mr.data.keys().nth(0).unwrap().clone();
let ts = mr.data.remove(&k).unwrap();
Ok(SegData {
label: segment.segment_type == SegmentType::Label,
data: ts,
})
}
// TODO: now it's basically single analytic unit, service will operate on many AU
pub struct AnalyticService {
metric_service: MetricService,
segments_service: SegmentsService,
analytic_unit_learning_results: Option<LearningResults>,
analytic_unit: Option<Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>>,
analytic_unit_config: AnalyticUnitConfig,
analytic_unit_learning_status: LearningStatus,
@ -84,10 +60,10 @@ impl AnalyticService {
segments_service,
// TODO: get it from persistance
analytic_unit_learning_results: None,
analytic_unit: None,
analytic_unit_config: AnalyticUnitConfig::Pattern(PatternConfig {
correlation_score: 0.95,
model_score: 0.95
model_score: 0.95,
}),
analytic_unit_learning_status: LearningStatus::Initialization,
@ -113,28 +89,27 @@ impl AnalyticService {
fn run_detection_task(&self, task: DetectionTask) {
// TODO: save handler of the task
tokio::spawn({
let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone();
let ms = self.metric_service.clone();
let au = self.analytic_unit.as_ref().unwrap().clone();
async move {
AnalyticService::get_detections(task.sender, lr, ms, task.from, task.to)
.await;
AnalyticService::get_detections(task.sender, au, ms, task.from, task.to).await;
}
});
}
fn run_detection_runner(&mut self, task: DetectionRunnerConfig) {
if self.runner_handler.is_some() {
self.runner_handler.as_mut().unwrap().abort();
}
// TODO: save handler of the task
self.runner_handler = Some(tokio::spawn({
let lr = self.analytic_unit_learning_results.as_ref().unwrap().clone();
let ms = self.metric_service.clone();
async move {
// TODO: implement
}
}));
}
// fn run_detection_runner(&mut self, task: DetectionRunnerConfig) {
// if self.runner_handler.is_some() {
// self.runner_handler.as_mut().unwrap().abort();
// }
// // TODO: save handler of the task
// self.runner_handler = Some(tokio::spawn({
// let au = self.analytic_unit.unwrap();
// let ms = self.metric_service.clone();
// async move {
// // TODO: implement
// }
// }));
// }
fn consume_request(&mut self, req: types::RequestType) -> () {
match req {
@ -148,8 +123,9 @@ impl AnalyticService {
let tx = self.tx.clone();
let ms = self.metric_service.clone();
let ss = self.segments_service.clone();
let cfg = self.analytic_unit_config.clone();
async move {
AnalyticService::run_learning(tx, ms, ss).await;
AnalyticService::run_learning(tx, cfg, ms, ss).await;
}
}));
}
@ -176,20 +152,20 @@ impl AnalyticService {
RequestType::GetStatus(tx) => {
tx.send(self.analytic_unit_learning_status.clone()).unwrap();
}
RequestType::GetLearningTrain(tx) => {
if self.analytic_unit_learning_results.is_none() {
tx.send(LearningTrain::default()).unwrap();
} else {
tx.send(
self.analytic_unit_learning_results
.as_ref()
.unwrap()
.learning_train
.clone(),
)
.unwrap();
}
}
// RequestType::GetLearningTrain(tx) => {
// if self.analytic_unit_learning_results.is_none() {
// tx.send(LearningTrain::default()).unwrap();
// } else {
// tx.send(
// self.analytic_unit_learning_results
// .as_ref()
// .unwrap()
// .learning_train
// .clone(),
// )
// .unwrap();
// }
// }
RequestType::GetConfig(tx) => {
tx.send(self.analytic_unit_config.clone()).unwrap();
}
@ -199,10 +175,12 @@ impl AnalyticService {
fn consume_response(&mut self, res: types::ResponseType) {
match res {
// TODO: handle when learning panic
ResponseType::LearningStarted => self.analytic_unit_learning_status = LearningStatus::Learning,
ResponseType::LearningStarted => {
self.analytic_unit_learning_status = LearningStatus::Learning
}
ResponseType::LearningFinished(results) => {
self.learning_handler = None;
self.analytic_unit_learning_results = Some(results);
self.analytic_unit = Some(Arc::new(tokio::sync::RwLock::new(results)));
self.analytic_unit_learning_status = LearningStatus::Ready;
// TODO: run tasks from self.learning_waiter
@ -212,21 +190,21 @@ impl AnalyticService {
}
// TODO: fix this
if self.endpoint.is_some() {
self.run_detection_runner(DetectionRunnerConfig {
endpoint: self.endpoint.as_ref().unwrap().clone(),
from: Utc::now().timestamp() as u64,
});
}
// if self.endpoint.is_some() {
// self.run_detection_runner(DetectionRunnerConfig {
// endpoint: self.endpoint.as_ref().unwrap().clone(),
// from: Utc::now().timestamp() as u64,
// });
// }
}
ResponseType::LearningFinishedEmpty => {
// TODO: drop all learning_waiters with empty results
self.analytic_unit_learning_results = None;
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Initialization;
}
ResponseType::LearningDatasourceError => {
// TODO: drop all learning_waiters with error
self.analytic_unit_learning_results = None;
self.analytic_unit = None;
self.analytic_unit_learning_status = LearningStatus::Error;
}
}
@ -246,9 +224,12 @@ impl AnalyticService {
async fn run_learning(
tx: mpsc::Sender<AnalyticServiceMessage>,
aucfg: AnalyticUnitConfig,
ms: MetricService,
ss: SegmentsService,
) {
let mut au = resolve(aucfg);
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningStarted,
@ -256,70 +237,17 @@ impl AnalyticService {
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send notification about learning start"),
Err(_e) => println!("Fail to send learning started notification"),
}
// TODO: logic for returning error
// be careful if decide to store detections in db
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
let has_segments_label = segments
.iter()
.find(|s| s.segment_type == SegmentType::Label)
.is_some();
if !has_segments_label {
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningFinishedEmpty,
))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail to send learning results"),
}
return;
}
let fs = segments.iter().map(|s| segment_to_segdata(&ms, s));
let rs = future::join_all(fs).await;
let mut learn_tss = Vec::new();
let mut learn_anti_tss = Vec::new();
for r in rs {
if r.is_err() {
println!("Error extracting metrics from datasource");
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningDatasourceError,
))
.await
{
Ok(_) => {}
Err(_e) => println!("Fail send error abour extracting error"),
}
return;
}
let sd = r.unwrap();
if sd.data.is_empty() {
continue;
}
if sd.label {
learn_tss.push(sd.data);
} else {
learn_anti_tss.push(sd.data);
}
}
// TODO: maybe to spawn_blocking here
let lr = match au.learn(ms, ss).await {
LearningResult::Finished => ResponseType::LearningFinished(au),
LearningResult::DatasourceError => ResponseType::LearningDatasourceError,
LearningResult::FinishedEmpty => ResponseType::LearningFinishedEmpty,
};
let lr = PatternAnalyticUnit::learn(&learn_tss, &learn_anti_tss).await;
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningFinished(lr),
))
.await
{
match tx.send(AnalyticServiceMessage::Response(lr)).await {
Ok(_) => {}
Err(_e) => println!("Fail to send learning results"),
}
@ -327,28 +255,17 @@ impl AnalyticService {
async fn get_detections(
tx: oneshot::Sender<anyhow::Result<Vec<Segment>>>,
lr: LearningResults,
analytic_unit: Arc<RwLock<Box<dyn AnalyticUnit + Send + Sync>>>,
ms: MetricService,
from: u64,
to: u64,
) {
let pt = pattern_analytic_unit::PatternAnalyticUnit::new(lr);
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 {
match tx.send(Ok(Vec::new())) {
Ok(_) => {}
Err(_e) => {
println!("failed to send empty results");
}
}
return;
}
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
let result = pt.detect(ts);
let result = analytic_unit
.read()
.await
.detect(ms, from, to)
.await
.unwrap();
let result_segments: Vec<Segment> = result
.iter()
@ -368,6 +285,4 @@ impl AnalyticService {
}
return;
}
}

15
server/src/services/analytic_service/analytic_unit/mod.rs

@ -2,11 +2,14 @@ pub mod pattern_analytic_unit;
pub mod threshold_analytic_unit;
pub mod types;
use async_trait::async_trait;
use self::{
pattern_analytic_unit::PatternAnalyticUnit, threshold_analytic_unit::ThresholdAnalyticUnit,
types::AnalyticUnitConfig,
};
#[async_trait]
trait AnalyticUnit<C> {
async fn learn(reads: &Vec<Vec<(u64, f64)>>, anti_reads: &Vec<Vec<(u64, f64)>>);
fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)>;
pub fn resolve(cfg: AnalyticUnitConfig) -> Box<dyn types::AnalyticUnit + Send + Sync> {
match cfg {
AnalyticUnitConfig::Pattern(c) => Box::new(PatternAnalyticUnit::new(c.clone())),
AnalyticUnitConfig::Threshold(c) => Box::new(ThresholdAnalyticUnit::new(c.clone())),
}
}

299
server/src/services/analytic_service/analytic_unit/pattern_analytic_unit.rs

@ -1,19 +1,27 @@
use std::{fmt, sync::Arc};
use futures::future;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_json;
use linfa::prelude::*;
use linfa;
use linfa_svm::{error::Result, Svm};
use linfa_svm::Svm;
use ndarray::{Array, ArrayView, Axis};
use ndarray::Array;
use crate::services::analytic_service::types::LearningTrain;
use crate::services::{
analytic_service::types::{self, LearningTrain},
metric_service::MetricService,
segments_service::{Segment, SegmentType, SegmentsService},
};
use super::types::{AnalyticUnit, LearningResult, PatternConfig};
use async_trait::async_trait;
// TODO: move to config
const DETECTION_STEP: u64 = 10;
#[derive(Clone)]
pub struct LearningResults {
@ -54,11 +62,6 @@ pub type Features = [f64; FEATURES_SIZE];
pub const SCORE_THRESHOLD: f64 = 0.95;
#[derive(Clone)]
pub struct PatternAnalyticUnit {
learning_results: LearningResults,
}
fn nan_to_zero(n: f64) -> f64 {
if n.is_nan() {
return 0.;
@ -66,16 +69,159 @@ fn nan_to_zero(n: f64) -> f64 {
return n;
}
struct SegData {
label: bool,
data: Vec<(u64, f64)>,
}
async fn segment_to_segdata(ms: &MetricService, segment: &Segment) -> anyhow::Result<SegData> {
let mut mr = ms.query(segment.from, segment.to, DETECTION_STEP).await?;
if mr.data.keys().len() == 0 {
return Ok(SegData {
label: segment.segment_type == SegmentType::Label,
data: Default::default(),
});
}
let k = mr.data.keys().nth(0).unwrap().clone();
let ts = mr.data.remove(&k).unwrap();
Ok(SegData {
label: segment.segment_type == SegmentType::Label,
data: ts,
})
}
pub struct PatternAnalyticUnit {
config: PatternConfig,
learning_results: Option<LearningResults>,
}
// TODO: move this to loginc of analytic unit
impl PatternAnalyticUnit {
pub fn new(learning_results: LearningResults) -> PatternAnalyticUnit {
PatternAnalyticUnit { learning_results }
pub fn new(cfg: PatternConfig) -> PatternAnalyticUnit {
PatternAnalyticUnit {
config: cfg,
learning_results: None,
}
}
fn corr_aligned(xs: &Vec<f64>, ys: &Vec<f64>) -> f64 {
let n = xs.len() as f64;
let mut s_xs: f64 = 0f64;
let mut s_ys: f64 = 0f64;
let mut s_xsys: f64 = 0f64;
let mut s_xs_2: f64 = 0f64;
let mut s_ys_2: f64 = 0f64;
let min = xs.len().min(ys.len());
xs.iter()
.take(min)
.zip(ys.iter().take(min))
.for_each(|(xi, yi)| {
s_xs += xi;
s_ys += yi;
s_xsys += xi * yi;
s_xs_2 += xi * xi;
s_ys_2 += yi * yi;
});
let numerator: f64 = n * s_xsys - s_xs * s_ys;
let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt();
// IT"s a hack
if denominator < 0.01 {
return 0.;
}
let result: f64 = numerator / denominator;
// assert!(result.abs() <= 1.01);
if result.abs() > 1.1 {
println!("{:?}", xs);
println!("------------");
println!("{:?}", ys);
println!("WARNING: corr result > 1: {}", result);
}
return result;
}
pub async fn learn(
reads: &Vec<Vec<(u64, f64)>>,
anti_reads: &Vec<Vec<(u64, f64)>>,
) -> LearningResults {
fn get_features(xs: &Vec<f64>) -> Features {
let mut min = f64::MAX;
let mut max = f64::MIN;
let mut sum = 0f64;
for x in xs {
min = min.min(*x);
max = max.max(*x);
sum += x;
}
let mean = sum / xs.len() as f64;
sum = 0f64;
for x in xs {
sum += (x - mean) * (x - mean);
}
let sd = sum.sqrt();
// TODO: add autocorrelation
// TODO: add FFT
// TODO: add DWT
return [
min, max, mean, sd,
// 0f64,0f64,
// 0f64,0f64,0f64, 0f64
];
}
}
#[async_trait]
impl AnalyticUnit for PatternAnalyticUnit {
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult {
// be careful if decide to store detections in db
let segments = ss.get_segments_inside(0, u64::MAX / 2).unwrap();
let has_segments_label = segments
.iter()
.find(|s| s.segment_type == SegmentType::Label)
.is_some();
if !has_segments_label {
return LearningResult::FinishedEmpty;
}
let fs = segments.iter().map(|s| segment_to_segdata(&ms, s));
let rs = future::join_all(fs).await;
let mut learn_tss = Vec::new();
let mut learn_anti_tss = Vec::new();
for r in rs {
if r.is_err() {
println!("Error extracting metrics from datasource");
return LearningResult::DatasourceError;
}
let sd = r.unwrap();
if sd.data.is_empty() {
continue;
}
if sd.label {
learn_tss.push(sd.data);
} else {
learn_anti_tss.push(sd.data);
}
}
// let reads: &Vec<Vec<(u64, f64)>> = // TODO
// let anti_reads: &Vec<Vec<(u64, f64)>> // TODO
// let size_avg = reads.iter().map(|r| r.len()).sum::<usize>() / reads.len();
let mut patterns = Vec::<Vec<f64>>::new();
@ -84,7 +230,7 @@ impl PatternAnalyticUnit {
let mut records_raw = Vec::<Features>::new();
let mut targets_raw = Vec::<bool>::new();
for r in reads {
for r in learn_tss {
let xs: Vec<f64> = r.iter().map(|e| e.1).map(nan_to_zero).collect();
let fs = PatternAnalyticUnit::get_features(&xs);
@ -93,7 +239,7 @@ impl PatternAnalyticUnit {
patterns.push(xs);
}
for r in anti_reads {
for r in learn_anti_tss {
let xs: Vec<f64> = r.iter().map(|e| e.1).map(nan_to_zero).collect();
let fs = PatternAnalyticUnit::get_features(&xs);
records_raw.push(fs);
@ -137,7 +283,7 @@ impl PatternAnalyticUnit {
// println!("pridiction: {}", prediction );
LearningResults {
self.learning_results = Some(LearningResults {
model: Arc::new(Mutex::new(model)),
learning_train: LearningTrain {
@ -147,15 +293,36 @@ impl PatternAnalyticUnit {
patterns,
anti_patterns,
}
});
return LearningResult::Finished;
}
// TODO: get iterator instead of vector
pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> {
async fn detect(
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<Vec<(u64, u64)>> {
if self.learning_results.is_none() {
return Err(anyhow::format_err!("Learning results are not ready"));
}
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
let lr = self.learning_results.as_ref().unwrap();
let mut results = Vec::new();
let pt = &self.learning_results.patterns;
let apt = &self.learning_results.anti_patterns;
let pt = &lr.patterns;
let apt = &lr.anti_patterns;
for i in 0..ts.len() {
let mut pattern_match_score = 0f64;
@ -168,7 +335,7 @@ impl PatternAnalyticUnit {
for j in 0..p.len() {
backet.push(nan_to_zero(ts[i + j].1));
}
let score = PatternAnalyticUnit::corr_aligned(p, &backet);
let score = PatternAnalyticUnit::corr_aligned(&p, &backet);
if score > pattern_match_score {
pattern_match_score = score;
pattern_match_len = p.len();
@ -182,7 +349,7 @@ impl PatternAnalyticUnit {
for j in 0..p.len() {
backet.push(nan_to_zero(ts[i + j].1));
}
let score = PatternAnalyticUnit::corr_aligned(p, &backet);
let score = PatternAnalyticUnit::corr_aligned(&p, &backet);
if score > anti_pattern_match_score {
anti_pattern_match_score = score;
}
@ -195,11 +362,7 @@ impl PatternAnalyticUnit {
backet.push(nan_to_zero(ts[i + j].1));
}
let fs = PatternAnalyticUnit::get_features(&backet);
let detected = self
.learning_results
.model
.lock()
.predict(Array::from_vec(fs.to_vec()));
let detected = lr.model.lock().predict(Array::from_vec(fs.to_vec()));
if detected {
pattern_match_score += 0.1;
} else {
@ -214,80 +377,6 @@ impl PatternAnalyticUnit {
}
}
return results;
}
fn corr_aligned(xs: &Vec<f64>, ys: &Vec<f64>) -> f64 {
let n = xs.len() as f64;
let mut s_xs: f64 = 0f64;
let mut s_ys: f64 = 0f64;
let mut s_xsys: f64 = 0f64;
let mut s_xs_2: f64 = 0f64;
let mut s_ys_2: f64 = 0f64;
let min = xs.len().min(ys.len());
xs.iter()
.take(min)
.zip(ys.iter().take(min))
.for_each(|(xi, yi)| {
s_xs += xi;
s_ys += yi;
s_xsys += xi * yi;
s_xs_2 += xi * xi;
s_ys_2 += yi * yi;
});
let numerator: f64 = n * s_xsys - s_xs * s_ys;
let denominator: f64 = ((n * s_xs_2 - s_xs * s_xs) * (n * s_ys_2 - s_ys * s_ys)).sqrt();
// IT"s a hack
if denominator < 0.01 {
return 0.;
}
let result: f64 = numerator / denominator;
// assert!(result.abs() <= 1.01);
if result.abs() > 1.1 {
println!("{:?}", xs);
println!("------------");
println!("{:?}", ys);
println!("WARNING: corr result > 1: {}", result);
}
return result;
}
fn get_features(xs: &Vec<f64>) -> Features {
let mut min = f64::MAX;
let mut max = f64::MIN;
let mut sum = 0f64;
for x in xs {
min = min.min(*x);
max = max.max(*x);
sum += x;
}
let mean = sum / xs.len() as f64;
sum = 0f64;
for x in xs {
sum += (x - mean) * (x - mean);
}
let sd = sum.sqrt();
// TODO: add autocorrelation
// TODO: add FFT
// TODO: add DWT
return [
min, max, mean, sd,
// 0f64,0f64,
// 0f64,0f64,0f64, 0f64
];
Ok(results)
}
}

46
server/src/services/analytic_service/analytic_unit/threshold_analytic_unit.rs

@ -1,15 +1,43 @@
use super::types::ThresholdConfig;
use crate::services::{
analytic_service::types, metric_service::MetricService, segments_service::SegmentsService,
};
struct ThresholdDetector {
config: ThresholdConfig
use super::types::{AnalyticUnit, LearningResult, ThresholdConfig};
use async_trait::async_trait;
// TODO: move to config
const DETECTION_STEP: u64 = 10;
pub struct ThresholdAnalyticUnit {
config: ThresholdConfig,
}
impl ThresholdDetector {
fn new(config: ThresholdConfig) -> ThresholdDetector {
ThresholdDetector{ config }
impl ThresholdAnalyticUnit {
pub fn new(config: ThresholdConfig) -> ThresholdAnalyticUnit {
ThresholdAnalyticUnit { config }
}
}
pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> {
#[async_trait]
impl AnalyticUnit for ThresholdAnalyticUnit {
async fn learn(&mut self, _ms: MetricService, _ss: SegmentsService) -> LearningResult {
return LearningResult::Finished;
}
async fn detect(
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<Vec<(u64, u64)>> {
let mr = ms.query(from, to, DETECTION_STEP).await.unwrap();
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
let mut result = Vec::<(u64, u64)>::new();
let mut from: Option<u64> = None;
@ -35,6 +63,6 @@ impl ThresholdDetector {
// TODO: decide what to do it from is Some() in the end
result
Ok(result)
}
}
}

29
server/src/services/analytic_service/analytic_unit/types.rs

@ -1,9 +1,15 @@
use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use crate::services::{
analytic_service::types, metric_service::MetricService, segments_service::SegmentsService,
};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PatternConfig {
pub correlation_score: f32,
pub model_score: f32
pub model_score: f32,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -14,5 +20,22 @@ pub struct ThresholdConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AnalyticUnitConfig {
Pattern(PatternConfig),
Threshold(ThresholdConfig)
}
Threshold(ThresholdConfig),
}
pub enum LearningResult {
Finished,
FinishedEmpty,
DatasourceError,
}
#[async_trait]
pub trait AnalyticUnit {
async fn learn(&mut self, ms: MetricService, ss: SegmentsService) -> LearningResult;
async fn detect(
&self,
ms: MetricService,
from: u64,
to: u64,
) -> anyhow::Result<Vec<(u64, u64)>>;
}

2
server/src/services/analytic_service/mod.rs

@ -1,5 +1,5 @@
mod analytic_service;
mod analytic_unit;
pub mod analytic_unit;
pub mod types;
pub mod analytic_client;

21
server/src/services/analytic_service/types.rs

@ -1,11 +1,18 @@
use std::fmt;
use crate::services::segments_service::Segment;
use super::analytic_unit::{pattern_analytic_unit::{self, LearningResults}, types::AnalyticUnitConfig};
use super::analytic_unit::{
pattern_analytic_unit::{self, LearningResults},
types::AnalyticUnitConfig,
};
use anyhow::Result;
use serde::Serialize;
use tokio::sync::oneshot;
use crate::services::analytic_service::analytic_unit::types::AnalyticUnit;
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum LearningStatus {
Initialization,
@ -31,14 +38,20 @@ impl Default for LearningTrain {
}
}
#[derive(Debug)]
pub enum ResponseType {
LearningStarted,
LearningFinished(LearningResults),
LearningFinished(Box<dyn AnalyticUnit + Send + Sync>),
LearningFinishedEmpty,
LearningDatasourceError,
}
impl fmt::Debug for ResponseType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: implement
f.debug_tuple("foo").finish()
}
}
#[derive(Debug)]
pub struct DetectionTask {
pub sender: oneshot::Sender<Result<Vec<Segment>>>,
@ -60,7 +73,7 @@ pub enum RequestType {
RunDetection(DetectionTask),
GetStatus(oneshot::Sender<LearningStatus>),
GetConfig(oneshot::Sender<AnalyticUnitConfig>),
GetLearningTrain(oneshot::Sender<LearningTrain>),
// GetLearningTrain(oneshot::Sender<LearningTrain>),
}
#[derive(Debug)]

Loading…
Cancel
Save