Browse Source

Labeling for anomalies #631 (#655)

pull/1/head
Evgeny Smyshlyaev 6 years ago committed by rozetko
parent
commit
e0dde3c8ba
  1. 2
      analytics/analytics/analytic_unit_manager.py
  2. 125
      analytics/analytics/detectors/anomaly_detector.py
  3. 2
      analytics/analytics/detectors/detector.py
  4. 2
      analytics/analytics/utils/time.py
  5. 59
      server/src/controllers/analytics_controller.ts
  6. 17
      server/src/models/analytic_units/anomaly_analytic_unit_model.ts
  7. 1
      server/src/models/segment_model.ts
  8. 4
      server/src/services/analytics_service.ts

2
analytics/analytics/analytic_unit_manager.py

@ -57,7 +57,7 @@ class AnalyticUnitManager:
payload = task['payload']
worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType'])
data = payload['data']
data = payload.get('data')
if task['type'] == 'PUSH':
# TODO: do it a better way
res = await worker.consume_data(data, payload['cache'])

125
analytics/analytics/detectors/anomaly_detector.py

@ -12,6 +12,7 @@ import utils
MAX_DEPENDENCY_LEVEL = 100
MIN_DEPENDENCY_FACTOR = 0.1
BASIC_ALPHA = 0.5
logger = logging.getLogger('ANOMALY_DETECTOR')
@ -22,24 +23,83 @@ class AnomalyDetector(ProcessingDetector):
self.bucket = DataBucket()
def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
return {
'cache': {
segments = payload.get('segments')
prepared_segments = []
new_cache = {
'confidence': payload['confidence'],
'alpha': payload['alpha']
}
if segments is not None:
seasonality = payload.get('seasonality')
assert seasonality is not None and seasonality > 0, \
f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
for segment in segments:
segment_len = (int(segment['to']) - int(segment['from']))
assert segment_len <= seasonality, \
f'seasonality {seasonality} must be great then segment length {segment_len}'
from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'], unit='ms'))
to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'], unit='ms'))
segment_data = dataframe[from_index : to_index]
prepared_segments.append({'from': segment['from'], 'data': segment_data.value.tolist()})
data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
time_step = data_second_time - data_start_time
new_cache['seasonality'] = seasonality
new_cache['segments'] = prepared_segments
new_cache['timeStep'] = time_step
return {
'cache': new_cache
}
# TODO: ModelCache -> ModelState
def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
data = dataframe['value']
segments = cache.get('segments')
last_value = None
if cache is not None:
last_value = cache.get('last_value')
smoothed_data = utils.exponential_smoothing(data, cache['alpha'], last_value)
# TODO: use class for cache to avoid using string literals
upper_bound = smoothed_data + cache['confidence']
lower_bound = smoothed_data - cache['confidence']
if segments is not None:
seasonality = cache.get('seasonality')
assert seasonality is not None and seasonality > 0, \
f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
time_step = data_second_time - data_start_time
for segment in segments:
seasonality_offset = (abs(segment['from'] - data_start_time) % seasonality) // time_step
seasonality_index = seasonality // time_step
#TODO: upper and lower bounds for segment_data
segment_data = utils.exponential_smoothing(pd.Series(segment['data']), BASIC_ALPHA)
upper_seasonality_curve = self.add_season_to_data(
smoothed_data, segment_data, seasonality_offset, seasonality_index, True
)
lower_seasonality_curve = self.add_season_to_data(
smoothed_data, segment_data, seasonality_offset, seasonality_index, False
)
assert len(smoothed_data) == len(upper_seasonality_curve), \
f'len smoothed {len(smoothed_data)} != len seasonality {len(upper_seasonality_curve)}'
# TODO: use class for cache to avoid using string literals
upper_bound = upper_seasonality_curve + cache['confidence']
lower_bound = lower_seasonality_curve - cache['confidence']
anomaly_indexes = []
for idx, val in enumerate(data.values):
if val > upper_bound.values[idx] or val < lower_bound.values[idx]:
@ -91,7 +151,11 @@ class AnomalyDetector(ProcessingDetector):
for level in range(1, MAX_DEPENDENCY_LEVEL):
if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR:
break
return level
seasonality = 0
if cache.get('segments') is not None and cache['seasonality'] > 0:
seasonality = cache['seasonality'] // cache['timeStep']
return max(level, seasonality)
def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
result = DetectionResult()
@ -102,15 +166,56 @@ class AnomalyDetector(ProcessingDetector):
result.segments = utils.merge_intersecting_segments(result.segments)
return result
# TODO: ModelCache -> ModelState
def process_data(self, data: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
# TODO: ModelCache -> ModelState (don't use string literals)
def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
segments = cache.get('segments')
# TODO: exponential_smoothing should return dataframe with related timestamps
smoothed = utils.exponential_smoothing(data['value'], cache['alpha'], cache.get('lastValue'))
timestamps = utils.convert_series_to_timestamp_list(data.timestamp)
smoothed = utils.exponential_smoothing(dataframe['value'], cache['alpha'], cache.get('lastValue'))
# TODO: remove duplication with detect()
if segments is not None:
seasonality = cache.get('seasonality')
assert seasonality is not None and seasonality > 0, \
f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
time_step = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1]) - utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
for segment in segments:
seasonality_offset = (abs(segment['from'] - data_start_time) % seasonality) // time_step
seasonality_index = seasonality // time_step
segment_data = utils.exponential_smoothing(pd.Series(segment['data']), BASIC_ALPHA)
upper_seasonality_curve = self.add_season_to_data(
smoothed, segment_data, seasonality_offset, seasonality_index, True
)
lower_seasonality_curve = self.add_season_to_data(
smoothed, segment_data, seasonality_offset, seasonality_index, False
)
assert len(smoothed) == len(upper_seasonality_curve), \
f'len smoothed {len(smoothed)} != len seasonality {len(upper_seasonality_curve)}'
smoothed = upper_seasonality_curve
# TODO: support multiple segments
upper_bound = upper_seasonality_curve + cache['confidence']
lower_bound = lower_seasonality_curve - cache['confidence']
timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
smoothed_dataset = list(zip(timestamps, smoothed.values.tolist()))
result = ProcessingResult(smoothed_dataset)
return result
def merge_segments(self, segments: List[Segment]) -> List[Segment]:
segments = utils.merge_intersecting_segments(segments)
return segments
def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, addition: bool) -> pd.Series:
#data - smoothed data to which seasonality will be added
#if addition == True -> segment is added
#if addition == False -> segment is subtracted
len_smoothed_data = len(data)
for idx, _ in enumerate(data):
if idx - offset < 0:
continue
if (idx - offset) % seasonality == 0:
if addition:
data = data.add(pd.Series(segment.values, index = segment.index + idx), fill_value = 0)
else:
data = data.add(pd.Series(segment.values * -1, index = segment.index + idx), fill_value = 0)
return data[:len_smoothed_data]

2
analytics/analytics/detectors/detector.py

@ -39,8 +39,6 @@ class Detector(ABC):
result.cache = detection.cache
return result
def merge_segments(self, segments: List[Segment]) -> List[Segment]:
return segments
class ProcessingDetector(Detector):

2
analytics/analytics/utils/time.py

@ -6,7 +6,7 @@ def convert_sec_to_ms(sec) -> int:
def convert_pd_timestamp_to_ms(timestamp: pd.Timestamp) -> int:
# TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6
return int(timestamp.value) / 1000000
return int(timestamp.value) // 1000000
def convert_series_to_timestamp_list(series: pd.Series) -> List[int]:
timestamps = map(lambda value: convert_pd_timestamp_to_ms(value), series)

59
server/src/controllers/analytics_controller.ts

@ -117,9 +117,11 @@ async function getQueryRange(
analyticUnitId: AnalyticUnit.AnalyticUnitId,
detectorType: AnalyticUnit.DetectorType
): Promise<TimeRange> {
if(detectorType === AnalyticUnit.DetectorType.PATTERN) {
// TODO: find labeled OR deleted segments to generate timerange
const segments = await Segment.findMany(analyticUnitId, { labeled: true });
if(
detectorType === AnalyticUnit.DetectorType.PATTERN ||
detectorType === AnalyticUnit.DetectorType.ANOMALY
) {
const segments = await Segment.findMany(analyticUnitId, { $or: { labeled: true, deleted: true } });
if(segments.length === 0) {
throw new Error('Need at least 1 labeled segment');
}
@ -127,10 +129,7 @@ async function getQueryRange(
return getQueryRangeForLearningBySegments(segments);
}
if(
detectorType === AnalyticUnit.DetectorType.THRESHOLD ||
detectorType === AnalyticUnit.DetectorType.ANOMALY
) {
if(detectorType === AnalyticUnit.DetectorType.THRESHOLD) {
const now = Date.now();
return {
from: now - 5 * SECONDS_IN_MINUTE * 1000,
@ -199,7 +198,7 @@ function getQueryRangeForLearningBySegments(segments: Segment.Segment[]) {
}
export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number, to?: number) {
console.log('learning started...');
console.log(`LEARNING started for ${id}`);
try {
let analyticUnit = await AnalyticUnit.findById(id);
@ -235,6 +234,7 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
taskPayload.segments = segmentObjs;
taskPayload.data = await getPayloadData(analyticUnit, from, to);
break;
case AnalyticUnit.DetectorType.THRESHOLD:
taskPayload.threshold = {
@ -247,24 +247,30 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
alpha: (analyticUnit as AnomalyAnalyticUnit).alpha,
confidence: (analyticUnit as AnomalyAnalyticUnit).confidence
};
const seasonality = (analyticUnit as AnomalyAnalyticUnit).seasonality;
if(seasonality > 0) {
let segments = await Segment.findMany(id, { deleted: true });
if(segments.length === 0) {
console.log('Need at least 1 labeled segment, ignore seasonality');
break;
default:
throw new Error(`Unknown type of detector: ${detector}`);
}
taskPayload.anomaly.seasonality = seasonality;
let range: TimeRange;
if(from !== undefined && to !== undefined) {
range = { from, to };
} else {
range = await getQueryRange(id, detector);
let segmentObjs = segments.map(s => s.toObject());
taskPayload.anomaly.segments = segmentObjs;
taskPayload.data = await getPayloadData(analyticUnit, from, to);
}
break;
default:
throw new Error(`Unknown type of detector: ${detector}`);
}
taskPayload.data = await query(analyticUnit, range);
let task = new AnalyticsTask(
id, AnalyticsTaskType.LEARN, taskPayload
);
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
console.log(`run task, id:${id}`);
console.log(`run ${task.type} task, id:${id}`);
let result = await runTask(task);
if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
throw new Error(result.error);
@ -502,11 +508,6 @@ export async function runLearningWithDetection(
): Promise<void> {
// TODO: move setting status somehow "inside" learning
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING);
const foundSegments = await Segment.findMany(id, { labeled: false, deleted: false });
if(foundSegments !== null) {
await Segment.removeSegments(foundSegments.map(segment => segment.id));
}
await Detection.clearSpans(id);
runLearning(id, from, to)
.then(() => runDetect(id, from, to))
.catch(err => console.error(err));
@ -559,6 +560,20 @@ export async function getDetectionSpans(
return _.concat(readySpans, alreadyRunningSpans, newRunningSpans.filter(span => span !== null));
}
async function getPayloadData(
analyticUnit: AnalyticUnit.AnalyticUnit,
from: number,
to:number
) {
let range: TimeRange;
if(from !== undefined && to !== undefined) {
range = { from, to };
} else {
range = await getQueryRange(analyticUnit.id, analyticUnit.detectorType);
}
return await query(analyticUnit, range);
}
async function runDetectionOnExtendedSpan(
analyticUnitId: AnalyticUnit.AnalyticUnitId,
from: number,

17
server/src/models/analytic_units/anomaly_analytic_unit_model.ts

@ -3,7 +3,10 @@ import { AnalyticUnitId, AnalyticUnitStatus, DetectorType } from './types';
import { Metric } from 'grafana-datasource-kit';
type SeasonalityPeriod = {
unit: string,
value: number
}
export class AnomalyAnalyticUnit extends AnalyticUnit {
public learningAfterUpdateRequired = true;
@ -15,6 +18,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
type: string,
public alpha: number,
public confidence: number,
public seasonality: number, //seasonality in ms
private seasonalityPeriod: SeasonalityPeriod,
metric?: Metric,
alert?: boolean,
id?: AnalyticUnitId,
@ -48,7 +53,9 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
return {
...baseObject,
alpha: this.alpha,
confidence: this.confidence
confidence: this.confidence,
seasonality: this.seasonality,
seasonalityPeriod: this.seasonalityPeriod
};
}
@ -57,7 +64,9 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
return {
...baseObject,
alpha: this.alpha,
confidence: this.confidence
confidence: this.confidence,
seasonality: this.seasonality,
seasonalityPeriod: this.seasonalityPeriod
};
}
@ -75,6 +84,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
obj.type,
obj.alpha,
obj.confidence,
obj.seasonality,
obj.seasonalityPeriod,
metric,
obj.alert,
obj._id,

1
server/src/models/segment_model.ts

@ -68,6 +68,7 @@ export class Segment {
}
export type FindManyQuery = {
$or?: any,
timeFromGTE?: number,
timeToLTE?: number,
intexGT?: number,

4
server/src/services/analytics_service.ts

@ -212,9 +212,9 @@ export class AnalyticsService {
try {
response = JSON.parse(text);
} catch (e) {
console.error("Can`t parse response from analytics as json:");
console.error('Can`t parse response from analytics as json:');
console.error(text);
throw new Error('Unexpected response');
throw new Error('Can`t parse response from analytics as json, see log');
}
this._onMessage(AnalyticsMessage.fromObject(response));
}

Loading…
Cancel
Save