Browse Source

pattern detector continue

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
f6862fbc7f
  1. 44
      server/Cargo.lock
  2. 1
      server/Cargo.toml
  3. 8
      server/src/api/analytics.rs
  4. 40
      server/src/services/analytic_service.rs
  5. 27
      server/src/services/analytic_service/pattern_detector.rs
  6. 1
      server/src/services/metric_service.rs

44
server/Cargo.lock generated

@ -251,6 +251,7 @@ checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -273,12 +274,36 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
[[package]]
name = "futures-executor"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
[[package]]
name = "futures-macro"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb"
dependencies = [
"autocfg",
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.17"
@ -298,11 +323,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
dependencies = [
"autocfg",
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@ -382,6 +413,7 @@ dependencies = [
"anyhow",
"config",
"fastrand",
"futures",
"libsqlite3-sys",
"openssl",
"parking_lot",
@ -866,6 +898,18 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3ca011bd0129ff4ae15cd04c4eef202cadf6c51c21e47aba319b4e0501db741"
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.30"

1
server/Cargo.toml

@ -19,3 +19,4 @@ openssl = { version = "=0.10.33", features = ["vendored"] }
rusqlite = "0.26.1"
# https://github.com/rusqlite/rusqlite/issues/914
libsqlite3-sys = { version = "*", features = ["bundled"] }
futures = "0.3.17"

8
server/src/api/analytics.rs

@ -37,9 +37,13 @@ mod handlers {
use crate::api::{BadQuery, API};
pub async fn list(opts: ListOptions, srv: Srv) -> Result<impl warp::Reply, warp::Rejection> {
match srv.get_threshold_detections(opts.from, opts.to, 10, 100_000.).await {
// match srv.get_threshold_detections(opts.from, opts.to, 10, 100_000.).await {
match srv.get_pattern_detection(opts.from, opts.to).await {
Ok(segments) => Ok(API::json(&segments)),
Err(e) => Err(warp::reject::custom(BadQuery)),
Err(e) => {
println!("{:?}", e);
Err(warp::reject::custom(BadQuery))
}
}
}
}

40
server/src/services/analytic_service.rs

@ -1,3 +1,4 @@
use crate::{utils::get_random_str};
use super::{metric_service::MetricService, segments_service::{self, ID_LENGTH, Segment, SegmentType, SegmentsService}};
@ -8,6 +9,8 @@ use anyhow;
mod pattern_detector;
use futures::future;
#[derive(Clone)]
pub struct AnalyticService {
@ -23,10 +26,39 @@ impl AnalyticService {
}
}
pub async fn get_pattern_detection() -> anyhow::Result<Vec<Segment>> {
// TODO: get segments
// TODO: get reads from segments
// TODO: run learn
pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
let innter_step = 10u64;
let segments = self.segments_service.get_segments_inside(0, u64::MAX / 2)?;
let prom = self.metric_service.get_prom();
let fs = segments.iter().map(|s| prom.query(s.from, s.to, innter_step));
let rs = future::join_all(fs).await;
let mut pt = pattern_detector::PatternDetector::new();
// TODO: run this on label adding
// TODO: save learning results in cache
let mut learn_tss = Vec::new();
for r in rs {
let mr = r.unwrap();
if mr.data.keys().len() == 0 {
continue;
}
let k = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[k];
// TODO: maybe not clone
learn_tss.push(ts.clone());
}
pt.learn(&learn_tss);
let ts = prom.query(from, to, innter_step).await?;
// pt.detect(ts);
// TODO: run detections
// TODO: convert detections to segments
Ok(Vec::new())

27
server/src/services/analytic_service/pattern_detector.rs

@ -1,19 +1,34 @@
use subbeat::metric::MetricResult;
struct PatternDetector {
struct LearningResults {
backet_size: usize
}
pub struct PatternDetector {
learning_results: Option<LearningResults>
}
impl PatternDetector {
fn new() -> PatternDetector {
PatternDetector{}
pub fn new() -> PatternDetector {
PatternDetector{
learning_results: None
}
}
fn learn(reads: &Vec<Vec<(u64, f64)>>) {
pub fn learn(&mut self, reads: &Vec<Vec<(u64, f64)>>) {
// TODO: implement
let mut min_size = usize::MAX;
let mut max_size = 0usize;
for r in reads {
min_size = min_size.min(r.len());
max_size = max_size.max(r.len());
}
self.learning_results = Some(LearningResults{
backet_size: (min_size + max_size) / 2
});
}
fn detect(ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> {
pub fn detect(&self, ts: &Vec<(u64, f64)>) -> Vec<(u64, u64)> {
// fill backet
return Vec::new();
}

1
server/src/services/metric_service.rs

@ -14,6 +14,7 @@ impl MetricService {
}
}
// TODO: make prom as field, but Prometheus should be clonable first
pub fn get_prom(&self) -> Prometheus {
Prometheus::new(&self.url.to_string(), &self.query.to_string())
}

Loading…
Cancel
Save