Browse Source

webhook continue

pull/25/head
Alexey Velikiy 3 years ago
parent
commit
aba6034441
  1. 5
      server/Cargo.lock
  2. 3
      server/Cargo.toml
  3. 3
      server/config.example.toml
  4. 2
      server/src/api/mod.rs
  5. 17
      server/src/config.rs
  6. 10
      server/src/main.rs
  7. 31
      server/src/services/analytic_service/analytic_service.rs
  8. 4
      server/src/services/analytic_service/types.rs

5
server/Cargo.lock generated

@ -458,6 +458,7 @@ name = "hastic"
version = "0.0.1"
dependencies = [
"anyhow",
"chrono",
"config",
"fastrand",
"futures",
@ -1321,9 +1322,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "subbeat"
version = "0.0.13"
version = "0.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e935262d66772027700000fb1d8c81bb4bc7db8ef884880ce658686406e5416b"
checksum = "6c9f8b9c73d37420622f933fabdf8a783a98fd24804b7def4c2fc140820105d8"
dependencies = [
"anyhow",
"async-trait",

3
server/Cargo.toml

@ -13,10 +13,11 @@ parking_lot = "0.11.2"
serde = { version = "1.0", features = ["derive"] }
fastrand = "1.5.0"
# subbeat = { path = "../../../subbeat/subbeat/" }
subbeat = "0.0.13"
subbeat = "0.0.14"
config = "0.11.0"
openssl = { version = "=0.10.33", features = ["vendored"] }
rusqlite = "0.26.1"
# https://github.com/rusqlite/rusqlite/issues/914
libsqlite3-sys = { version = "*", features = ["bundled"] }
futures = "0.3.17"
chrono = "0.4.19"

3
server/config.example.toml

@ -16,3 +16,6 @@ query = "rate(go_memstats_alloc_bytes_total[5m])"
# |> yield(name: "mean")
# """
[webhook]
endpoint = "http://localhost:8086"

2
server/src/api/mod.rs

@ -96,7 +96,7 @@ impl API<'_> {
.or(not_found);
warp::serve(routes)
.run(([127, 0, 0, 1], self.config.port))
.run(([0, 0, 0, 0], self.config.port))
.await;
}
}

17
server/src/config.rs

@ -3,6 +3,17 @@ use subbeat::types::{DatasourceConfig, InfluxConfig, PrometheusConfig};
pub struct Config {
pub port: u16,
pub datasource_config: DatasourceConfig,
pub endpoint: Option<String>,
}
impl Clone for Config {
fn clone(&self) -> Self {
return Config {
port: self.port,
datasource_config: self.datasource_config.clone(),
endpoint: self.endpoint.clone(),
};
}
}
fn resolve_datasource(config: &mut config::Config) -> anyhow::Result<DatasourceConfig> {
@ -41,9 +52,15 @@ impl Config {
config.set("port", "8000").unwrap();
}
let mut endpoint = None;
if config.get::<String>("webhook.endpoint").is_ok() {
endpoint = Some(config.get("webhook.endpoint").unwrap());
}
Ok(Config {
port: config.get::<u16>("port").unwrap(),
datasource_config: resolve_datasource(&mut config)?,
endpoint,
})
}
}

10
server/src/main.rs

@ -7,15 +7,19 @@ use anyhow;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = hastic::config::Config::new()?;
let cfg_clone = config.clone();
let metric_service = metric_service::MetricService::new(&config.datasource_config);
let segments_service = segments_service::SegmentsService::new()?;
let mut analytic_service =
analytic_service::AnalyticService::new(metric_service.clone(), segments_service.clone());
let mut analytic_service = analytic_service::AnalyticService::new(
metric_service.clone(),
segments_service.clone(),
config.endpoint,
);
let api = api::API::new(
&config,
&cfg_clone,
metric_service.clone(),
segments_service.clone(),
analytic_service.get_client(),

31
server/src/services/analytic_service/analytic_service.rs

@ -17,6 +17,8 @@ use tokio::sync::{mpsc, oneshot};
use futures::future;
use chrono::Utc;
// TODO: get this from pattern detector
const DETECTION_STEP: u64 = 10;
@ -29,17 +31,23 @@ pub struct AnalyticService {
tx: mpsc::Sender<AnalyticServiceMessage>,
rx: mpsc::Receiver<AnalyticServiceMessage>,
endpoint: Option<String>,
// handlers
learning_handler: Option<tokio::task::JoinHandle<()>>,
// awaiters
learning_waiters: Vec<DetectionTask>,
// runner
runner_handler: Option<tokio::task::JoinHandle<()>>,
}
impl AnalyticService {
pub fn new(
metric_service: MetricService,
segments_service: segments_service::SegmentsService,
endpoint: Option<String>,
) -> AnalyticService {
let (tx, rx) = mpsc::channel::<AnalyticServiceMessage>(32);
@ -52,11 +60,15 @@ impl AnalyticService {
tx,
rx,
endpoint,
// handlers
learning_handler: None,
// awaiters
learning_waiters: Vec::new(),
runner_handler: None,
}
}
@ -76,15 +88,18 @@ impl AnalyticService {
});
}
fn run_detection_runner(&self, task: DetectionRunnerConfig) {
fn run_detection_runner(&mut self, task: DetectionRunnerConfig) {
if self.runner_handler.is_some() {
self.runner_handler.as_mut().unwrap().abort();
}
// TODO: save handler of the task
tokio::spawn({
self.runner_handler = Some(tokio::spawn({
let lr = self.learning_results.as_ref().unwrap().clone();
let ms = self.metric_service.clone();
async move {
// TODO: implement
}
});
}));
}
fn consume_request(&mut self, req: types::RequestType) -> () {
@ -144,6 +159,14 @@ impl AnalyticService {
let task = self.learning_waiters.pop().unwrap();
self.run_detection_task(task);
}
// TODO: fix this
if self.endpoint.is_some() {
self.run_detection_runner(DetectionRunnerConfig {
endpoint: self.endpoint.as_ref().unwrap().clone(),
from: Utc::now().timestamp() as u64,
});
}
}
ResponseType::LearningFinishedEmpty => {
// TODO: drop all learning_waiters with empty results

4
server/src/services/analytic_service/types.rs

@ -32,9 +32,9 @@ pub struct DetectionTask {
#[derive(Debug)]
pub struct DetectionRunnerConfig {
pub sender: mpsc::Sender<Result<Vec<Segment>>>,
// pub sender: mpsc::Sender<Result<Vec<Segment>>>,
pub endpoint: String,
pub from: u64
pub from: u64,
}
#[derive(Debug)]

Loading…
Cancel
Save