# Source code for mtq.worker

from contextlib import contextmanager
from datetime import datetime
import getpass
import logging
from multiprocessing import Process
import os
import platform
import signal
import sys
import time

from mtq.log import MongoStream, MongoHandler
from mtq.utils import handle_signals, now, setup_logging2, nulltime


class Worker(object):
    '''
    A queue worker: polls MongoDB-backed queues for jobs and executes each
    one in a child ``multiprocessing.Process``.

    Should create a worker from MTQConnection.new_worker
    '''

    def __init__(self, factory, queues=(), tags=(), priority=0,
                 poll_interval=1, exception_handler=None,
                 log_worker_output=False, silence=False, extra_lognames=()):
        # Unique-ish human-readable identity: "<hostname>.<pid>".
        self.name = '%s.%s' % (platform.node(), os.getpid())
        self.extra_lognames = extra_lognames
        self.queues = queues
        self.tags = tags
        self.priority = priority
        self._log_worker_output = log_worker_output
        self.factory = factory
        self.poll_interval = poll_interval
        self.logger = logging.getLogger('mq.Worker')
        # (proc, job) tuple while a job is running, else None.
        self._current = None
        self._handler = exception_handler
        self._pre_call = None
        self._post_call = None
        self.silence = silence
        self.collection = self.factory.worker_collection

    # Placeholder until `register` assigns the real Mongo _id.
    worker_id = '-'

    @contextmanager
    def register(self):
        '''
        Internal Contextmanager, register the birth and death of this worker

        eg::

            with worker.register():
                # Work
        '''
        self.collection = self.factory.worker_collection
        self.worker_id = self.collection.insert({'name': self.name,
                                                 'host': platform.node(),
                                                 'system': platform.system(),
                                                 # BUG FIX: was os.getgid() (the *group* id);
                                                 # the 'pid' field must hold the process id,
                                                 # matching self.name above.
                                                 'pid': os.getpid(),
                                                 'user': getpass.getuser(),
                                                 'started': now(),
                                                 'finished': datetime.fromtimestamp(0),
                                                 'check-in': datetime.fromtimestamp(0),
                                                 'working': True,
                                                 'queues': self.queues,
                                                 'tags': self.tags,
                                                 'log_output': bool(self._log_worker_output),
                                                 'terminate': False,
                                                 'terminate_status': 0,
                                                 })
        if self._log_worker_output:
            hdlr = MongoHandler(self.factory.logging_collection,
                                {'worker_id': self.worker_id})
            self.logger.addHandler(hdlr)
        try:
            yield self.worker_id
        finally:
            # Always mark this worker finished, even on error/interrupt.
            if self._log_worker_output:
                self.logger.removeHandler(hdlr)
            query = {'_id': self.worker_id}
            update = {'$set': {'finished': now(), 'working': False}}
            self.collection.update(query, update)

    def check_in(self):
        '''
        Record a heartbeat in the worker document and read back any
        termination request set in the DB.

        :return: (should_exit, exit_status)
        '''
        query = {'_id': self.worker_id}
        update = {'$set': {'check-in': now(), 'working': True}}
        worker_info = self.collection.find_and_modify(query, update)
        if worker_info:
            should_exit = worker_info.get('terminate', False)
            status = worker_info.get('terminate_status', 0)
            return should_exit, status
        return False, 0

    def work(self, one=False, batch=False, failed=False, fail_fast=False):
        '''
        Main work function

        :param one: wait for the first job execute and then exit
        :param batch: work until the queue is empty, then exit
        '''
        with self.register():
            try:
                self.start_main_loop(one, batch, failed, fail_fast)
            except KeyboardInterrupt:
                self.logger.exception(None)
                if not self._current:
                    return
                # A job is still running: wait for it (warm shutdown)
                # instead of killing it immediately.
                self.logger.warning('Warm shutdown requested')
                proc, job = self._current
                proc.join(timeout=job.doc.get('timeout'))
            return

    def pop_item(self, pop_failed=False):
        # Atomically claim the next matching job for this worker
        # (returns None when no job is available).
        job = self.factory.pop_item(worker_id=self.worker_id,
                                    queues=self.queues,
                                    tags=self.tags,
                                    failed=pop_failed,
                                    )
        return job

    def start_main_loop(self, one=False, batch=False, pop_failed=False, fail_fast=False):
        '''
        Start the main loop and process jobs
        '''
        # TYPO FIX: was 'mogno-host'.
        self.logger.info('Starting Main Loop mongo-host=%s mongo-db=%s' % (self.factory.db.connection.host,
                                                                           self.factory.db.name))
        self.logger.info('Starting Main Loop worker=%s _id=%s' % (self.name, self.worker_id))
        self.logger.info('Listening for jobs queues=[%s] tags=[%s]' % (', '.join(self.queues),
                                                                       ', '.join(self.tags)))
        while 1:
            try:
                should_exit, status = self.check_in()
                if should_exit:
                    self.logger.info("Shutdown Requested (from DB)")
                    # SystemExit derives from BaseException, so the
                    # `except Exception` below does not swallow it.
                    raise SystemExit(status)

                job = self.pop_item(pop_failed=pop_failed)
                if job is None:
                    if batch:
                        break
                    time.sleep(self.poll_interval)
                    continue

                self.process_job(job)
                if one:
                    break
                self.logger.info('Listening for jobs queues=[%s] tags=[%s]' % (', '.join(self.queues),
                                                                               ', '.join(self.tags)))
            except Exception as err:
                if fail_fast:
                    raise
                else:
                    self.logger.exception(err)
                    if one:
                        break
                    else:
                        continue
        self.logger.info('Exiting Main Loop')

    def process_job(self, job):
        '''
        Process a single job in a multiprocessing.Process
        '''
        self.logger.info('Popped Job _id=%s queue=%s tags=%s' % (job.id, job.qname, ', '.join(job.tags)))
        self.logger.info(job.call_str)

        proc = Process(target=self._process_job, args=(job,))
        self._current = proc, job
        proc.start()

        timeout = job.doc.get('timeout')
        if timeout:
            self.logger.info("Job started, timing out after %s seconds" % timeout)
        else:
            self.logger.info("Job started, no time out")
        proc.join(timeout=timeout)

        if proc.is_alive():
            # Only reachable when `timeout` was set (join(None) blocks
            # until the child exits), so min() below is safe.
            self.logger.error('Timeout occurred: interrupting job')
            os.kill(proc.pid, signal.SIGALRM)
            # Give the process 2 min to finish
            proc.join(timeout=min(timeout, 2 * 60))
            if proc.is_alive():
                self.logger.error('Process did not shut down after interrupt: terminating job')
                proc.terminate()

        self._current = None
        failed = proc.exitcode != 0
        if failed:
            self.logger.error('Job %s failed' % (job.doc['_id']))
        else:
            self.logger.info('Job %s finished successfully' % (job.doc['_id']))
        job.set_finished(failed)
        return failed

    def _process_job(self, job):
        '''
        Entry point of the child process: run the job with signals handled
        and logging redirected to Mongo.
        '''
        handle_signals()
        with setup_logging2(self.worker_id, job.id, lognames=self.extra_lognames):
            try:
                self._pre(job)
                job.apply()
            except:
                # Deliberately bare: report *every* failure (including
                # BaseException) to the handler, then re-raise so the
                # child exits non-zero.
                if self._handler:
                    exc_type, exc_value, traceback = sys.exc_info()
                    self._handler(job, exc_type, exc_value, traceback)
                raise
            finally:
                self._post(job)

    def _pre(self, job):
        # Optional user hook run before each job.
        if self._pre_call:
            self._pre_call(job)

    def _post(self, job):
        # Optional user hook run after each job (even on failure).
        if self._post_call:
            self._post_call(job)

    def set_pre(self, func):
        self._pre_call = func

    def set_post(self, func):
        self._post_call = func

    def push_exception_handler(self, handler):
        self._handler = handler

    @property
    def num_backlog(self):
        'number of tasks this worker has to complete'
        return self.factory._items_cursor(queues=self.queues,
                                          tags=self.tags,
                                          ).count()
[docs]class WorkerProxy(object): 'This is a representation of an actual worker process' def __init__(self, factory, doc): self.factory = factory self.doc = doc @property def id(self): return self.doc['_id'] @property def name(self): return self.doc['name'] @property def qnames(self): return self.doc.get('queues', ()) @property def tags(self): return self.doc['tags'] @property
[docs] def num_processed(self): 'number of tasks this worker has completed' collection = self.factory.queue_collection return collection.find({'worker_id': self.id}).count()
@property
[docs] def num_backlog(self): 'number of tasks this worker has to complete' return self.factory._items_cursor(queues=self.qnames, tags=self.tags, ).count()
@property
[docs] def last_check_in(self): 'last check in time' return self.doc.get('check-in', nulltime())
def stream(self): collection = self.factory.logging_collection return MongoStream(collection, doc={'worker_id': self.id}, finished=self.finished)
[docs] def finished(self): 'test if this worker is finished' coll = self.factory.worker_collection cursor = coll.find({'_id':self.id, 'working':False}) return bool(cursor.count())