Browse Source

status buggy impl

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
1c06b44cbc
  1. 42
      server/src/api/analytics.rs
  2. 7
      server/src/api/mod.rs
  3. 13
      server/src/api/segments.rs
  4. 18
      server/src/services/analytic_service/analytic_client.rs
  5. 74
      server/src/services/analytic_service/analytic_service.rs
  6. 2
      server/src/services/analytic_service/mod.rs
  7. 19
      server/src/services/analytic_service/types.rs

42
server/src/api/analytics.rs

@ -7,8 +7,7 @@ pub mod filters {
pub fn filters(
client: Client,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
list(client.clone())
// TODO: /status endpoint
list(client.clone()).or(status(client.clone()))
// .or(create(db.clone()))
// // .or(update(db.clone()))
// .or(delete(db.clone()))
@ -25,6 +24,16 @@ pub mod filters {
.and_then(handlers::list)
}
/// GET /analytics/status
pub fn status(
client: Client,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("analytics" / "status")
.and(warp::get())
.and(with_client(client))
.and_then(handlers::status)
}
fn with_client(
client: Client,
) -> impl Filter<Extract = (Client,), Error = std::convert::Infallible> + Clone {
@ -34,12 +43,15 @@ pub mod filters {
mod handlers {
use super::models::{Client, ListOptions};
use super::models::{Client, ListOptions, Status};
use crate::api::{BadQuery, API};
pub async fn list(opts: ListOptions, srv: Client) -> Result<impl warp::Reply, warp::Rejection> {
pub async fn list(
opts: ListOptions,
client: Client,
) -> Result<impl warp::Reply, warp::Rejection> {
// match srv.get_threshold_detections(opts.from, opts.to, 10, 100_000.).await {
match srv.get_pattern_detection(opts.from, opts.to).await {
match client.get_pattern_detection(opts.from, opts.to).await {
Ok(segments) => Ok(API::json(&segments)),
Err(e) => {
println!("{:?}", e);
@ -47,13 +59,20 @@ mod handlers {
}
}
}
pub async fn status(client: Client) -> Result<impl warp::Reply, warp::Rejection> {
match client.get_status().await {
Ok(ls) => Ok(API::json(&Status { status: ls })),
Err(e) => {
println!("{:?}", e);
Err(warp::reject::custom(BadQuery))
}
}
}
}
mod models {
use std::sync::Arc;
use hastic::services::analytic_service;
use hastic::services::analytic_service::{self, types::LearningStatus};
use serde::{Deserialize, Serialize};
pub type Client = analytic_service::analytic_client::AnalyticClient;
@ -64,4 +83,9 @@ mod models {
pub from: u64,
pub to: u64,
}
#[derive(Debug, Serialize)]
pub struct Status {
pub status: LearningStatus,
}
}

7
server/src/api/mod.rs

@ -32,7 +32,7 @@ pub struct API<'a> {
user_service: Arc<RwLock<user_service::UserService>>,
metric_service: metric_service::MetricService,
data_service: segments_service::SegmentsService,
// TODO: get analytic service as reference
// TODO: get analytic service as reference and create it in main
analytic_service: AnalyticService,
}
@ -81,7 +81,10 @@ impl API<'_> {
});
let metrics = metric::get_route(self.metric_service.clone());
let login = auth::get_route(self.user_service.clone());
let segments = segments::filters::filters(self.data_service.clone());
let segments = segments::filters::filters(
self.data_service.clone(),
self.analytic_service.get_client(),
);
let analytics = analytics::filters::filters(self.analytic_service.get_client());
let public = warp::fs::dir("public");

13
server/src/api/segments.rs

@ -1,14 +1,16 @@
pub mod filters {
use super::handlers;
use super::models::{Db, ListOptions};
use hastic::services::analytic_service::analytic_client::AnalyticClient;
use warp::Filter;
/// The 4 REST API filters combined.
pub fn filters(
db: Db,
ac: AnalyticClient,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
list(db.clone())
.or(create(db.clone()))
.or(create(db.clone(), ac))
// .or(update(db.clone()))
.or(delete(db.clone()))
}
@ -27,11 +29,13 @@ pub mod filters {
/// POST /segments with JSON body
pub fn create(
db: Db,
ac: AnalyticClient,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("segments")
.and(warp::post())
.and(warp::body::json())
.and(with_db(db))
.and(warp::any().map(move || ac.clone()))
.and_then(handlers::create)
}
@ -52,6 +56,7 @@ pub mod filters {
}
mod handlers {
use hastic::services::analytic_service::analytic_client::AnalyticClient;
use hastic::services::segments_service;
use super::models::{Db, ListOptions};
@ -70,9 +75,13 @@ mod handlers {
pub async fn create(
segment: segments_service::Segment,
db: Db,
ac: AnalyticClient,
) -> Result<impl warp::Reply, warp::Rejection> {
match db.insert_segment(&segment) {
Ok(segment) => Ok(API::json(&segment)),
Ok(segment) => {
ac.run_learning().await.unwrap();
Ok(API::json(&segment))
}
Err(e) => {
println!("{:?}", e);
// TODO: return proper http error

18
server/src/services/analytic_service/analytic_client.rs

@ -1,12 +1,12 @@
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use crate::services::segments_service::Segment;
use super::types::LearningStatus;
use super::types::{AnalyticServiceMessage, RequestType};
/// CLient to be used multithreaded
///
///
/// Client to be used multithreaded
#[derive(Clone)]
pub struct AnalyticClient {
tx: mpsc::Sender<AnalyticServiceMessage>,
@ -16,11 +16,21 @@ impl AnalyticClient {
pub fn new(tx: mpsc::Sender<AnalyticServiceMessage>) -> AnalyticClient {
AnalyticClient { tx }
}
pub async fn run_learning(&self) -> anyhow::Result<()> {
self.tx.send(AnalyticServiceMessage::Request(RequestType::RunLearning)).await?;
self.tx
.send(AnalyticServiceMessage::Request(RequestType::RunLearning))
.await?;
Ok(())
}
pub async fn get_status(&self) -> anyhow::Result<LearningStatus> {
let (tx, rx) = oneshot::channel();
let req = AnalyticServiceMessage::Request(RequestType::GetStatus(tx));
let r = rx.await?;
Ok(r)
}
pub async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {
return Ok(Vec::new());
}

74
server/src/services/analytic_service/analytic_service.rs

@ -1,4 +1,8 @@
use super::{analytic_client::AnalyticClient, pattern_detector::{self, LearningResults, PatternDetector}, types::{AnalyticServiceMessage, RequestType, ResponseType}};
use super::{
analytic_client::AnalyticClient,
pattern_detector::{self, LearningResults, PatternDetector},
types::{AnalyticServiceMessage, LearningStatus, RequestType, ResponseType},
};
use crate::services::{
metric_service::MetricService,
@ -10,26 +14,16 @@ use subbeat::metric::Metric;
use anyhow;
use tokio::sync::{mpsc};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use futures::future;
use super::types;
const DETECTION_STEP: u64 = 10;
const LEARNING_WAITING_INTERVAL: u64 = 100;
#[derive(Clone, PartialEq)]
enum LearningStatus {
Initialization,
Starting,
Learning,
Error,
Ready,
}
// TODO: now it's basically single analytic unit, service will opreate many AU
pub struct AnalyticService {
metric_service: MetricService,
@ -62,16 +56,22 @@ impl AnalyticService {
AnalyticClient::new(self.tx.clone())
}
fn consume_request(&mut self, req: types::RequestType) {
fn consume_request(&mut self, req: types::RequestType) -> () {
match req {
RequestType::RunLearning => tokio::spawn({
self.learning_status = LearningStatus::Starting;
let tx = self.tx.clone();
let ms = self.metric_service.clone();
let ss = self.segments_service.clone();
async move {
AnalyticService::run_learning(tx, ms, ss)
}})
RequestType::RunLearning => {
tokio::spawn({
self.learning_status = LearningStatus::Starting;
let tx = self.tx.clone();
let ms = self.metric_service.clone();
let ss = self.segments_service.clone();
async move {
AnalyticService::run_learning(tx, ms, ss).await;
}
});
}
RequestType::GetStatus(tx) => {
tx.send(self.learning_status.clone()).unwrap();
}
};
}
@ -79,9 +79,7 @@ impl AnalyticService {
match res {
// TODO: handle when learning panic
ResponseType::LearningStarted => self.learning_status = LearningStatus::Learning,
ResponseType::LearningFinished(results) => {
self.learning_results = Some(results)
}
ResponseType::LearningFinished(results) => self.learning_results = Some(results),
}
}
@ -89,17 +87,25 @@ impl AnalyticService {
while let Some(message) = self.rx.recv().await {
match message {
AnalyticServiceMessage::Request(req) => self.consume_request(req),
AnalyticServiceMessage::Response(res) => self.consume_response(res)
AnalyticServiceMessage::Response(res) => self.consume_response(res),
}
}
}
// call this from api
async fn run_learning(tx: mpsc::Sender<AnalyticServiceMessage>, ms: MetricService, ss : SegmentsService) {
match tx.send(AnalyticServiceMessage::Response(ResponseType::LearningStarted)).await {
async fn run_learning(
tx: mpsc::Sender<AnalyticServiceMessage>,
ms: MetricService,
ss: SegmentsService,
) {
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningStarted,
))
.await
{
Ok(_) => println!("Learning starting"),
Err(_e) => println!("Fail to send notification about learning start")
Err(_e) => println!("Fail to send notification about learning start"),
}
let prom = ms.get_prom();
@ -130,11 +136,15 @@ impl AnalyticService {
let lr = PatternDetector::learn(&learn_tss).await;
match tx.send(AnalyticServiceMessage::Response(ResponseType::LearningFinished(lr))).await {
match tx
.send(AnalyticServiceMessage::Response(
ResponseType::LearningFinished(lr),
))
.await
{
Ok(_) => println!("Learning resuls sent"),
Err(_e) => println!("Fail to send learning results")
Err(_e) => println!("Fail to send learning results"),
}
}
async fn get_pattern_detection(&self, from: u64, to: u64) -> anyhow::Result<Vec<Segment>> {

2
server/src/services/analytic_service/mod.rs

@ -1,6 +1,6 @@
mod analytic_service;
mod pattern_detector;
mod types;
pub mod types;
pub mod analytic_client;

19
server/src/services/analytic_service/types.rs

@ -1,20 +1,31 @@
use super::pattern_detector::LearningResults;
use serde::Serialize;
use tokio::sync::oneshot;
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum LearningStatus {
Initialization,
Starting,
Learning,
Error,
Ready,
}
#[derive(Debug)]
pub enum ResponseType {
LearningStarted,
LearningFinished(LearningResults)
LearningFinished(LearningResults),
}
#[derive(Debug)]
pub enum RequestType {
RunLearning
RunLearning,
GetStatus(oneshot::Sender<LearningStatus>),
}
#[derive(Debug)]
pub enum AnalyticServiceMessage {
// Status,
Request(RequestType),
Response(ResponseType)
// Detect { from: u64, to: u64 },
Response(ResponseType), // Detect { from: u64, to: u64 },
}

Loading…
Cancel
Save