Browse Source

Module for messaging on node.js side #30

pull/1/head
Alexey Velikiy 7 years ago
parent
commit
a6567c8e6f
  1. 127
      server/dist/server.js
  2. 6
      server/src/index.ts
  3. 45
      server/src/services/analytics.ts
  4. 41
      server/src/services/analyticsConnection.ts

127
server/dist/server.js vendored

File diff suppressed because one or more lines are too long

6
server/src/index.ts

@ -30,14 +30,14 @@ rootRouter.use('/anomalies', anomaliesRouter.routes(), anomaliesRouter.allowedMe
rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods());
rootRouter.use('/alerts', alertsRouter.routes(), alertsRouter.allowedMethods());
rootRouter.get('/', async (ctx) => {
ctx.response.body = { status: 'Ok ok' };
ctx.response.body = { status: 'Ok' };
});
app
.use(rootRouter.routes())
.use(rootRouter.allowedMethods())
.use(rootRouter.allowedMethods());
app.listen(HASTIC_PORT, () => {
console.log(`Server is running on :${HASTIC_PORT}`)
console.log(`Server is running on :${HASTIC_PORT}`);
});

45
server/src/services/analytics.ts

@ -1,5 +1,3 @@
import { spawn } from 'child_process'
import { ANALYTICS_PATH } from '../config'
import {
Anomaly,
AnomalyId, getAnomalyTypeInfo,
@ -8,33 +6,18 @@ import {
setAnomalyStatus
} from './anomalyType'
import { getTarget } from './metrics';
import { getLabeledSegments, insertSegments, removeSegments } from './segments';
import { split, mapSync } from 'event-stream';
import * as fs from 'fs';
import * as path from 'path';
import { getLabeledSegments, insertSegments, removeSegments } from './segments'
import { AnalyticsConnection } from './analyticsConnection'
var learnWorker;
if(fs.existsSync(path.join(ANALYTICS_PATH, 'dist/worker/worker'))) {
learnWorker = spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH })
} else {
// If compiled analytics script doesn't exist - fallback to regular python
learnWorker = spawn('python3', ['worker.py'], { cwd: ANALYTICS_PATH })
}
learnWorker.stdout.pipe(split()).pipe(mapSync(onMessage));
learnWorker.stderr.on('data', data => console.error(`worker stderr: ${data}`));
const taskMap = {};
let nextTaskId = 0;
function onMessage(data) {
console.log(`worker stdout: ${data}`);
let response = JSON.parse(data);
const analyticsConnection = new AnalyticsConnection(onResponse);
function onResponse(response: any) {
let taskId = response.__task_id;
// let anomalyName = response.anomaly_name;
// let task = response.task;
let status = response.status;
if(status === 'success' || status === 'failed') {
if(taskId in taskMap) {
let resolver = taskMap[taskId];
@ -44,22 +27,22 @@ function onMessage(data) {
}
}
function runTask(task) : Promise<any> {
let anomaly:Anomaly = loadAnomalyById(task.anomaly_id);
async function runTask(task): Promise<any> {
let anomaly: Anomaly = loadAnomalyById(task.anomaly_id);
task.metric = {
datasource: anomaly.metric.datasource,
targets: anomaly.metric.targets.map(t => getTarget(t))
};
task.__task_id = nextTaskId++;
let command = JSON.stringify(task)
learnWorker.stdin.write(`${command}\n`);
return new Promise<Object>((resolve, reject) => {
taskMap[task.__task_id] = resolve
await analyticsConnection.sendMessage(task);
return new Promise<void>((resolve, reject) => {
taskMap[task.__task_id] = resolve;
})
}
async function runLearning(anomalyId:AnomalyId) {
export async function runLearning(anomalyId:AnomalyId) {
let segments = getLabeledSegments(anomalyId);
setAnomalyStatus(anomalyId, 'learning');
let anomaly:Anomaly = loadAnomalyById(anomalyId);
@ -82,7 +65,7 @@ async function runLearning(anomalyId:AnomalyId) {
}
}
async function runPredict(anomalyId:AnomalyId) {
export async function runPredict(anomalyId:AnomalyId) {
let anomaly:Anomaly = loadAnomalyById(anomalyId);
let pattern = anomaly.pattern;
let task = {
@ -112,5 +95,3 @@ async function runPredict(anomalyId:AnomalyId) {
setAnomalyPredictionTime(anomalyId, result.last_prediction_time);
return result.segments;
}
export { runLearning, runPredict }

41
server/src/services/analyticsConnection.ts

@ -0,0 +1,41 @@
import { ANALYTICS_PATH } from '../config'
import { spawn, ChildProcess } from 'child_process'
import { split, mapSync } from 'event-stream';
import * as fs from 'fs';
import * as path from 'path';
export class AnalyticsConnection {
private _learnWorker: ChildProcess;
constructor(private _onResponse: (response: any) => void) {
if(fs.existsSync(path.join(ANALYTICS_PATH, 'dist/worker/worker'))) {
this._learnWorker = spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH })
} else {
// If compiled analytics script doesn't exist - fallback to regular python
this._learnWorker = spawn('python3', ['worker.py'], { cwd: ANALYTICS_PATH })
}
this._learnWorker.stdout.pipe(
split()).pipe(mapSync(this._onPipeMessage.bind(this))
);
this._learnWorker.stderr.on('data', data => console.error(`worker stderr: ${data}`));
}
private _onPipeMessage(data) {
console.log(`worker stdout: ${data}`);
let response = JSON.parse(data);
this._onResponse(response);
}
public async sendMessage(task: any): Promise<void> {
// return Promise.resolve().then(() => {
let command = JSON.stringify(task);
this._learnWorker.stdin.write(`${command}\n`);
// });
}
}
Loading…
Cancel
Save