From 504283d918ad3cfb10b9d6d4adadc89e3675657d Mon Sep 17 00:00:00 2001 From: rozetko Date: Fri, 14 Sep 2018 16:11:43 +0300 Subject: [PATCH] Anti-segments #142 (#150) Node.js part of #142 - Merge segments while inserting - Add deleted flag to segments --- .../src/controllers/analytics_controller.ts | 6 ++- server/src/models/segment_model.ts | 37 ++++++++++++++- server/src/services/data_service.ts | 46 +++++++++++++++---- 3 files changed, 77 insertions(+), 12 deletions(-) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 0098a8a..6a29fe8 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -210,7 +210,7 @@ function processPredictionResult(analyticUnitId: AnalyticUnit.AnalyticUnitId, ta ); } - let segments = payload.segments.map(segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false)); + let segments = payload.segments.map(segment => new Segment.Segment(analyticUnitId, segment.from, segment.to, false, false)); return { lastPredictionTime: payload.lastPredictionTime, @@ -240,8 +240,10 @@ export async function updateSegments( ) { let [addedIds, removed] = await Promise.all([ Segment.insertSegments(segmentsToInsert), - Segment.removeSegments(removedIds) + Segment.setSegmentsDeleted(removedIds) ]); + removed = removed.map(s => s._id); + // TODO: move setting status somehow "inside" learning await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING); runLearning(id).then(() => runPredict(id)); diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index 56c5330..cb7e67b 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -2,6 +2,8 @@ import { AnalyticUnitId } from './analytic_unit_model'; import { Collection, makeDBQ } from '../services/data_service'; +import * as _ from 'lodash'; + let db = makeDBQ(Collection.SEGMENTS); @@ -13,6 +15,7 @@ export class Segment { public from: number, public to: number, public labeled: boolean = false, + public deleted: boolean = false, public id?: SegmentId ) { if(analyticUnitId === undefined) { @@ -38,7 +41,8 @@ export class Segment { analyticUnitId: this.analyticUnitId, from: this.from, to: this.to, - labeled: this.labeled + labeled: this.labeled, + deleted: this.deleted }; } @@ -49,7 +53,7 @@ export class Segment { return new Segment( obj.analyticUnitId, +obj.from, +obj.to, - obj.labeled, obj._id + obj.labeled, obj.deleted, obj._id ); } } @@ -80,9 +84,38 @@ export async function findMany(id: AnalyticUnitId, query: FindManyQuery): Promis } export async function insertSegments(segments: Segment[]) { + let segmentIdsToRemove: SegmentId[] = []; + let segmentsToInsert: Segment[] = []; + for(let segment of segments) { + let intersectedSegments = await db.findMany({ + analyticUnitId: segments[0].analyticUnitId, + to: { $gte: segment.from }, + from: { $lte: segment.to }, + labeled: segment.labeled, + deleted: segment.deleted + }); + + if(intersectedSegments.length > 0) { + let from = _.minBy(intersectedSegments, 'from').from; + let to = _.maxBy(intersectedSegments, 'to').to; + let newSegment = Segment.fromObject(segment.toObject()); + newSegment.from = from; + newSegment.to = to; + segmentIdsToRemove = segmentIdsToRemove.concat(intersectedSegments.map(s => s._id)); + segmentsToInsert.push(newSegment); + } else { + segmentsToInsert.push(segment); + } + } + + await db.removeMany(segmentIdsToRemove); return db.insertMany(segments.map(s => s.toObject())); } +export async function setSegmentsDeleted(ids: SegmentId[]) { + return db.updateMany(ids, { deleted: true }); +} + export function removeSegments(idsToRemove: SegmentId[]) { return db.removeMany(idsToRemove); } diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index fa2cfce..51ee14a 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -17,7 +17,8 @@ export type DBQ = { findMany: (query: string[] | object) => Promise, insertOne: (document: object) => Promise, insertMany: (documents: object[]) => Promise, - updateOne: (query: string | object, updateQuery: any) => Promise, + updateOne: (query: string | object, updateQuery: any) => Promise, + updateMany: (query: string[] | object, updateQuery: any) => Promise, removeOne: (query: string) => Promise removeMany: (query: string[] | object) => Promise } @@ -29,6 +30,7 @@ export function makeDBQ(collection: Collection): DBQ { insertOne: dbInsertOne.bind(null, collection), insertMany: dbInsertMany.bind(null, collection), updateOne: dbUpdateOne.bind(null, collection), + updateMany: dbUpdateMany.bind(null, collection), removeOne: dbRemoveOne.bind(null, collection), removeMany: dbRemoveMany.bind(null, collection) } @@ -88,14 +90,42 @@ let dbUpdateOne = (collection: Collection, query: string | object, updateQuery: // https://github.com/louischatriot/nedb#updating-documents let nedbUpdateQuery = { $set: updateQuery } query = wrapIdToQuery(query); - return new Promise((resolve, reject) => { - db.get(collection).update(query, nedbUpdateQuery, { /* options */ }, (err: Error) => { - if(err) { - reject(err); - } else { - resolve(); + return new Promise((resolve, reject) => { + db.get(collection).update( + query, + nedbUpdateQuery, + { returnUpdatedDocs: true }, + (err: Error, numAffected: number, affectedDocument: any) => { + if(err) { + reject(err); + } else { + resolve(affectedDocument); + } } - }); + ); + }); +} + +let dbUpdateMany = (collection: Collection, query: string[] | object, updateQuery: object) => { + // https://github.com/louischatriot/nedb#updating-documents + if(isEmptyArray(query)) { + return Promise.resolve([]); + } + let nedbUpdateQuery = { $set: updateQuery }; + query = wrapIdsToQuery(query); + return new Promise((resolve, reject) => { + db.get(collection).update( + query, + nedbUpdateQuery, + { returnUpdatedDocs: true, multi: true }, + (err: Error, numAffected: number, affectedDocuments: any[]) => { + if(err) { + reject(err); + } else { + resolve(affectedDocuments); + } + } + ); }); }