diff --git a/.env b/.env index a6f559f..8bfdddf 100644 --- a/.env +++ b/.env @@ -1 +1,4 @@ ZMQ_CONNECTION_STRING=tcp://analytics:8002 +HASTIC_API_KEY=eyJrIjoiVDRUTUlKSjJ5N3dYTDdsd1JyWWRBNHFkb0VSeDBNTTYiLCJuIjoiaGFzdGljLXNlcnZlciIsImlkIjoxfQ== +HASTIC_WEBHOOK_URL=http://localhost:8888 +HASTIC_WEBHOOK_TYPE=application/json diff --git a/.gitignore b/.gitignore index b6debd5..8ad8a4c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ data dist config.json +.env node_modules/ diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index b2489ae..4f88a42 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -1,7 +1,8 @@ from typing import Dict import pandas as pd import numpy as np -import logging, traceback +import logging as log +import traceback from concurrent.futures import Executor, ThreadPoolExecutor import detectors @@ -9,7 +10,7 @@ from analytic_unit_worker import AnalyticUnitWorker from models import ModelCache -logger = logging.getLogger('AnalyticUnitManager') +logger = log.getLogger('AnalyticUnitManager') WORKERS_EXECUTORS = 20 AnalyticUnitId = str @@ -73,7 +74,8 @@ class AnalyticUnitManager: if task['type'] == 'PUSH': # TODO: do it a better way res = await worker.recieve_data(data, payload['cache']) - return res.update({ 'analyticUnitId': analytic_unit_id }) + res.update({ 'analyticUnitId': analytic_unit_id }) + return res elif task['type'] == 'LEARN': if 'segments' in payload: return await worker.do_train(payload['segments'], data, payload['cache']) @@ -89,13 +91,13 @@ class AnalyticUnitManager: async def handle_analytic_task(self, task): try: result_payload = await self.__handle_analytic_task(task) - return { + result_message = { 'status': 'SUCCESS', 'payload': result_payload } + return result_message except Exception as e: error_text = traceback.format_exc() - logger.error("handle_analytic_task exception: '%s'" % error_text) # TODO: move result 
to a class which renders to json for messaging to analytics return { 'status': 'FAILED', diff --git a/analytics/analytics/buckets/data_bucket.py b/analytics/analytics/buckets/data_bucket.py index 4dd79a1..ee791d1 100644 --- a/analytics/analytics/buckets/data_bucket.py +++ b/analytics/analytics/buckets/data_bucket.py @@ -6,6 +6,7 @@ class DataBucket(object): def __init__(self): self.data = pd.DataFrame([], columns=['timestamp', 'value']) + def receive_data(self, data: pd.DataFrame): self.data = self.data.append(data, ignore_index=True) diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index 38c2750..f96299d 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -1,13 +1,14 @@ -import logging +import logging as log import pandas as pd from typing import Optional from detectors import Detector from models import ModelCache +from time import time -logger = logging.getLogger('THRESHOLD_DETECTOR') +logger = log.getLogger('THRESHOLD_DETECTOR') class ThresholdDetector(Detector): @@ -16,6 +17,7 @@ class ThresholdDetector(Detector): pass def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache: + logger.debug('run train for threshold detector') return { 'cache': { 'value': threshold['value'], @@ -24,6 +26,7 @@ } } def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict: + logger.debug('run detect for threshold detector') value = cache['value'] condition = cache['condition'] @@ -32,10 +35,9 @@ return dict() last_entry = dataframe_without_nans.iloc[-1] last_value = last_entry['value'] - # TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6 - last_time = last_entry['timestamp'].value / 1000000 - segment = ({ 'from': last_time, 'to': last_time }) + now = int(time()) * 1000 + 
segment = ({ 'from': now, 'to': now }) segments = [] if condition == '>': if last_value > value: segments.append(segment) @@ -52,12 +54,13 @@ elif condition == '<': if last_value < value: segments.append(segment) - + logger.debug('seg {}'.format(segments)) return { 'cache': cache, 'segments': segments, - 'lastDetectionTime': last_time + 'lastDetectionTime': now } def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: + logger.debug('threshold recieve data') return self.detect(data, cache) diff --git a/docker-compose.yml b/docker-compose.yml index 17b6caa..225b0a8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,10 +12,12 @@ services: - 8000:8000 volumes: - data-volume:/var/www/data + restart: always analytics: image: hastic/analytics:latest build: analytics + restart: always volumes: data-volume: diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index b3ad076..b437d64 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -302,8 +302,10 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI try { sendWebhook(analyticUnit.name, _.last(segments)); } catch(err) { - console.error(`Error while sending webhook: ${err.message}`); + console.error(`error while sending webhook: ${err.message}`); } + } else { + console.debug(`skip sending webhook for ${analyticUnitId}`); } return { lastDetectionTime: detectionResult.lastDetectionTime, diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts index 70745ea..035051c 100644 --- a/server/src/services/data_puller.ts +++ b/server/src/services/data_puller.ts @@ -43,7 +43,10 @@ export class DataPuller { panelUrl = unit.panelUrl; } - return queryByMetric(unit.metric, panelUrl, from, to, HASTIC_API_KEY); + let startTime = Date.now(); + let data = queryByMetric(unit.metric, panelUrl, from, to, 
HASTIC_API_KEY); + console.log(`data puller: query took ${Date.now() - startTime}ms for unit id ${unit.id}`); + return data; } @@ -55,6 +58,7 @@ try { this.analyticsService.sendTask(task); + console.log(`data puller successfully pushed data for unit id: ${unit.id}`); } catch(e) { console.log(`data puller got error while push data ${e.message}`); } @@ -70,15 +74,16 @@ this._runAnalyticUnitPuller(analyticUnit); }); - console.log('Data puller started'); + console.log('data puller started'); } public stopPuller() { this._unitTimes = {}; - console.log('Data puller stopped'); + console.log('data puller stopped'); } private async _runAnalyticUnitPuller(analyticUnit: AnalyticUnit.AnalyticUnit) { + console.debug(`run data puller for analytic unit ${analyticUnit.id}`); // TODO: lastDetectionTime can be in ns const time = analyticUnit.lastDetectionTime + 1 || Date.now(); this._unitTimes[analyticUnit.id] = time; @@ -89,6 +94,7 @@ for await (const data of dataGenerator) { if(!_.has(this._unitTimes, analyticUnit.id)) { + console.log(`data puller: ${analyticUnit.id} not in _unitTimes, break`); break; } @@ -119,20 +125,21 @@ async * getDataGenerator(analyticUnit: AnalyticUnit.AnalyticUnit, duration: number): AsyncIterableIterator { - if(!this.analyticsService.ready) { - return { - columns: [], - values: [] + const getData = async () => { + if(!this.analyticsService.ready) { + console.debug(`data generator: analytic service not ready, return empty result while wait service`); + return { + columns: [], + values: [] + }; } - } - const getData = async () => { try { const time = this._unitTimes[analyticUnit.id] const now = Date.now(); return await this.pullData(analyticUnit, time, now); } catch(err) { - throw new Error(`Error while pulling data: ${err.message}`); + throw new Error(`error while pulling data: ${err.message}`); } } diff --git a/server/src/services/notification_service.ts 
b/server/src/services/notification_service.ts index fd6bc1b..6bdbae4 100644 --- a/server/src/services/notification_service.ts +++ b/server/src/services/notification_service.ts @@ -13,7 +13,7 @@ export async function sendWebhook(analyticUnitName: string, segment: Segment) { to: segment.to }; - console.log(`Sending alert: ${JSON.stringify(alert)}`); + console.log(`Sending alert name:${alert.analyticUnitName} from:${new Date(alert.from)} to:${new Date(alert.to)}`); if(HASTIC_WEBHOOK_URL === null) { throw new Error(`Can't send alert, HASTIC_WEBHOOK_URL is undefined`); @@ -37,8 +37,7 @@ export async function sendWebhook(analyticUnitName: string, segment: Segment) { }; try { - const response = await axios(options); - console.log(response); + await axios(options); } catch(err) { console.error(`Can't send alert to ${HASTIC_WEBHOOK_URL}. Error: ${err.message}`); }