'''
Created on Aug 1, 2013
@author: sean
'''
import signal
import traceback
import sys
import logging
from datetime import datetime
from bson.errors import InvalidId
from bson.objectid import ObjectId
from contextlib import contextmanager
import io
import pytz
from mtq import errors
class ImportStringError(Exception):
pass
is_py3 = lambda: sys.version_info.major >= 3
def is_str(obj):
if is_py3():
return isinstance(obj, str)
else:
return isinstance(obj, basestring)
def is_unicode(obj):
if is_py3():
return isinstance(obj, str)
else:
return isinstance(obj, unicode)
[docs]def handle_signals():
'''
Handle signals in multiprocess.Process threads
'''
def handler(signum, frame):
signal.signal(signal.SIGINT, signal.default_int_handler)
def raise_timeout(signum, frame):
raise errors.Timeout()
def term_handler(signum, frame):
traceback.print_stack(frame)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SystemExit(-signum)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, term_handler)
signal.signal(signal.SIGALRM, raise_timeout)
[docs]def import_string(import_name, silent=False):
"""Imports an object based on a string. This is useful if you want to
use import paths as endpoints or something similar. An import path can
be specified either in dotted notation (``xml.sax.saxutils.escape``)
or with a colon as object delimiter (``xml.sax.saxutils:escape``).
If `silent` is True the return value will be `None` if the import fails.
For better debugging we recommend the new :func:`import_module`
function to be used instead.
:param import_name: the dotted name for the object to import.
:param silent: if set to `True` import errors are ignored and
`None` is returned instead.
:return: imported object
"""
# force the import name to automatically convert to strings
if is_unicode(import_name):
import_name = str(import_name)
try:
if ':' in import_name:
module, obj = import_name.split(':', 1)
elif '.' in import_name:
module, obj = import_name.rsplit('.', 1)
else:
return __import__(import_name)
# __import__ is not able to handle unicode strings in the fromlist
# if the module is a package
if is_unicode(obj) and not is_py3():
obj = obj.encode('utf-8')
try:
return getattr(__import__(module, None, None, [obj]), obj)
except (ImportError, AttributeError):
# support importing modules not yet set up by the parent module
# (or package for that matter)
modname = module + '.' + obj
__import__(modname)
return sys.modules[modname]
except ImportError as e:
if not silent:
raise (ImportStringError(import_name, e), None, sys.exc_info()[2])
[docs]def ensure_capped_collection(db, collection_name, size_mb):
'''
'''
if collection_name not in db.collection_names():
db.create_collection(collection_name, capped=True,
size=(1024.**2) * size_mb) # Mb
return db[collection_name]
@contextmanager
def stream_logging(silence=False):
from mtq.log import IOStreamLogger
stdout = sys.stdout
sys.stdout = IOStreamLogger(sys.stdout, silence)
stderr = sys.stderr
sys.stderr = IOStreamLogger(sys.stderr, silence)
yield sys.stdout, sys.stderr
sys.stdout = stdout
sys.stderr = stderr
class UnicodeFormatter(logging.Formatter):
def format(self, record):
msg = logging.Formatter.format(self, record)
# FIXME: should I be doing errors='replace'?
if hasattr(msg, 'decode'):
msg = msg.decode(errors='replace')
return msg
mgs_template = """Job %s exited with exception:
Job Log:
| %s
"""
@contextmanager
def setup_logging2(worker_id, job_id, lognames=()):
record = io.StringIO()
record_hndlr = logging.StreamHandler(record)
record_hndlr.setFormatter(UnicodeFormatter())
record_hndlr.setLevel(logging.INFO)
logger = logging.getLogger('job')
logger.setLevel(logging.INFO)
loggers = [logger] + [logging.getLogger(name) for name in lognames]
[l.addHandler(record_hndlr) for l in loggers]
logger.info('Starting Job %s' % job_id)
try:
yield loggers
except:
text = record.getvalue().replace('\n', '\n | ')
msg = mgs_template % (job_id, text,)
logger.exception(msg)
raise
else:
logger.info("Job %s finished successfully" % (job_id,))
finally:
pass
[docs]def setup_logging(worker_id, job_id, silence=False):
'''
set up logging for worker
'''
from mtq.log import mstream, MongoHandler
doc = {'worker_id':worker_id, 'job_id':job_id}
sys.stdout = mstream(collection, doc.copy(), sys.stdout, silence)
sys.sterr = mstream(collection, doc.copy(), sys.stderr, silence)
logger = logging.getLogger('job')
logger.setLevel(logging.INFO)
hndlr = MongoHandler(collection, doc.copy())
logger.addHandler(hndlr)
def now():
now = datetime.utcnow()
return now.replace(tzinfo=pytz.utc)
def nulltime():
dt = datetime.utcfromtimestamp(0)
return dt.replace(tzinfo=pytz.utc)
def config_dict(filename):
config = {}
if filename:
return vars(import_string(filename))
return config
def object_id(oid):
try:
return ObjectId(oid)
except InvalidId:
raise TypeError()
def wait_times(conn):
coll = conn.queue_collection
wait = { '$avg': { '$subtract':['$started_at_', '$enqueued_at_'] } }
raw = coll.aggregate([{'$match':{'processed':True}}, {'$group':{'_id':'$qname', 'wait': wait } } ])
result = raw['result']
return {item['_id']:item['wait'] for item in result}
def job_stats(conn, group_by='$execute.func_str', since=None):
coll = conn.queue_collection
duration = { '$avg': { '$subtract':['$finished_at_', '$started_at_'] } }
wait = { '$avg': { '$subtract':['$started_at_', '$enqueued_at_'] } }
count = {'$sum': 1}
failed = {'$sum': {'$cmp':['$failed', False]}}
queues = {'$addToSet': '$qname'}
tags = {'$addToSet': '$push'}
erliest = {'$min': '$finished_at'}
latest = {'$max': '$finished_at'}
match = {'$match':{'finished':True}}
if since:
match['$match']['finished_at'] = {'$gt':since}
raw = coll.aggregate([match, {'$group':{'_id':group_by,
'duration': duration,
'wait_in_queue': wait,
'count': count,
'queues': queues,
'tags': tags,
'failed':failed,
'latest':latest,
'erliest':erliest,
} }
])
result = raw['result']
return {item.pop('_id'):item for item in result}
def shutdown_worker(conn, worker_id=None):
coll = conn.worker_collection
query = {}
if worker_id:
query['_id'] = worker_id
else:
query = {'working':True}
print(coll.update(query, {'$set':{'terminate':True}}, multi=True))
def last_job(conn, worker_id):
coll = conn.queue_collection
cursor = coll.find({'worker_id': worker_id}).sort('enqueued_at', -1)
doc = next(cursor, None)
if doc:
from mtq.job import Job
return Job(conn, doc)