import logging
import sys
import types
import threading
import inspect
from functools import wraps
from itertools import chain
from numba.core import config
class TLS(threading.local):
"""Use a subclass to properly initialize the TLS variables in all threads."""
def __init__(self):
self.tracing = False
self.indent = 0
tls = TLS()
def find_function_info(func, spec, args):
"""Return function meta-data in a tuple.
(name, type)"""
module = getattr(func, '__module__', None)
name = getattr(func, '__name__', None)
self = getattr(func, '__self__', None)
cname = None
if self:
cname = self.__name__
#cname = self.__class__.__name__
# Try to deduce the class' name even for unbound methods from their
# first argument, which we assume to be a class instance if named 'self'...
elif len(spec.args) and spec.args[0] == 'self':
cname = args[0].__class__.__name__
# ...or a class object if named 'cls'
elif len(spec.args) and spec.args[0] == 'cls':
cname = args[0].__name__
if name:
qname = []
if module and module != '__main__':
qname.append(module)
qname.append('.')
if cname:
qname.append(cname)
qname.append('.')
qname.append(name)
name = ''.join(qname)
return name, None
def chop(value):
MAX_SIZE = 320
s = repr(value)
if len(s) > MAX_SIZE:
return s[:MAX_SIZE] + '...' + s[-1]
else:
return s
def create_events(fname, spec, args, kwds):
values = dict()
if spec.defaults:
values = dict(zip(spec.args[-len(spec.defaults):],spec.defaults))
values.update(kwds)
values.update(list(zip(spec.args[:len(args)], args)))
positional = ['%s=%r'%(a, values.pop(a)) for a in spec.args]
anonymous = [str(a) for a in args[len(positional):]]
keywords = ['%s=%r'%(k, values[k]) for k in sorted(values.keys())]
params = ', '.join([f for f in chain(positional, anonymous, keywords) if f])
enter = ['>> ', tls.indent * ' ', fname, '(', params, ')']
leave = ['<< ', tls.indent * ' ', fname]
return enter, leave
def dotrace(*args, **kwds):
"""Function decorator to trace a function's entry and exit.
*args: categories in which to trace this function. Example usage:
@trace
def function(...):...
@trace('mycategory')
def function(...):...
"""
recursive = kwds.get('recursive', False)
def decorator(func):
spec = None
logger = logging.getLogger('trace')
def wrapper(*args, **kwds):
if not logger.isEnabledFor(logging.INFO) or tls.tracing:
return func(*args, **kwds)
fname, ftype = find_function_info(func, spec, args)
try:
tls.tracing = True
enter, leave = create_events(fname, spec, args, kwds)
try:
logger.info(''.join(enter))
tls.indent += 1
try:
try:
tls.tracing = False
result = func(*args, **kwds)
finally:
tls.tracing = True
except:
type, value, traceback = sys.exc_info()
leave.append(' => exception thrown\n\traise ')
mname = type.__module__
if mname != '__main__':
leave.append(mname)
leave.append('.')
leave.append(type.__name__)
if value.args:
leave.append('(')
leave.append(', '.join(chop(v) for v in value.args))
leave.append(')')
else:
leave.append('()')
raise
else:
if result is not None:
leave.append(' -> ')
leave.append(chop(result))
finally:
tls.indent -= 1
logger.info(''.join(leave))
finally:
tls.tracing = False
return result
# wrapper end
result = None
rewrap = lambda x: x
# Unwrap already wrapped functions
# (to be rewrapped again later)
if type(func) == classmethod:
rewrap = type(func)
# Note: 'func.__func__' only works in Python 3
func = func.__get__(True).__func__
elif type(func) == staticmethod:
rewrap = type(func)
# Note: 'func.__func__' only works in Python 3
func = func.__get__(True)
elif type(func) == property:
raise NotImplementedError
spec = inspect.getfullargspec(func)
return rewrap(wraps(func)(wrapper))
arg0 = len(args) and args[0] or None
# not supported yet...
if recursive:
raise NotImplementedError
if inspect.ismodule(arg0):
for n, f in inspect.getmembers(arg0, inspect.isfunction):
setattr(arg0, n, decorator(f))
for n, c in inspect.getmembers(arg0, inspect.isclass):
dotrace(c, *args, recursive=recursive)
elif inspect.isclass(arg0):
for n, f in inspect.getmembers(arg0, lambda x: (inspect.isfunction(x) or
inspect.ismethod(x))):
setattr(arg0, n, decorator(f))
if callable(arg0) or type(arg0) in (classmethod, staticmethod):
return decorator(arg0)
elif type(arg0) == property:
# properties combine up to three functions: 'get', 'set', 'del',
# so let's wrap them all.
pget, pset, pdel = None, None, None
if arg0.fget:
pget = decorator(arg0.fget)
if arg0.fset:
pset = decorator(arg0.fset)
if arg0.fdel:
pdel = decorator(arg0.fdel)
return property(pget, pset, pdel)
else:
return decorator
def notrace(*args, **kwds):
"""Just a no-op in case tracing is disabled."""
def decorator(func):
return func
arg0 = len(args) and args[0] or None
if callable(arg0) or type(arg0) in (classmethod, staticmethod):
return decorator(arg0)
else:
return decorator
def doevent(msg):
msg = ['== ', tls.indent * ' ', msg]
logger = logging.getLogger('trace')
logger.info(''.join(msg))
def noevent(msg):
pass
if config.TRACE:
logger = logging.getLogger('trace')
logger.setLevel(logging.INFO)
logger.handlers = [logging.StreamHandler()]
trace = dotrace
event = doevent
else:
trace = notrace
event = noevent