Browse Source

refactor zmq-releated logic from server.py to ServerService

pull/1/head
Alexey Velikiy 6 years ago
parent
commit
29e0243d6b
  1. 37
      analytics/server.py
  2. 42
      analytics/services/server_service.py

37
analytics/server.py

@ -1,18 +1,18 @@
import config import config
import json import json
import logging import logging
import zmq
import zmq.asyncio
import sys import sys
import asyncio import asyncio
import services.server_service
from analytic_unit_worker import AnalyticUnitWorker from analytic_unit_worker import AnalyticUnitWorker
root = logging.getLogger() root = logging.getLogger()
logger = logging.getLogger('SERVER') logger = logging.getLogger('SERVER')
socket = None
worker = None worker = None
server_service = None
root.setLevel(logging.DEBUG) root.setLevel(logging.DEBUG)
@ -24,28 +24,13 @@ ch.setFormatter(formatter)
root.addHandler(ch) root.addHandler(ch)
async def server_handle_loop():
while True:
received_bytes = await socket.recv()
text = received_bytes.decode('utf-8')
if text == 'ping':
asyncio.ensure_future(handle_ping())
else:
asyncio.ensure_future(handle_task(text))
async def server_send_message(string):
await socket.send_string(string)
async def handle_ping():
await socket.send(b'pong')
async def handle_task(text): async def handle_task(text):
try: try:
task = json.loads(text) task = json.loads(text)
logger.info("Command is OK") logger.info("Command is OK")
await server_send_message(json.dumps({ await server_service.send_message(json.dumps({
'_taskId': task['_taskId'], '_taskId': task['_taskId'],
'task': task['type'], 'task': task['type'],
'analyticUnitId': task['analyticUnitId'], 'analyticUnitId': task['analyticUnitId'],
@ -54,19 +39,17 @@ async def handle_task(text):
res = await worker.do_task(task) res = await worker.do_task(task)
res['_taskId'] = task['_taskId'] res['_taskId'] = task['_taskId']
await server_send_message(json.dumps(res)) await server_service.send_message(json.dumps(res))
except Exception as e: except Exception as e:
logger.error("Exception: '%s'" % str(e)) logger.error("Exception: '%s'" % str(e))
if __name__ == "__main__": if __name__ == "__main__":
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
logger.info("Starting worker...")
worker = AnalyticUnitWorker() worker = AnalyticUnitWorker()
logger.info("Worker was started")
logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
context = zmq.asyncio.Context()
socket = context.socket(zmq.PAIR)
socket.bind(config.ZEROMQ_CONNECTION_STRING)
logger.info("Ok") logger.info("Ok")
loop.run_until_complete(server_handle_loop()) logger.info("Starting server...")
server_service = services.server_service.ServerService(handle_task)
logger.info("Ok")
loop.run_until_complete(server_service.handle_loop())

42
analytics/services/server_service.py

@ -0,0 +1,42 @@
import config
import zmq
import zmq.asyncio
import logging
import asyncio
logger = logging.getLogger('SERVER_SERVICE')
class ServerService:
def __init__(self, on_message_handler):
self.on_message_handler = on_message_handler
logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
self.context = zmq.asyncio.Context()
self.socket = self.context.socket(zmq.PAIR)
self.socket.bind(config.ZEROMQ_CONNECTION_STRING)
async def handle_loop(self):
while True:
received_bytes = await self.socket.recv()
text = received_bytes.decode('utf-8')
if text == 'ping':
asyncio.ensure_future(self.__handle_ping())
else:
asyncio.ensure_future(self.__handle_message(text))
async def send_message(self, string):
await self.socket.send_string(string)
async def __handle_ping(self):
await self.socket.send(b'pong')
async def __handle_message(self, text):
try:
asyncio.ensure_future(self.on_message_handler(text))
except Exception as e:
logger.error("Exception: '%s'" % str(e))
Loading…
Cancel
Save