Compare commits

..

1 commit

Author          SHA1        Message         Date
Alexey Velikiy  f59633344c  rm console log  2 years ago
12 changed files (changed lines per file in parentheses):

 1. client/src/components/pods/pattern_pod.ts (1)
 2. docker-compose.yml (1)
 3. server/config.example.toml (17)
 4. server/src/config.rs (27)
 5. server/src/main.rs (7)
 6. server/src/services/analytic_service/analytic_service.rs (15)
 7. server/src/services/analytic_service/detection_runner.rs (10)
 8. server/src/services/analytic_service/types.rs (1)
 9. server/src/services/analytic_unit_service.rs (15)
10. server/src/services/data_service.rs (23)
11. server/src/services/mod.rs (1)
12. server/src/services/segments_service.rs (16)

client/src/components/pods/pattern_pod.ts (1 changed line)

@@ -79,6 +79,7 @@ export class PatternPod extends HasticPod<UpdateDataCallback> {
     } else {
       console.log('took from range from default');
     }
+    console.log(from + " ---- " + to);
     this.udc({ from, to })
       .then(resp => {

docker-compose.yml (1 changed line)

@@ -3,6 +3,7 @@ version: '3'
 services:
   app:
     image: hastic/hastic:latest
+    network_mode: host
     restart: always
     environment:
       HASTIC_PORT: "4347"

server/config.example.toml (17 changed lines)

@@ -1,10 +1,8 @@
 port = 4347

-# one of datasource sections (prometheus / influx) should be uncommented and edited corresponding to your environment
-# [prometheus]
-# url = "http://localhost:9090"
-# query = "rate(go_memstats_alloc_bytes_total[5m])"
+[prometheus]
+url = "http://localhost:9090"
+query = "rate(go_memstats_alloc_bytes_total[5m])"

 # [influx]
@@ -18,7 +16,8 @@ port = 4347
 # |> yield(name: "mean")
 # """

-# [alerting]
-# type = "webhook"
-# interval = 10 # in seconds
-# endpoint = "http://localhost:9092"
+[alerting]
+type = "webhook"
+interval = 10 # in seconds
+endpoint = "http://localhost:9092"

server/src/config.rs (27 changed lines)

@@ -27,7 +27,6 @@ pub struct Config {
 fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfig> {
     if config.get::<String>("prometheus.url").is_ok() {
-        println!("using Prometheus");
         return Ok(DatasourceConfig::Prometheus(PrometheusConfig {
             url: config.get("prometheus.url")?,
             query: config.get("prometheus.query")?,
@@ -35,7 +34,6 @@ fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfig> {
     }

     if config.get::<String>("influx.url").is_ok() {
-        println!("using Influx");
         return Ok(DatasourceConfig::Influx(InfluxConfig {
             url: config.get("influx.url")?,
             org_id: config.get("influx.org_id")?,
@@ -44,7 +42,7 @@ fn resolve_datasource(config: &config::Config) -> anyhow::Result<DatasourceConfig> {
         }));
     }

-    return Err(anyhow::format_err!("please configure a datasource"));
+    return Err(anyhow::format_err!("no datasource found"));
 }

 fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingConfig>> {
@@ -82,38 +80,24 @@ fn resolve_alerting(config: &config::Config) -> anyhow::Result<Option<AlertingConfig>> {
 // config::Environment doesn't support nested configs, e.g. `alerting.type`,
 // so I've copied this from:
 // https://github.com/rust-lang/mdBook/blob/f3e5fce6bf5e290c713f4015947dc0f0ad172d20/src/config.rs#L132
 // so that `__` can be used in env variables instead of `.`,
 // e.g. `HASTIC_ALERTING__TYPE` -> alerting.type
 pub fn update_from_env(config: &mut config::Config) {
     let overrides =
         env::vars().filter_map(|(key, value)| parse_env(&key).map(|index| (index, value)));
     for (key, value) in overrides {
-        println!("{} => {}", key, value);
         config.set(&key, value).unwrap();
     }
 }

-pub fn print_config(config: config::Config) {
-    // TODO: support any nesting level
-    let sections = config.to_owned().cache.into_table().unwrap();
-    for (section_name, values) in sections {
-        match values.clone().into_table() {
-            Err(_) => println!("{} => {}", section_name, values),
-            Ok(section) => {
-                for (key, value) in section {
-                    println!("{}.{} => {}", section_name, key, value);
-                }
-            }
-        }
-    }
-}

 fn parse_env(key: &str) -> Option<String> {
     const PREFIX: &str = "HASTIC_";
     if key.starts_with(PREFIX) {
         let key = &key[PREFIX.len()..];
-        Some(key.to_lowercase().replace("__", "."))
+        Some(key.to_lowercase().replace("__", ".").replace("_", "-"))
     } else {
         None
     }
@@ -133,8 +117,7 @@ impl Config {
         config.set("port", 4347).unwrap();
     }

-    print_config(config.clone());
+    // TODO: print resulted config (perfectly, it needs adding `derive(Debug)` in `subbeat`'s `DatasourceConfig`)
     Ok(Config {
         port: config.get::<u16>("port").unwrap(),
         datasource_config: resolve_datasource(&config)?,
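The comment block in the hunks above explains the environment override scheme: config::Environment cannot express nested keys, so a double underscore in a HASTIC_-prefixed variable stands for a dot. A minimal standalone sketch of that mapping, using the new (+) version of parse_env from this diff; the example variable names are only illustrative:

fn parse_env(key: &str) -> Option<String> {
    const PREFIX: &str = "HASTIC_";
    if key.starts_with(PREFIX) {
        let key = &key[PREFIX.len()..];
        // `__` becomes the section separator `.`, any remaining `_` becomes `-`
        Some(key.to_lowercase().replace("__", ".").replace("_", "-"))
    } else {
        None
    }
}

fn main() {
    // HASTIC_ALERTING__TYPE overrides `alerting.type`
    assert_eq!(parse_env("HASTIC_ALERTING__TYPE").as_deref(), Some("alerting.type"));
    // HASTIC_PROMETHEUS__URL overrides `prometheus.url`
    assert_eq!(parse_env("HASTIC_PROMETHEUS__URL").as_deref(), Some("prometheus.url"));
    // variables without the HASTIC_ prefix are ignored
    assert_eq!(parse_env("PATH"), None);
}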

server/src/main.rs (7 changed lines)

@@ -1,6 +1,6 @@
 mod api;

-use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service, data_service};
+use hastic::services::{analytic_service, analytic_unit_service, metric_service, segments_service};

 use anyhow;
@@ -9,10 +9,9 @@ async fn main() -> anyhow::Result<()> {
     let config = hastic::config::Config::new()?;
     let cfg_clone = config.clone();

-    let data_service = data_service::DataService::new()?;
-    let analytic_unit_service = analytic_unit_service::AnalyticUnitService::new(&data_service)?;
+    let analytic_unit_service = analytic_unit_service::AnalyticUnitService::new()?;
     let metric_service = metric_service::MetricService::new(&config.datasource_config);
-    let segments_service = segments_service::SegmentsService::new(&data_service)?;
+    let segments_service = segments_service::SegmentsService::new()?;
     let mut analytic_service = analytic_service::AnalyticService::new(
         analytic_unit_service.clone(),

server/src/services/analytic_service/analytic_service.rs (15 changed lines)

@@ -245,9 +245,6 @@ impl AnalyticService {
                    .set_last_detection(id, timestamp)
                    .unwrap();
            }
-            ResponseType::DetectionRunnerDetection(from, to) => {
-                println!("detection: {} {}", from, to);
-            }
            ResponseType::LearningStarted => {
                self.analytic_unit_learning_status = LearningStatus::Learning
            }
@@ -302,9 +299,8 @@ impl AnalyticService {
                    self.consume_request(RequestType::RunLearning);
                    match tx.send(()) {
                        Ok(_) => {}
-                        Err(e) => {
+                        Err(_e) => {
                            println!("Can`t send patch config notification");
-                            println!("{:?}", e);
                        }
                    }
                    return;
@@ -316,9 +312,8 @@ impl AnalyticService {
                    au.unwrap().write().await.set_config(cfg);
                    match tx.send(()) {
                        Ok(_) => {}
-                        Err(e) => {
+                        Err(_e) => {
                            println!("Can`t send patch config notification");
-                            println!("{:?}", e);
                        }
                    }
                }
@@ -328,9 +323,8 @@ impl AnalyticService {
                // TODO: check if we need this else
                match tx.send(()) {
                    Ok(_) => {}
-                    Err(e) => {
+                    Err(_e) => {
                        println!("Can`t send patch config notification");
-                        println!("{:?}", e);
                    }
                }
            }
@@ -343,9 +337,8 @@ impl AnalyticService {
            self.consume_request(RequestType::RunLearning);
            match tx.send(()) {
                Ok(_) => {}
-                Err(e) => {
+                Err(_e) => {
                    println!("Can`t send patch config notification");
-                    println!("{:?}", e);
                }
            }
        }
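The four lower hunks all edit the same reply pattern: answer on a channel sender and fall back to a log line when the receiver is gone, now without printing the error value. A minimal sketch of that pattern in isolation; it assumes the sender is a tokio oneshot::Sender<()> (the channel type is not visible in this diff) and the helper name is illustrative:

use tokio::sync::oneshot;

// Illustrative helper, not part of the diff; it mirrors the match arms above.
fn notify_or_log(tx: oneshot::Sender<()>) {
    match tx.send(()) {
        Ok(_) => {}
        // the new (+) side drops the extra `println!("{:?}", e)` and ignores the payload
        Err(_e) => {
            println!("Can`t send patch config notification");
        }
    }
}

fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    drop(rx); // simulate a receiver that has already gone away
    notify_or_log(tx); // prints the fallback message
}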

server/src/services/analytic_service/detection_runner.rs (10 changed lines)

@@ -69,15 +69,7 @@ impl DetectionRunner {
                let detections = a.detect(ms.clone(), t_from, t_to).await.unwrap();

                for d in detections {
-                    match tx
-                        .send(AnalyticServiceMessage::Response(Ok(
-                            ResponseType::DetectionRunnerDetection(d.0, d.1),
-                        )))
-                        .await
-                    {
-                        Ok(_) => {}
-                        Err(_e) => println!("Fail to send detection runner detection notification"),
-                    }
+                    println!("detection: {} {}", d.0, d.1);
                }

                // TODO: send info about detections to tx

server/src/services/analytic_service/types.rs (1 changed line)

@@ -45,7 +45,6 @@ impl Default for LearningTrain {
 pub enum ResponseType {
     DetectionRunnerStarted(u64),
     DetectionRunnerUpdate(String, u64), // analytic_unit id and timestamp
-    DetectionRunnerDetection(u64, u64), // TODO: add more into about analytic unit and more
     LearningStarted,
     LearningFinished(Box<dyn AnalyticUnit + Send + Sync>),
     LearningFinishedEmpty,

server/src/services/analytic_unit_service.rs (15 changed lines)

@@ -1,3 +1,5 @@
+use serde::{Deserialize, Serialize};
+use serde_json::{Result, Value};
 use std::sync::{Arc, Mutex};

 use rusqlite::{params, Connection};
@@ -9,20 +11,19 @@ use super::analytic_service::analytic_unit::{
     types::{self, AnalyticUnitConfig},
 };

-use super::data_service::DataService;
-
 #[derive(Clone)]
 pub struct AnalyticUnitService {
     connection: Arc<Mutex<Connection>>,
 }

+// TODO: get DataService
 impl AnalyticUnitService {
-    pub fn new(ds: &DataService) -> anyhow::Result<AnalyticUnitService> {
-        let conn = ds.analytic_units_connection.clone();
+    pub fn new() -> anyhow::Result<AnalyticUnitService> {
+        // TODO: remove repetitoin with segment_service
+        std::fs::create_dir_all("./data").unwrap();
+        let conn = Connection::open("./data/analytic_units.db")?;

         // TODO: add learning results field
-        conn.lock().unwrap().execute(
+        conn.execute(
             "CREATE TABLE IF NOT EXISTS analytic_unit (
                 id TEXT PRIMARY KEY,
                 last_detection INTEGER,
@@ -34,7 +35,7 @@ impl AnalyticUnitService {
         )?;

         Ok(AnalyticUnitService {
-            connection: conn
+            connection: Arc::new(Mutex::new(conn)),
         })
     }

server/src/services/data_service.rs (23 changed lines, file removed)

@@ -1,23 +0,0 @@
-use std::sync::{Arc, Mutex};
-
-use rusqlite::{Connection};
-
-pub struct DataService {
-    pub analytic_units_connection: Arc<Mutex<Connection>>,
-    pub segments_connection: Arc<Mutex<Connection>>
-}
-
-impl DataService {
-    pub fn new() -> anyhow::Result<DataService> {
-        std::fs::create_dir_all("./data").unwrap();
-
-        let analytic_units_connection = Connection::open("./data/analytic_units.db")?;
-        let segments_connection = Connection::open("./data/segments.db")?;
-
-        Ok(DataService {
-            analytic_units_connection: Arc::new(Mutex::new(analytic_units_connection)),
-            segments_connection: Arc::new(Mutex::new(segments_connection))
-        })
-    }
-}

server/src/services/mod.rs (1 changed line)

@@ -1,5 +1,4 @@
 pub mod analytic_service;
-pub mod data_service;
 pub mod analytic_unit_service;
 pub mod metric_service;
 pub mod segments_service;

server/src/services/segments_service.rs (16 changed lines)

@@ -6,13 +6,9 @@ use serde::{Deserialize, Serialize};
 use std::sync::{Arc, Mutex};

-use super::data_service::DataService;
-
 pub const ID_LENGTH: usize = 20;

 pub type SegmentId = String;

 // TODO: make logic with this enum shorter
 #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
 pub enum SegmentType {
@@ -62,19 +58,19 @@ impl Segment {
     }
 }

+// TODO: get DataService
 #[derive(Clone)]
 pub struct SegmentsService {
     connection: Arc<Mutex<Connection>>,
 }

 impl SegmentsService {
-    pub fn new(ds: &DataService) -> anyhow::Result<SegmentsService> {
+    pub fn new() -> anyhow::Result<SegmentsService> {
+        // TODO: move it to data service
+        std::fs::create_dir_all("./data").unwrap();
         // TODO: add unilytic_unit id as a new field
-        let conn = ds.segments_connection.clone();
-        conn.lock().unwrap().execute(
+        let conn = Connection::open("./data/segments.db")?;
+        conn.execute(
             "CREATE TABLE IF NOT EXISTS segment (
                 id TEXT PRIMARY KEY,
                 start INTEGER NOT NULL,
@@ -85,7 +81,7 @@ impl SegmentsService {
         )?;

         Ok(SegmentsService {
-            connection: conn
+            connection: Arc::new(Mutex::new(conn)),
         })
     }
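On the new (+) side, both this constructor and the one in analytic_unit_service.rs follow the same pattern: make sure ./data exists, open a per-service SQLite file, create the table if it is missing, and wrap the connection in Arc<Mutex<..>> so the cloneable service shares it. A minimal self-contained sketch of that pattern, assuming the rusqlite and anyhow crates used throughout the diff; the service and table names are illustrative:

use std::sync::{Arc, Mutex};

use rusqlite::{params, Connection};

#[derive(Clone)]
pub struct ExampleService {
    // shared by every clone of the service
    connection: Arc<Mutex<Connection>>,
}

impl ExampleService {
    pub fn new() -> anyhow::Result<ExampleService> {
        // each service keeps its own database file under ./data
        std::fs::create_dir_all("./data")?;
        let conn = Connection::open("./data/example.db")?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS example (
                id TEXT PRIMARY KEY,
                value INTEGER
            )",
            params![],
        )?;
        Ok(ExampleService {
            connection: Arc::new(Mutex::new(conn)),
        })
    }
}

fn main() -> anyhow::Result<()> {
    let service = ExampleService::new()?;
    let _copy = service.clone(); // both handles use the same underlying connection
    Ok(())
}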
