Browse Source

detections begin

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
148cedbed9
  1. 7
      server/src/api.rs
  2. 60
      server/src/api/analytics.rs
  3. 3
      server/src/api/metric.rs
  4. 27
      server/src/services/analytic_service.rs
  5. 1
      server/src/services/metric_service.rs
  6. 3
      server/src/services/segments_service.rs

7
server/src/api.rs

@ -1,4 +1,5 @@
use hastic::config::Config; use hastic::config::Config;
use hastic::services::analytic_service::AnalyticService;
use hastic::services::{metric_service, segments_service, user_service}; use hastic::services::{metric_service, segments_service, user_service};
use warp::http::HeaderValue; use warp::http::HeaderValue;
use warp::hyper::{Body, StatusCode}; use warp::hyper::{Body, StatusCode};
@ -9,6 +10,7 @@ use warp::{http::Response, Filter};
mod auth; mod auth;
mod metric; mod metric;
mod segments; mod segments;
mod analytics;
use serde::Serialize; use serde::Serialize;
@ -30,6 +32,7 @@ pub struct API<'a> {
user_service: Arc<RwLock<user_service::UserService>>, user_service: Arc<RwLock<user_service::UserService>>,
metric_service: Arc<RwLock<metric_service::MetricService>>, metric_service: Arc<RwLock<metric_service::MetricService>>,
data_service: Arc<RwLock<segments_service::SegmentsService>>, data_service: Arc<RwLock<segments_service::SegmentsService>>,
analytic_service: AnalyticService
} }
impl API<'_> { impl API<'_> {
@ -42,6 +45,7 @@ impl API<'_> {
&config.query, &config.query,
))), ))),
data_service: Arc::new(RwLock::new(segments_service::SegmentsService::new()?)), data_service: Arc::new(RwLock::new(segments_service::SegmentsService::new()?)),
analytic_service: AnalyticService::new(config)
}) })
} }
@ -77,12 +81,13 @@ impl API<'_> {
let metrics = metric::get_route(self.metric_service.clone()); let metrics = metric::get_route(self.metric_service.clone());
let login = auth::get_route(self.user_service.clone()); let login = auth::get_route(self.user_service.clone());
let segments = segments::filters::filters(self.data_service.clone()); let segments = segments::filters::filters(self.data_service.clone());
let analytics = analytics::filters::filters(self.analytic_service.clone());
let public = warp::fs::dir("public"); let public = warp::fs::dir("public");
println!("Start server on {} port", self.config.port); println!("Start server on {} port", self.config.port);
// TODO: move it to "server" // TODO: move it to "server"
let routes = warp::path("api") let routes = warp::path("api")
.and(login.or(metrics).or(segments).or(options)) .and(login.or(metrics).or(segments).or(analytics).or(options))
.or(public) .or(public)
.or(not_found); .or(not_found);
warp::serve(routes) warp::serve(routes)

60
server/src/api/analytics.rs

@ -0,0 +1,60 @@
pub mod filters {
use super::handlers;
use super::models::{Srv, ListOptions};
use warp::Filter;
/// The 4 REST API filters combined.
pub fn filters(
srv: Srv,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
list(srv.clone())
// .or(create(db.clone()))
// // .or(update(db.clone()))
// .or(delete(db.clone()))
}
/// GET /analytics?from=3&to=5
pub fn list(
db: Srv,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("analytics")
.and(warp::get())
.and(warp::query::<ListOptions>())
.and(with_srv(db))
.and_then(handlers::list)
}
fn with_srv(srv: Srv) -> impl Filter<Extract = (Srv,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || srv.clone())
}
}
mod handlers {
use super::models::{ListOptions, Srv};
use crate::api::{API, BadQuery};
pub async fn list(opts: ListOptions, srv: Srv) -> Result<impl warp::Reply, warp::Rejection> {
match srv.get_detections(opts.from, opts.to, 10).await {
Ok(segments) => Ok(API::json(&segments)),
Err(e) => Err(warp::reject::custom(BadQuery)),
}
}
}
mod models {
use hastic::services::analytic_service;
use serde::{Deserialize, Serialize};
// use parking_lot::RwLock;
// use std::sync::Arc;
pub type Srv = analytic_service::AnalyticService;
// The query parameters for list_todos.
#[derive(Debug, Deserialize)]
pub struct ListOptions {
pub from: u64,
pub to: u64,
}
}

3
server/src/api/metric.rs

@ -17,8 +17,6 @@ use crate::api::{self, API};
use parking_lot::RwLock; use parking_lot::RwLock;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use super::BadQuery; use super::BadQuery;
@ -54,7 +52,6 @@ async fn query(
p: HashMap<String, String>, p: HashMap<String, String>,
ms: Arc<RwLock<metric_service::MetricService>>, ms: Arc<RwLock<metric_service::MetricService>>,
) -> Result<impl warp::Reply, warp::Rejection> { ) -> Result<impl warp::Reply, warp::Rejection> {
//Err(warp::reject::custom(BadQuery));
match get_query(p, ms).await { match get_query(p, ms).await {
Ok(res) => Ok(API::json(&res)), Ok(res) => Ok(API::json(&res)),
// TODO: parse different error types // TODO: parse different error types

27
server/src/services/analytic_service.rs

@ -7,21 +7,25 @@ use subbeat::metric::Metric;
use anyhow; use anyhow;
#[derive(Clone)]
struct AnalyticService { pub struct AnalyticService {
metric_service: MetricService, metric_service: MetricService,
} }
impl AnalyticService { impl AnalyticService {
fn new(config: &Config) -> AnalyticService { pub fn new(config: &Config) -> AnalyticService {
AnalyticService { AnalyticService {
metric_service: MetricService::new(&config.prom_url, &config.query), metric_service: MetricService::new(&config.prom_url, &config.query),
} }
} }
pub async fn get_detections(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> { pub async fn get_detections(&self, from: u64, to: u64, step: u64) -> anyhow::Result<Vec<Segment>> {
let prom = self.metric_service.get_prom(); let prom = self.metric_service.get_prom();
let mr = prom.query(from, to, 10).await?; let mr = prom.query(from, to, step).await?;
if mr.data.keys().len() == 0 {
return Ok(Vec::new());
}
let key = mr.data.keys().nth(0).unwrap(); let key = mr.data.keys().nth(0).unwrap();
let ts = &mr.data[key]; let ts = &mr.data[key];
@ -29,7 +33,7 @@ impl AnalyticService {
let mut result = Vec::<Segment>::new(); let mut result = Vec::<Segment>::new();
let mut from: Option<u64> = None; let mut from: Option<u64> = None;
for (t, v) in ts { for (t, v) in ts {
if *v > 100.0 { if *v > 10_000.0 {
if from.is_some() { if from.is_some() {
continue; continue;
} else { } else {
@ -48,6 +52,17 @@ impl AnalyticService {
} }
} }
if from.is_some() {
result.push(Segment {
id: None,
from: from.unwrap(),
to,
segment_type: SegmentType::Detection,
});
}
// TODO: decide what to do it from is Some() in the end
Ok(result) Ok(result)
} }
} }

1
server/src/services/metric_service.rs

@ -1,5 +1,6 @@
use subbeat::datasources::prometheus::Prometheus; use subbeat::datasources::prometheus::Prometheus;
#[derive(Clone)]
pub struct MetricService { pub struct MetricService {
url: String, url: String,
query: String, query: String,

3
server/src/services/segments_service.rs

@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
use std::iter::repeat_with; use std::iter::repeat_with;
const ID_LENGTH: usize = 20; const ID_LENGTH: usize = 20;
pub type SegmentId = String; pub type SegmentId = String;
@ -55,8 +56,6 @@ impl Segment {
} }
} }
// TODO: find a way to remove this unsafe
unsafe impl Sync for SegmentsService {}
pub struct SegmentsService { pub struct SegmentsService {
connection: Arc<Mutex<Connection>>, connection: Arc<Mutex<Connection>>,

Loading…
Cancel
Save