
Move analytics to hastic/analytics repo (#890)

* move analytics to hastic/analytics repo

* analytics submodule
corpglory-dev committed 5 years ago
commit 7d9a217596
Changed files (lines changed):

  1. .gitmodules (3)
  2. analytics (1)
  3. analytics/.dockerignore (2)
  4. analytics/.gitignore (5)
  5. analytics/.vscode/.env (1)
  6. analytics/.vscode/launch.json (32)
  7. analytics/.vscode/settings.json (22)
  8. analytics/Codestyle.md (27)
  9. analytics/Dockerfile (12)
  10. analytics/README.md (12)
  11. analytics/analytics/analytic_types/__init__.py (39)
  12. analytics/analytics/analytic_types/cache.py (38)
  13. analytics/analytics/analytic_types/data_bucket.py (14)
  14. analytics/analytics/analytic_types/detector.py (47)
  15. analytics/analytics/analytic_types/learning_info.py (17)
  16. analytics/analytics/analytic_types/segment.py (57)
  17. analytics/analytics/analytic_unit_manager.py (103)
  18. analytics/analytics/analytic_unit_worker.py (116)
  19. analytics/analytics/config.py (30)
  20. analytics/analytics/detectors/__init__.py (4)
  21. analytics/analytics/detectors/anomaly_detector.py (277)
  22. analytics/analytics/detectors/detector.py (80)
  23. analytics/analytics/detectors/pattern_detector.py (147)
  24. analytics/analytics/detectors/threshold_detector.py (111)
  25. analytics/analytics/models/__init__.py (9)
  26. analytics/analytics/models/custom_model.py (30)
  27. analytics/analytics/models/drop_model.py (9)
  28. analytics/analytics/models/general_model.py (104)
  29. analytics/analytics/models/jump_model.py (9)
  30. analytics/analytics/models/model.py (230)
  31. analytics/analytics/models/peak_model.py (44)
  32. analytics/analytics/models/stair_model.py (147)
  33. analytics/analytics/models/triangle_model.py (119)
  34. analytics/analytics/models/trough_model.py (44)
  35. analytics/analytics/server.py (94)
  36. analytics/analytics/services/__init__.py (2)
  37. analytics/analytics/services/data_service.py (85)
  38. analytics/analytics/services/server_service.py (132)
  39. analytics/analytics/utils/__init__.py (4)
  40. analytics/analytics/utils/common.py (443)
  41. analytics/analytics/utils/concurrent.py (130)
  42. analytics/analytics/utils/dataframe.py (63)
  43. analytics/analytics/utils/meta.py (81)
  44. analytics/analytics/utils/time.py (13)
  45. analytics/bin/server (32)
  46. analytics/pyinstaller_hooks/hook-pandas.py (1)
  47. analytics/pyinstaller_hooks/hook-scipy.py (1)
  48. analytics/requirements.txt (7)
  49. analytics/scripts/build-dist.sh (3)
  50. analytics/tests/__init__.py (4)
  51. analytics/tests/test_analytic_types.py (16)
  52. analytics/tests/test_bucket.py (38)
  53. analytics/tests/test_dataset.py (386)
  54. analytics/tests/test_detectors.py (265)
  55. analytics/tests/test_manager.py (100)
  56. analytics/tests/test_models.py (43)
  57. analytics/tests/test_utils.py (359)
  58. analytics/tests/test_utils_dataframe.py (43)
  59. analytics/tools/analytic_model_tester.py (122)
  60. analytics/tools/send_zmq_message.py (104)

.gitmodules (vendored, 3 lines)

@@ -0,0 +1,3 @@
[submodule "analytics"]
	path = analytics
	url = https://github.com/hastic/analytics

analytics (1 line)

@@ -0,0 +1 @@
Subproject commit 8734258c84f3278bbc14508e1222c73dda5f90cd
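Note: after this change, a fresh clone of hastic-server needs `git submodule update --init` to fetch the analytics code.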

analytics/.dockerignore (2 lines)

@@ -1,2 +0,0 @@
__pycache__
.vscode

analytics/.gitignore (vendored, 5 lines)

@@ -1,5 +0,0 @@
build/
dist/
*.spec
__pycache__/
test/

analytics/.vscode/.env (vendored, 1 line)

@@ -1 +0,0 @@
PYTHONPATH=analytics

analytics/.vscode/launch.json (vendored, 32 lines)

@@ -1,32 +0,0 @@
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Attach (Remote Debug)",
            "type": "python",
            "request": "attach",
            "port": 5679,
            "host": "localhost",
            "pathMappings": [
                {
                    "localRoot": "${workspaceFolder}",
                    "remoteRoot": "/var/www/analytics"
                }
            ]
        },
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "windows": {
                "program": "${workspaceFolder}\\bin\\server"
            },
            "linux": {
                "program": "${workspaceFolder}/bin/server"
            }
        }
    ]
}

analytics/.vscode/settings.json (vendored, 22 lines)

@@ -1,22 +0,0 @@
{
    "terminal.integrated.shell.windows": "C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\powershell.exe",
    "editor.insertSpaces": true,
    "files.eol": "\n",
    "files.exclude": {
        "**/__pycache__/": true,
        "dist": true,
        "build": true
    },
    "[python]": {
        "editor.tabSize": 4,
    },
    "python.envFile": "${workspaceFolder}/.vscode/.env",
    "python.pythonPath": "python",
    "python.linting.enabled": true,
    "python.testing.unittestArgs": [ "-v" ],
    "python.testing.pytestEnabled": false,
    "python.testing.nosetestsEnabled": false,
    "python.testing.unittestEnabled": true,
    "python.linting.pylintEnabled": true,
    "python.jediEnabled": false
}

analytics/Codestyle.md (27 lines)

@@ -1,27 +0,0 @@
# Type hints
Please use: https://www.python.org/dev/peps/pep-0484/
# Line endings
We use LF everywhere
# Imports
You import local files first, then specific libs, and then standard libs.
So you import from something very specific to something very common.
This lets you pay attention to the most important things from the beginning.
```
from data_provider import DataProvider
from anomaly_model import AnomalyModel
from pattern_detection_model import PatternDetectionModel
import numpy as np
from scipy.signal import argrelextrema
import pickle
```

analytics/Dockerfile (12 lines)

@@ -1,12 +0,0 @@
FROM python:3.6.6
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
WORKDIR /var/www/analytics
COPY . /var/www/analytics/
CMD ["python", "-u", "bin/server"]

analytics/README.md (12 lines)

@@ -1,12 +0,0 @@
# Hastic-server-analytics
A Python service which receives tasks from [hastic-server-node](https://github.com/hastic/hastic-server/tree/master/server), such as:

* training statistical models
* detecting patterns in time series data

## Architecture
The service uses [asyncio](https://docs.python.org/3/library/asyncio.html),
[concurrency](https://docs.python.org/3.6/library/concurrent.futures.html#module-concurrent.futures) and
[pyzmq](https://pyzmq.readthedocs.io/en/latest/).
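For orientation, a minimal sketch of that asyncio + pyzmq pattern. The endpoint and message format here are illustrative assumptions, not the service's real protocol; the actual entry point is `bin/server`:

```
import asyncio
import zmq
import zmq.asyncio

async def serve(endpoint: str = 'tcp://localhost:8002'):  # hypothetical endpoint
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect(endpoint)
    while True:
        task = await socket.recv()  # awaiting here keeps the event loop free
        # ... dispatch the task to a detector and send back the result ...
        await socket.send(b'OK')

asyncio.run(serve())  # asyncio.run needs Python 3.7+; the project itself targets 3.6
```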

analytics/analytics/analytic_types/__init__.py (39 lines)

@@ -1,39 +0,0 @@
"""
It is the place where we put all classes and types
common for all analytics code
For example, if you write someting which is used
in analytic_unit_manager, it should be here.
If you create something spicific which is used only in one place,
like PatternDetectionCache, then it should not be here.
"""
import pandas as pd
from typing import Union, List, Tuple
AnalyticUnitId = str
ModelCache = dict
# TODO: explicit timestamp / value
TimeSeries = List[Tuple[int, float]]
"""
Example:
tsis = TimeSeriesIndex(['2017-12-31 16:00:00-08:00', '2017-12-31 17:00:00-08:00', '2017-12-31 18:00:00-08:00'])
ts = TimeSeries([4, 5, 6], tsis)
"""
Timestamp = Union[str, pd.Timestamp]
class TimeSeriesIndex(pd.DatetimeIndex):
def __new__(cls, *args, **kwargs):
return pd.DatetimeIndex.__new__(cls, *args, **kwargs)
# TODO: make generic type for values. See List definition for example of generic class
# TODO: constructor from DataFrame
# TODO: repleace TimeSeries (above) with this class: rename TimeSeries2 to TimeSeries
class TimeSeries2(pd.Series):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

analytics/analytics/analytic_types/cache.py (38 lines)

@@ -1,38 +0,0 @@
from typing import Optional, List, Dict

from analytic_types.segment import AnomalyDetectorSegment
from analytic_types.detector import Bound
from utils.meta import JSONClass, SerializableList


@JSONClass
class AnomalyCache:
    def __init__(
        self,
        alpha: float,
        confidence: float,
        enable_bounds: str,
        seasonality: Optional[int] = None,
        segments: Optional[List[Dict]] = None,
        time_step: Optional[int] = None,
    ):
        self.alpha = alpha
        self.confidence = confidence
        self.enable_bounds = enable_bounds
        if seasonality is not None and seasonality < 0:
            raise ValueError(f'Can`t create AnomalyCache: got invalid seasonality {seasonality}')
        self.seasonality = seasonality
        self.time_step = time_step
        if segments is not None:
            anomaly_segments = map(AnomalyDetectorSegment.from_json, segments)
            self.segments = SerializableList(anomaly_segments)
        else:
            self.segments = []

    def set_segments(self, segments: List[AnomalyDetectorSegment]):
        if len(segments) > 0:
            self.segments = SerializableList(segments)

    def get_enabled_bounds(self) -> Bound:
        # TODO: use class with to_json()
        return Bound(self.enable_bounds)

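A hypothetical round-trip to illustrate the `@JSONClass` helpers, assuming the decorator generates `from_json` / `to_json` with camelCase keys (as the `cache['timeStep']` accesses in the detectors below suggest):

```
payload = { 'alpha': 0.5, 'confidence': 2.0, 'enableBounds': 'UPPER', 'timeStep': 1000 }
cache = AnomalyCache.from_json(payload)
assert cache.alpha == 0.5                          # snake_case attribute from camelCase key
assert cache.to_json()['enableBounds'] == 'UPPER'  # serialized back with camelCase keys
```
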
analytics/analytics/analytic_types/data_bucket.py (14 lines)

@@ -1,14 +0,0 @@
import pandas as pd


class DataBucket:

    def __init__(self):
        self.data = pd.DataFrame([], columns=['timestamp', 'value'])

    def receive_data(self, data: pd.DataFrame):
        self.data = self.data.append(data, ignore_index=True)

    def drop_data(self, count: int):
        if count > 0:
            self.data = self.data.iloc[count:]

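For illustration, a minimal usage sketch of the bucket (note it relies on `DataFrame.append`, which the pandas versions of that era provided but pandas 2.0 removed):

```
import pandas as pd

bucket = DataBucket()
bucket.receive_data(pd.DataFrame({ 'timestamp': [1, 2, 3], 'value': [0.1, 0.2, 0.3] }))
bucket.drop_data(2)           # drop the two oldest rows
assert len(bucket.data) == 1  # only the newest row remains
```
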
analytics/analytics/analytic_types/detector.py (47 lines)

@@ -1,47 +0,0 @@
from analytic_types import ModelCache, TimeSeries
from analytic_types.segment import Segment

from enum import Enum
from typing import List, Optional, Tuple

import utils.meta


class Bound(Enum):
    ALL = 'ALL'
    UPPER = 'UPPER'
    LOWER = 'LOWER'


class DetectionResult:

    def __init__(
        self,
        cache: Optional[ModelCache] = None,
        segments: Optional[List[Segment]] = None,
        last_detection_time: int = None
    ):
        if cache is None:
            cache = {}
        if segments is None:
            segments = []
        self.cache = cache
        self.segments = segments
        self.last_detection_time = last_detection_time

    # TODO: use @utils.meta.JSONClass (now it can't serialize list of objects)
    def to_json(self):
        return {
            'cache': self.cache,
            'segments': list(map(lambda segment: segment.to_json(), self.segments)),
            'lastDetectionTime': self.last_detection_time
        }


@utils.meta.JSONClass
class ProcessingResult():

    def __init__(
        self,
        lower_bound: Optional[TimeSeries] = None,
        upper_bound: Optional[TimeSeries] = None,
    ):
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound

analytics/analytics/analytic_types/learning_info.py (17 lines)

@@ -1,17 +0,0 @@
import utils.meta


@utils.meta.JSONClass
class LearningInfo:

    def __init__(self):
        super().__init__()
        self.confidence = []
        self.patterns_list = []
        self.pattern_width = []
        self.pattern_height = []
        self.pattern_timestamp = []
        self.segment_center_list = []
        self.patterns_value = []

    def __str__(self):
        return str(self.to_json())

analytics/analytics/analytic_types/segment.py (57 lines)

@@ -1,57 +0,0 @@
from typing import Optional

import utils.meta


@utils.meta.JSONClass
class Segment:
    '''
    Used for segment manipulation instead of { 'from': ..., 'to': ... } dict
    '''

    def __init__(
        self,
        from_timestamp: int,
        to_timestamp: int,
        _id: Optional[str] = None,
        analytic_unit_id: Optional[str] = None,
        labeled: Optional[bool] = None,
        deleted: Optional[bool] = None,
        message: Optional[str] = None
    ):
        if to_timestamp < from_timestamp:
            raise ValueError(f'Can`t create segment with to < from: {to_timestamp} < {from_timestamp}')
        self.from_timestamp = from_timestamp
        self.to_timestamp = to_timestamp
        self._id = _id
        self.analytic_unit_id = analytic_unit_id
        self.labeled = labeled
        self.deleted = deleted
        self.message = message


@utils.meta.JSONClass
class AnomalyDetectorSegment(Segment):
    '''
    Used for segment manipulation instead of { 'from': ..., 'to': ..., 'data': ... } dict
    '''

    def __init__(
        self,
        from_timestamp: int,
        to_timestamp: int,
        data = [],
        _id: Optional[str] = None,
        analytic_unit_id: Optional[str] = None,
        labeled: Optional[bool] = None,
        deleted: Optional[bool] = None,
        message: Optional[str] = None
    ):
        super().__init__(
            from_timestamp,
            to_timestamp,
            _id,
            analytic_unit_id,
            labeled,
            deleted,
            message
        )
        self.data = data

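A small sketch of the validation this constructor enforces:

```
segment = Segment(from_timestamp=1000, to_timestamp=2000)

try:
    Segment(2000, 1000)  # to < from
except ValueError as e:
    print(e)  # Can`t create segment with to < from: 1000 < 2000
```
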
analytics/analytics/analytic_unit_manager.py (103 lines)

@@ -1,103 +0,0 @@
from typing import Dict
import logging as log
import traceback
from concurrent.futures import Executor, ThreadPoolExecutor

from analytic_unit_worker import AnalyticUnitWorker
from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.segment import Segment
import detectors

logger = log.getLogger('AnalyticUnitManager')


def get_detector_by_type(
    detector_type: str, analytic_unit_type: str, analytic_unit_id: AnalyticUnitId
) -> detectors.Detector:
    if detector_type == 'pattern':
        return detectors.PatternDetector(analytic_unit_type, analytic_unit_id)
    elif detector_type == 'threshold':
        return detectors.ThresholdDetector(analytic_unit_id)
    elif detector_type == 'anomaly':
        return detectors.AnomalyDetector(analytic_unit_id)

    raise ValueError('Unknown detector type "%s"' % detector_type)


class AnalyticUnitManager:

    def __init__(self):
        self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
        self.workers_executor = ThreadPoolExecutor()

    def __ensure_worker(
        self,
        analytic_unit_id: AnalyticUnitId,
        detector_type: str,
        analytic_unit_type: str
    ) -> AnalyticUnitWorker:
        if analytic_unit_id in self.analytic_workers:
            # TODO: check that type is the same
            return self.analytic_workers[analytic_unit_id]

        detector = get_detector_by_type(detector_type, analytic_unit_type, analytic_unit_id)
        worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor)
        self.analytic_workers[analytic_unit_id] = worker
        return worker

    async def __handle_analytic_task(self, task: object) -> dict:
        """
        returns payload or None
        """
        analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
        log.debug('Analytics got task with type: {} for unit: {}'.format(task['type'], analytic_unit_id))

        if task['type'] == 'CANCEL':
            if analytic_unit_id in self.analytic_workers:
                self.analytic_workers[analytic_unit_id].cancel()
            return

        payload = task['payload']
        worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType'])
        data = payload.get('data')
        if task['type'] == 'PUSH':
            # TODO: do it a better way
            res = await worker.consume_data(data, payload['cache'])
            if res:
                res.update({ 'analyticUnitId': analytic_unit_id })
            return res
        elif task['type'] == 'LEARN':
            if 'segments' in payload:
                segments = payload['segments']
                segments = [Segment.from_json(segment) for segment in segments]
                return await worker.do_train(segments, data, payload['cache'])
            elif 'threshold' in payload:
                return await worker.do_train(payload['threshold'], data, payload['cache'])
            elif 'anomaly' in payload:
                return await worker.do_train(payload['anomaly'], data, payload['cache'])
            else:
                raise ValueError('No segments or threshold in LEARN payload')
        elif task['type'] == 'DETECT':
            return await worker.do_detect(data, payload['cache'])
        elif task['type'] == 'PROCESS':
            return await worker.process_data(data, payload['cache'])

        raise ValueError('Unknown task type "%s"' % task['type'])

    async def handle_analytic_task(self, task: object):
        try:
            log.debug('Start handle_analytic_task with analytic unit: {}'.format(task['analyticUnitId']))
            result_payload = await self.__handle_analytic_task(task)
            result_message = {
                'status': 'SUCCESS',
                'payload': result_payload
            }
            log.debug('End correctly handle_analytic_task with analytic unit: {}'.format(task['analyticUnitId']))
            return result_message
        except Exception as e:
            error_text = traceback.format_exc()
            logger.error("handle_analytic_task Exception: '%s'" % error_text)
            # TODO: move result to a class which renders to json for messaging to analytics
            return {
                'status': 'FAILED',
                'error': repr(e)
            }

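To make the task routing concrete, here is a hypothetical LEARN task for the threshold detector. The field values are illustrative; the shape follows the parsing above:

```
task = {
    'analyticUnitId': 'my-unit-id',          # hypothetical id
    'type': 'LEARN',
    'payload': {
        'detector': 'threshold',
        'analyticUnitType': 'THRESHOLD',     # hypothetical unit type name
        'threshold': { 'value': 10, 'condition': '>' },
        'data': [[1000, 1.0], [2000, 2.0]],  # [timestamp_ms, value] pairs
        'cache': None,
    },
}

# inside a coroutine:
# result = await AnalyticUnitManager().handle_analytic_task(task)
# result['status'] is 'SUCCESS' or 'FAILED'
```
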
analytics/analytics/analytic_unit_worker.py (116 lines)

@@ -1,116 +0,0 @@
import config
import detectors
import logging
import pandas as pd
from typing import Optional, Union, Generator, List, Tuple
import concurrent.futures
import asyncio
import utils

from utils import get_intersected_chunks, get_chunks, prepare_data
from analytic_types import ModelCache, TimeSeries
from analytic_types.detector import DetectionResult

logger = logging.getLogger('AnalyticUnitWorker')


class AnalyticUnitWorker:

    CHUNK_WINDOW_SIZE_FACTOR = 100
    CHUNK_INTERSECTION_FACTOR = 2

    assert CHUNK_WINDOW_SIZE_FACTOR > CHUNK_INTERSECTION_FACTOR, \
        'CHUNK_INTERSECTION_FACTOR should be less than CHUNK_WINDOW_SIZE_FACTOR'

    def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: concurrent.futures.Executor):
        self.analytic_unit_id = analytic_unit_id
        self._detector = detector
        self._executor: concurrent.futures.Executor = executor
        self._training_future: Optional[asyncio.Future] = None

    async def do_train(
        self, payload: Union[list, dict], data: TimeSeries, cache: Optional[ModelCache]
    ) -> Optional[ModelCache]:

        dataframe = prepare_data(data)

        cfuture: concurrent.futures.Future = self._executor.submit(
            self._detector.train, dataframe, payload, cache
        )
        self._training_future = asyncio.wrap_future(cfuture)
        try:
            new_cache: ModelCache = await asyncio.wait_for(self._training_future, timeout = config.LEARNING_TIMEOUT)
            return new_cache
        except asyncio.CancelledError:
            return None
        except asyncio.TimeoutError:
            raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))

    async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
        window_size = self._detector.get_window_size(cache)
        chunk_size = window_size * self.CHUNK_WINDOW_SIZE_FACTOR
        chunk_intersection = window_size * self.CHUNK_INTERSECTION_FACTOR

        detections: List[DetectionResult] = []
        chunks = []
        # XXX: get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size)
        if self._detector.is_detection_intersected():
            chunks = get_intersected_chunks(data, chunk_intersection, chunk_size)
        else:
            chunks = get_chunks(data, chunk_size)

        for chunk in chunks:
            await asyncio.sleep(0)
            chunk_dataframe = prepare_data(chunk)
            detected: DetectionResult = self._detector.detect(chunk_dataframe, cache)
            detections.append(detected)

        if len(detections) == 0:
            raise RuntimeError(f'do_detect for {self.analytic_unit_id} got empty detection results')

        detection_result = self._detector.concat_detection_results(detections)
        return detection_result.to_json()

    def cancel(self):
        if self._training_future is not None:
            self._training_future.cancel()

    async def consume_data(self, data: TimeSeries, cache: Optional[ModelCache]) -> Optional[dict]:
        window_size = self._detector.get_window_size(cache)

        detections: List[DetectionResult] = []

        for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
            await asyncio.sleep(0)
            chunk_dataframe = prepare_data(chunk)
            detected = self._detector.consume_data(chunk_dataframe, cache)
            if detected is not None:
                detections.append(detected)

        if len(detections) == 0:
            return None
        else:
            detection_result = self._detector.concat_detection_results(detections)
            return detection_result.to_json()

    async def process_data(self, data: TimeSeries, cache: ModelCache) -> dict:
        assert isinstance(self._detector, detectors.ProcessingDetector), \
            f'{self.analytic_unit_id} detector is not ProcessingDetector, can`t process data'
        assert cache is not None, f'{self.analytic_unit_id} got empty cache for processing data'

        processed_chunks = []
        window_size = self._detector.get_window_size(cache)
        for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
            await asyncio.sleep(0)
            chunk_dataframe = prepare_data(chunk)
            processed = self._detector.process_data(chunk_dataframe, cache)
            if processed is not None:
                processed_chunks.append(processed)

        if len(processed_chunks) == 0:
            raise RuntimeError(f'process_data for {self.analytic_unit_id} got empty processing results')

        # TODO: maybe we should process all chunks inside of detector?
        result = self._detector.concat_processing_results(processed_chunks)
        return result.to_json()

analytics/analytics/config.py (30 lines)

@@ -1,30 +0,0 @@
import os
import json


PARENT_FOLDER = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
CONFIG_FILE = os.path.join(PARENT_FOLDER, 'config.json')

config_exists = os.path.isfile(CONFIG_FILE)
if config_exists:
    with open(CONFIG_FILE) as f:
        config = json.load(f)
else:
    print('Config file %s doesn`t exist, using defaults' % CONFIG_FILE)


def get_config_field(field: str, default_val = None):
    if field in os.environ:
        return os.environ[field]

    if config_exists and field in config and config[field] != '':
        return config[field]

    if default_val is not None:
        return default_val

    raise Exception('Please configure {}'.format(field))


HASTIC_SERVER_URL = get_config_field('HASTIC_SERVER_URL', 'ws://localhost:8002')
LEARNING_TIMEOUT = get_config_field('LEARNING_TIMEOUT', 120)

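The resolution order is: environment variable, then config.json, then the default. One subtlety worth noting: values read from the environment come back as strings. A quick illustration (not part of the file):

```
import os

os.environ['LEARNING_TIMEOUT'] = '300'
print(get_config_field('LEARNING_TIMEOUT', 120))  # '300': the env var wins, but as a string
del os.environ['LEARNING_TIMEOUT']
print(get_config_field('LEARNING_TIMEOUT', 120))  # 120: falls back to the default
```
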
analytics/analytics/detectors/__init__.py (4 lines)

@@ -1,4 +0,0 @@
from detectors.detector import Detector, ProcessingDetector
from detectors.pattern_detector import PatternDetector
from detectors.threshold_detector import ThresholdDetector
from detectors.anomaly_detector import AnomalyDetector

analytics/analytics/detectors/anomaly_detector.py (277 lines)

@@ -1,277 +0,0 @@
from enum import Enum
import logging
import numpy as np
import pandas as pd
import math
from typing import Optional, Union, List, Tuple, Generator
import operator

from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.detector import DetectionResult, ProcessingResult, Bound
from analytic_types.data_bucket import DataBucket
from analytic_types.segment import Segment, AnomalyDetectorSegment
from analytic_types.cache import AnomalyCache
from detectors import Detector, ProcessingDetector
import utils

MAX_DEPENDENCY_LEVEL = 100
MIN_DEPENDENCY_FACTOR = 0.1
BASIC_ALPHA = 0.5
logger = logging.getLogger('ANOMALY_DETECTOR')


class AnomalyDetector(ProcessingDetector):

    def __init__(self, analytic_unit_id: AnalyticUnitId):
        super().__init__(analytic_unit_id)
        self.bucket = DataBucket()

    def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
        cache = AnomalyCache.from_json(payload)
        cache.time_step = utils.find_interval(dataframe)
        segments = cache.segments

        if len(segments) > 0:
            seasonality = cache.seasonality
            prepared_segments = []

            for segment in segments:
                segment_len = (int(segment.to_timestamp) - int(segment.from_timestamp))
                assert segment_len <= seasonality, \
                    f'seasonality {seasonality} must be greater than or equal to segment length {segment_len}'

                from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.from_timestamp, unit='ms'))
                to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.to_timestamp, unit='ms'))
                segment_data = dataframe[from_index : to_index]
                prepared_segments.append(
                    AnomalyDetectorSegment(
                        segment.from_timestamp,
                        segment.to_timestamp,
                        segment_data.value.tolist()
                    )
                )
            cache.set_segments(prepared_segments)

        return {
            'cache': cache.to_json()
        }

    # TODO: ModelCache -> DetectorState
    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
        if cache is None:
            raise ValueError(f'Analytic unit {self.analytic_unit_id} got empty cache')
        data = dataframe['value']

        cache = AnomalyCache.from_json(cache)
        segments = cache.segments
        enabled_bounds = cache.get_enabled_bounds()

        smoothed_data = utils.exponential_smoothing(data, cache.alpha)

        lower_bound = smoothed_data - cache.confidence
        upper_bound = smoothed_data + cache.confidence

        if len(segments) > 0:
            data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])

            for segment in segments:
                seasonality_index = cache.seasonality // cache.time_step
                seasonality_offset = self.get_seasonality_offset(
                    segment.from_timestamp,
                    cache.seasonality,
                    data_start_time,
                    cache.time_step
                )
                segment_data = pd.Series(segment.data)

                lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)

        detected_segments = list(self.detections_generator(dataframe, upper_bound, lower_bound, enabled_bounds))

        last_dataframe_time = dataframe.iloc[-1]['timestamp']
        last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)

        return DetectionResult(cache.to_json(), detected_segments, last_detection_time)

    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
        if cache is None:
            msg = f'consume_data got invalid cache {cache} for task {self.analytic_unit_id}'
            logging.debug(msg)
            raise ValueError(msg)

        data_without_nan = data.dropna()

        if len(data_without_nan) == 0:
            return None

        self.bucket.receive_data(data_without_nan)

        if len(self.bucket.data) >= self.get_window_size(cache):
            return self.detect(self.bucket.data, cache)

        return None

    def is_detection_intersected(self) -> bool:
        return False

    def get_window_size(self, cache: Optional[ModelCache]) -> int:
        '''
        get the number of values that will affect the next value
        '''

        if cache is None:
            raise ValueError('anomaly detector got None cache')
        cache = AnomalyCache.from_json(cache)

        for level in range(1, MAX_DEPENDENCY_LEVEL):
            if (1 - cache.alpha) ** level < MIN_DEPENDENCY_FACTOR:
                break

        seasonality = 0
        if len(cache.segments) > 0:
            seasonality = cache.seasonality // cache.time_step
        return max(level, seasonality)

    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
        result = DetectionResult()
        time_step = detections[0].cache['timeStep']
        for detection in detections:
            result.segments.extend(detection.segments)
            result.last_detection_time = detection.last_detection_time
            result.cache = detection.cache
        result.segments = utils.merge_intersecting_segments(result.segments, time_step)
        return result

    # TODO: remove duplication with detect()
    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
        cache = AnomalyCache.from_json(cache)
        segments = cache.segments
        enabled_bounds = cache.get_enabled_bounds()

        # TODO: exponential_smoothing should return dataframe with related timestamps
        smoothed_data = utils.exponential_smoothing(dataframe['value'], cache.alpha)

        lower_bound = smoothed_data - cache.confidence
        upper_bound = smoothed_data + cache.confidence

        if len(segments) > 0:
            data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])

            for segment in segments:
                seasonality_index = cache.seasonality // cache.time_step
                # TODO: move it to utils and add tests
                seasonality_offset = self.get_seasonality_offset(
                    segment.from_timestamp,
                    cache.seasonality,
                    data_start_time,
                    cache.time_step
                )
                segment_data = pd.Series(segment.data)

                lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)

        # TODO: support multiple segments

        timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
        lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
        upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))

        if enabled_bounds == Bound.ALL:
            return ProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
        elif enabled_bounds == Bound.UPPER:
            return ProcessingResult(upper_bound = upper_bound_timeseries)
        elif enabled_bounds == Bound.LOWER:
            return ProcessingResult(lower_bound = lower_bound_timeseries)

    def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
        # data: smoothed data to which seasonality will be added
        # Bound.UPPER -> the segment's upper bound is added to the data
        # Bound.LOWER -> the segment's lower bound is subtracted from the data
        len_smoothed_data = len(data)
        for idx, _ in enumerate(data):
            if idx - offset < 0:
                # TODO: add seasonality for non empty parts
                continue
            if (idx - offset) % seasonality == 0:
                if bound_type == Bound.UPPER:
                    upper_segment_bound = self.get_segment_bound(segment, Bound.UPPER)
                    data = data.add(pd.Series(upper_segment_bound.values, index = segment.index + idx), fill_value = 0)
                elif bound_type == Bound.LOWER:
                    lower_segment_bound = self.get_segment_bound(segment, Bound.LOWER)
                    data = data.add(pd.Series(lower_segment_bound.values * -1, index = segment.index + idx), fill_value = 0)
                else:
                    raise ValueError(f'unknown bound type: {bound_type.value}')

        return data[:len_smoothed_data]

    def get_segment_bound(self, segment: pd.Series, bound: Bound) -> pd.Series:
        '''
        segment is divided by the median to determine its top or bottom part
        the part is smoothed and raised above the segment or put down below the segment
        '''
        if len(segment) < 2:
            return segment
        comparison_operator = operator.gt if bound == Bound.UPPER else operator.le
        segment = segment - segment.min()
        segment_median = segment.median()
        part = [val if comparison_operator(val, segment_median) else segment_median for val in segment.values]
        part = pd.Series(part, index = segment.index)
        smoothed_part = utils.exponential_smoothing(part, BASIC_ALPHA)
        difference = [abs(x - y) for x, y in zip(part, smoothed_part)]
        max_diff = max(difference)
        bound = [val + max_diff for val in smoothed_part.values]
        bound = pd.Series(bound, index = segment.index)
        return bound

    def get_seasonality_offset(self, from_timestamp: int, seasonality: int, data_start_time: int, time_step: int) -> int:
        season_count = math.ceil(abs(from_timestamp - data_start_time) / seasonality)
        start_seasonal_segment = from_timestamp + seasonality * season_count
        seasonality_time_offset = abs(start_seasonal_segment - data_start_time) % seasonality
        seasonality_offset = math.ceil(seasonality_time_offset / time_step)
        return seasonality_offset

    def detections_generator(
        self,
        dataframe: pd.DataFrame,
        upper_bound: pd.DataFrame,
        lower_bound: pd.DataFrame,
        enabled_bounds: Bound
    ) -> Generator[Segment, None, Segment]:
        in_segment = False
        segment_start = 0
        bound: Bound = None
        for idx, val in enumerate(dataframe['value'].values):
            if val > upper_bound.values[idx]:
                if enabled_bounds == Bound.UPPER or enabled_bounds == Bound.ALL:
                    if not in_segment:
                        in_segment = True
                        segment_start = dataframe['timestamp'][idx]
                        bound = Bound.UPPER
                    continue

            if val < lower_bound.values[idx]:
                if enabled_bounds == Bound.LOWER or enabled_bounds == Bound.ALL:
                    if not in_segment:
                        in_segment = True
                        segment_start = dataframe['timestamp'][idx]
                        bound = Bound.LOWER
                    continue

            if in_segment:
                segment_end = dataframe['timestamp'][idx - 1]
                yield Segment(
                    utils.convert_pd_timestamp_to_ms(segment_start),
                    utils.convert_pd_timestamp_to_ms(segment_end),
                    message=f'{val} out of {str(bound.value)} bound'
                )
                in_segment = False
        else:
            if in_segment:
                segment_end = dataframe['timestamp'][idx]
                return Segment(
                    utils.convert_pd_timestamp_to_ms(segment_start),
                    utils.convert_pd_timestamp_to_ms(segment_end),
                    message=f'{val} out of {str(bound.value)} bound'
                )

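A worked example of `get_window_size`'s dependency rule above: the window is the smallest level at which an exponentially smoothed value effectively stops depending on older points.

```
MIN_DEPENDENCY_FACTOR = 0.1
alpha = 0.5

level = 1
while (1 - alpha) ** level >= MIN_DEPENDENCY_FACTOR:
    level += 1

print(level)  # 4, because 0.5 ** 4 == 0.0625 < 0.1
```
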
analytics/analytics/detectors/detector.py (80 lines)

@@ -1,80 +0,0 @@
from abc import ABC, abstractmethod
from pandas import DataFrame
from typing import Optional, Union, List

from analytic_types import ModelCache, TimeSeries, AnalyticUnitId
from analytic_types.detector import DetectionResult, ProcessingResult
from analytic_types.segment import Segment


class Detector(ABC):

    def __init__(self, analytic_unit_id: AnalyticUnitId):
        self.analytic_unit_id = analytic_unit_id

    @abstractmethod
    def train(self, dataframe: DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
        """
        Should be thread-safe with other detectors' train method
        """
        pass

    @abstractmethod
    def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
        pass

    @abstractmethod
    def consume_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
        pass

    @abstractmethod
    def get_window_size(self, cache: Optional[ModelCache]) -> int:
        pass

    def is_detection_intersected(self) -> bool:
        return True

    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
        result = DetectionResult()
        for detection in detections:
            result.segments.extend(detection.segments)
            result.last_detection_time = detection.last_detection_time
            result.cache = detection.cache
        return result

    def get_value_from_cache(self, cache: ModelCache, key: str, required = False):
        value = cache.get(key)
        if value is None and required:
            raise ValueError(f'Missing required "{key}" field in cache for analytic unit {self.analytic_unit_id}')
        return value


class ProcessingDetector(Detector):

    @abstractmethod
    def process_data(self, data: TimeSeries, cache: Optional[ModelCache]) -> ProcessingResult:
        '''
        Data processing to receive additional time series that represent the detector's settings
        '''
        pass

    def concat_processing_results(self, processing_results: List[ProcessingResult]) -> Optional[ProcessingResult]:
        '''
        Concatenate sequential ProcessingResults received by
        splitting the dataset into chunks in the analytic worker
        '''
        if len(processing_results) == 0:
            return None

        united_result = ProcessingResult()
        for result in processing_results:
            if result.lower_bound is not None:
                if united_result.lower_bound is None:
                    united_result.lower_bound = []
                united_result.lower_bound.extend(result.lower_bound)

            if result.upper_bound is not None:
                if united_result.upper_bound is None:
                    united_result.upper_bound = []
                united_result.upper_bound.extend(result.upper_bound)

        return united_result

analytics/analytics/detectors/pattern_detector.py (147 lines)

@@ -1,147 +0,0 @@
import models

import asyncio
import logging
import config

import pandas as pd
from typing import Optional, Generator, List

from detectors import Detector
from analytic_types.data_bucket import DataBucket
from utils import convert_pd_timestamp_to_ms
from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.detector import DetectionResult
from analytic_types.segment import Segment
import utils

logger = logging.getLogger('PATTERN_DETECTOR')


def resolve_model_by_pattern(pattern: str) -> models.Model:
    if pattern == 'GENERAL':
        return models.GeneralModel()
    if pattern == 'PEAK':
        return models.PeakModel()
    if pattern == 'TROUGH':
        return models.TroughModel()
    if pattern == 'DROP':
        return models.DropModel()
    if pattern == 'JUMP':
        return models.JumpModel()
    if pattern == 'CUSTOM':
        return models.CustomModel()
    raise ValueError('Unknown pattern "%s"' % pattern)


class PatternDetector(Detector):

    MIN_BUCKET_SIZE = 150
    BUCKET_WINDOW_SIZE_FACTOR = 5
    DEFAULT_WINDOW_SIZE = 1

    def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
        super().__init__(analytic_unit_id)
        self.pattern_type = pattern_type
        self.model = resolve_model_by_pattern(self.pattern_type)
        self.bucket = DataBucket()

    def train(self, dataframe: pd.DataFrame, segments: List[Segment], cache: Optional[ModelCache]) -> ModelCache:
        # TODO: pass only the part of the dataframe that has segments
        if not self.contains_labeled_segments(segments):
            msg = f'{self.analytic_unit_id} has no positive labeled segments. Pattern detector needs at least 1 positive labeled segment'
            logger.error(msg)
            raise ValueError(msg)

        self.model.state: models.ModelState = self.model.get_state(cache)
        new_cache: models.ModelState = self.model.fit(dataframe, segments, self.analytic_unit_id)

        # time step is optional
        if len(dataframe) > 1:
            new_cache.time_step = utils.find_interval(dataframe)

        new_cache = new_cache.to_json()
        if len(new_cache) == 0:
            logging.warning('new_cache is empty with data: {}, segments: {}, cache: {}, analytic unit: {}'.format(dataframe, segments, cache, self.analytic_unit_id))

        return {
            'cache': new_cache
        }

    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
        logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
        # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)

        if cache is None:
            msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
            logger.error(msg)
            raise ValueError(msg)

        self.model.state = self.model.get_state(cache)
        window_size = self.model.state.window_size

        if window_size is None:
            message = '{} got cache without window_size for detection'.format(self.analytic_unit_id)
            logger.error(message)
            raise ValueError(message)

        if len(dataframe) < window_size * 2:
            message = f'{self.analytic_unit_id} skip detection: dataset length {len(dataframe)} points less than minimal length {window_size * 2} points'
            logger.error(message)
            raise ValueError(message)

        detected = self.model.detect(dataframe, self.analytic_unit_id)

        segments = [Segment(segment[0], segment[1]) for segment in detected['segments']]
        new_cache = detected['cache'].to_json()
        last_dataframe_time = dataframe.iloc[-1]['timestamp']
        last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
        return DetectionResult(new_cache, segments, last_detection_time)

    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
        logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))

        if cache is None:
            logging.debug(f'consume_data got invalid cache {cache} for task {self.analytic_unit_id}, skip')
            return None

        data_without_nan = data.dropna()

        if len(data_without_nan) == 0:
            return None

        self.bucket.receive_data(data_without_nan)

        # TODO: use ModelState
        window_size = cache['windowSize']

        bucket_len = len(self.bucket.data)
        if bucket_len < window_size * 2:
            msg = f'{self.analytic_unit_id} bucket data {bucket_len} less than two window sizes {window_size * 2}, skip run detection from consume_data'
            logger.debug(msg)
            return None

        res = self.detect(self.bucket.data, cache)

        bucket_size = max(window_size * self.BUCKET_WINDOW_SIZE_FACTOR, self.MIN_BUCKET_SIZE)
        if bucket_len > bucket_size:
            excess_data = bucket_len - bucket_size
            self.bucket.drop_data(excess_data)

        logging.debug('End consume_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, str(res.to_json())))

        if res:
            return res
        else:
            return None

    def get_window_size(self, cache: Optional[ModelCache]) -> int:
        if cache is None:
            return self.DEFAULT_WINDOW_SIZE
        # TODO: windowSize -> window_size
        return cache.get('windowSize', self.DEFAULT_WINDOW_SIZE)

    def contains_labeled_segments(self, segments: List[Segment]) -> bool:
        for segment in segments:
            if segment.labeled:
                return True
        return False

analytics/analytics/detectors/threshold_detector.py (111 lines)

@@ -1,111 +0,0 @@
import logging as log
import operator

import pandas as pd
import numpy as np

from typing import Optional, List

from analytic_types import ModelCache, AnalyticUnitId
from analytic_types.detector import DetectionResult, ProcessingResult
from analytic_types.segment import Segment
from detectors import ProcessingDetector
from time import time
import utils

logger = log.getLogger('THRESHOLD_DETECTOR')


class ThresholdDetector(ProcessingDetector):

    WINDOW_SIZE = 3

    def __init__(self, analytic_unit_id: AnalyticUnitId):
        super().__init__(analytic_unit_id)

    def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache:
        time_step = utils.find_interval(dataframe)
        return {
            'cache': {
                'value': threshold['value'],
                'condition': threshold['condition'],
                'timeStep': time_step
            }
        }

    def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> DetectionResult:
        if cache is None or cache == {}:
            raise ValueError('Threshold detector error: cannot detect before learning')
        if len(dataframe) == 0:
            return None

        value = cache['value']
        condition = cache['condition']

        segments = []
        for index, row in dataframe.iterrows():
            current_value = row['value']
            current_timestamp = utils.convert_pd_timestamp_to_ms(row['timestamp'])
            segment = Segment(current_timestamp, current_timestamp)
            # TODO: merge segments
            if pd.isnull(current_value):
                if condition == 'NO_DATA':
                    segment.message = 'NO_DATA detected'
                    segments.append(segment)
                continue

            comparators = {
                '>': operator.gt,
                '<': operator.lt,
                '=': operator.eq,
                '>=': operator.ge,
                '<=': operator.le
            }

            assert condition in comparators.keys(), f'condition {condition} not allowed'

            if comparators[condition](current_value, value):
                segment.message = f"{current_value} {condition} threshold's value {value}"
                segments.append(segment)

        last_entry = dataframe.iloc[-1]
        last_detection_time = utils.convert_pd_timestamp_to_ms(last_entry['timestamp'])
        return DetectionResult(cache, segments, last_detection_time)

    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
        result = self.detect(data, cache)
        return result if result else None

    def get_window_size(self, cache: Optional[ModelCache]) -> int:
        return self.WINDOW_SIZE

    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
        result = DetectionResult()
        time_step = detections[0].cache['timeStep']
        for detection in detections:
            result.segments.extend(detection.segments)
            result.last_detection_time = detection.last_detection_time
            result.cache = detection.cache
        result.segments = utils.merge_intersecting_segments(result.segments, time_step)
        return result

    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
        data = dataframe['value']
        value = self.get_value_from_cache(cache, 'value', required = True)
        condition = self.get_value_from_cache(cache, 'condition', required = True)

        if condition == 'NO_DATA':
            return ProcessingResult()

        data.values[:] = value
        timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
        result_series = list(zip(timestamps, data.values.tolist()))

        if condition in ['>', '>=', '=']:
            return ProcessingResult(upper_bound = result_series)

        if condition in ['<', '<=']:
            return ProcessingResult(lower_bound = result_series)

        raise ValueError(f'{condition} condition not supported')

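A minimal sketch of a detection pass, assuming a cache shaped like the one `train` above produces (the unit id and values are hypothetical):

```
import pandas as pd

df = pd.DataFrame({
    'timestamp': pd.to_datetime([1000, 2000], unit='ms'),
    'value': [5, 15],
})
cache = { 'value': 10, 'condition': '>', 'timeStep': 1000 }

result = ThresholdDetector('my-unit-id').detect(df, cache)
# result.segments holds one point-segment for the reading 15 > 10
```
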
analytics/analytics/models/__init__.py (9 lines)

@@ -1,9 +0,0 @@
from models.model import Model, ModelState, AnalyticSegment, ModelType, ExtremumType
from models.triangle_model import TriangleModel, TriangleModelState
from models.stair_model import StairModel, StairModelState
from models.drop_model import DropModel
from models.peak_model import PeakModel
from models.jump_model import JumpModel
from models.custom_model import CustomModel
from models.trough_model import TroughModel
from models.general_model import GeneralModel, GeneralModelState

analytics/analytics/models/custom_model.py (30 lines)

@@ -1,30 +0,0 @@
from models import Model, AnalyticSegment, ModelState, ModelType
from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.learning_info import LearningInfo

import utils
import pandas as pd
from typing import List, Optional


class CustomModel(Model):

    def do_fit(
        self,
        dataframe: pd.DataFrame,
        labeled_segments: List[AnalyticSegment],
        deleted_segments: List[AnalyticSegment],
        learning_info: LearningInfo
    ) -> None:
        pass

    def do_detect(self, dataframe: pd.DataFrame) -> list:
        return []

    def find_segment_center(self, dataframe: pd.DataFrame, start: int, end: int) -> int:
        pass

    def get_model_type(self) -> ModelType:
        pass

    def get_state(self, cache: Optional[ModelCache] = None) -> ModelState:
        pass

analytics/analytics/models/drop_model.py (9 lines)

@@ -1,9 +0,0 @@
from models import StairModel, ModelType, ExtremumType


class DropModel(StairModel):

    def get_model_type(self) -> ModelType:
        return ModelType.DROP

    def get_extremum_type(self) -> ExtremumType:
        return ExtremumType.MIN

analytics/analytics/models/general_model.py (104 lines)

@@ -1,104 +0,0 @@
from analytic_types import AnalyticUnitId
from models import Model, ModelState, AnalyticSegment, ModelType
from typing import Union, List, Generator
import utils
import utils.meta
import numpy as np
import pandas as pd