You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
96 lines
2.5 KiB
96 lines
2.5 KiB
6 years ago
|
#!/usr/bin/env python3
|
||
|
|
||
|
import sys
|
||
|
import os
|
||
|
|
||
|
# TODO: make a wrapper script that sets PYTHONPATH instead
|
||
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'analytics'))
|
||
|
|
||
7 years ago
|
import config
|
||
|
import json
|
||
|
import logging
|
||
|
import asyncio
|
||
6 years ago
|
import traceback
|
||
7 years ago
|
|
||
|
import services
|
||
6 years ago
|
from analytic_unit_manager import handle_analytic_task
|
||
7 years ago
|
|
||
|
|
||
|
# Loggers: the root logger receives the handler configured below, and
# 'SERVER' is the named logger this file logs through.
root = logging.getLogger()
logger = logging.getLogger('SERVER')

# Service handles. Both start as None; the __main__ block assigns them from
# init_services().
server_service: services.ServerService = None
data_service: services.DataService = None

# Handler writing to stdout, accepting records of level DEBUG and above.
logging_formatter = logging.Formatter("%(asctime)s [Analytics] [%(levelname)-5.5s] %(message)s")
logging_handler = logging.StreamHandler(sys.stdout)
# Alternative kept for reference: log to a file instead of stdout.
# logging_handler = logging.FileHandler(config.DATA_FOLDER + '/analytics.log')
logging_handler.setFormatter(logging_formatter)
logging_handler.setLevel(logging.DEBUG)

# Attach the handler to the root logger and let DEBUG records through.
root.setLevel(logging.DEBUG)
root.addHandler(logging_handler)
|
||
7 years ago
|
|
||
|
|
||
6 years ago
|
|
||
|
|
||
7 years ago
|
async def handle_task(task: object):
    """Process one task and report its progress to the server.

    First sends a 'TASK_RESULT' message with status "IN_PROGRESS" built from
    the task's '_id', 'type' and 'analyticUnitId'. Then awaits
    handle_analytic_task(task), copies the task's '_id' into the result and
    sends that result as a second 'TASK_RESULT' message.

    Any exception raised along the way is logged together with its
    traceback and is not re-raised.
    """
    try:
        logger.info("Command is OK")

        # Tell the server the task was accepted before doing the real work.
        in_progress_payload = {
            '_id': task['_id'],
            'task': task['type'],
            'analyticUnitId': task['analyticUnitId'],
            'status': "IN_PROGRESS"
        }
        progress_message = services.server_service.ServerMessage('TASK_RESULT', in_progress_payload)
        await server_service.send_message(progress_message)

        # Run the task and tag the outcome with the originating task id.
        outcome = await handle_analytic_task(task)
        outcome['_id'] = task['_id']

        result_message = services.server_service.ServerMessage('TASK_RESULT', outcome)
        await server_service.send_message(result_message)

    except Exception:
        # Log and swallow, so a failing task does not propagate to the caller.
        logger.error("handle_task Exception: '%s'", traceback.format_exc())
|
||
7 years ago
|
|
||
|
async def handle_message(message: services.ServerMessage):
    """Dispatch an incoming server message.

    Messages whose ``method`` is 'TASK' have their ``payload`` passed to
    handle_task. Messages with any other method are ignored by this function.
    """
    if message.method == 'TASK':
        await handle_task(message.payload)
|
||
6 years ago
|
|
||
7 years ago
|
|
||
|
def init_services():
    """Create the server service and the data service.

    The server service is constructed with handle_message as its argument,
    and the data service is constructed with the server service. Progress is
    logged around each construction.

    Returns:
        A ``(server_service, data_service)`` tuple.
    """
    logger.info("Starting services...")

    logger.info("Server...")
    server = services.ServerService(handle_message)
    logger.info("Ok")

    logger.info("Data service...")
    data = services.DataService(server)
    logger.info("Ok")

    return server, data
|
||
|
|
||
7 years ago
|
async def app_loop():
    """Await the module-level server service's handle_loop() coroutine."""
    # NOTE: an earlier variant gathered handle_loop() together with a
    # test_file_save() coroutine; only the handle loop is awaited now.
    await server_service.handle_loop()
|
||
|
|
||
|
|
||
7 years ago
|
if __name__ == "__main__":
    # Startup order is kept as-is: get the loop, build the services,
    # announce readiness on stdout, then run the handle loop.
    loop = asyncio.get_event_loop()
    logger.info("Ok")
    server_service, data_service = init_services()

    # We need to print to stdout and flush because node.js expects it.
    print('Analytics process is running', flush=True)

    loop.run_until_complete(app_loop())
|