Module panopticon.probe
A lighter probe implementation.
Expand source code
#!/bin/env python3
"""
A lighter probe implementation.
"""
import collections
import inspect
import re
from functools import update_wrapper
from typing import Callable
from panopticon.trace import DurationTraceEvent, Phase, Trace
from panopticon.tracer import FunctionTracer
def probe(trace: Trace) -> Callable:
tracer = _Tracer(trace)
def decorator(x):
if inspect.isclass(x):
for method_name, method in vars(x).items():
if callable(method):
setattr(
x, method_name, decorator(method),
)
return x
if inspect.isgeneratorfunction(x):
replacement = _probe_generator(tracer, x)
elif inspect.iscoroutinefunction(x):
replacement = _probe_coroutine(tracer, x)
elif inspect.isasyncgenfunction(x):
replacement = _probe_async_generator(tracer, x)
else:
replacement = _probe_function(tracer, x)
return update_wrapper(replacement, x)
return decorator
def _probe_generator(tracer, x):
def wrapper(*args, **kwargs):
_panopticon_marker = tracer # Should be unnecessary
tracer.call_backtrace(inspect.currentframe())
tracer.call_fn(x, args, kwargs)
try:
return _GeneratorProbe(
tracer,
x(*args, **kwargs),
tracer._fn_name(x),
tracer._fn_cat(x),
)
finally:
tracer.return_fn(x, None)
tracer.return_backtrace(inspect.currentframe())
return wrapper
class _GeneratorProbe(collections.abc.Generator):
def __init__(self, tracer, x, name, cat):
self._tracer = tracer
self._x = x
self._trace_args = {"name": name, "cat": cat}
def send(self):
return self._x.send()
def throw(self):
return self._x.throw()
def __next__(self):
_panopticon_marker = self._tracer
self._tracer.call_backtrace(inspect.currentframe())
self._tracer.event(ph=Phase.Duration.START, **self._trace_args)
return_value = None
try:
return_value = self._x.send(None)
return return_value
finally:
self._tracer.event(
ph=Phase.Duration.END,
args={
self._tracer._RETURN_KEY: self._tracer._safe_repr(
self._tracer._RETURN_KEY, return_value
)
},
**self._trace_args,
)
self._tracer.return_backtrace(inspect.currentframe())
def _probe_coroutine(tracer, x):
def wrapper(*args, **kwargs):
_panopticon_marker = tracer # Should be unnecessary
tracer.call_backtrace(inspect.currentframe())
tracer.call_fn(x, args, kwargs)
try:
return _CoroutineProbe(
tracer,
x(*args, **kwargs),
tracer._fn_name(x),
tracer._fn_cat(x),
)
finally:
tracer.return_fn(x, None)
tracer.return_backtrace(inspect.currentframe())
return wrapper
class _CoroutineProbe(collections.abc.Coroutine):
def __init__(self, tracer, x, name, cat):
self._tracer = tracer
self._x = x
self._trace_args = {"name": name, "cat": cat}
def send(self, val):
# TODO Instrument
self._x.send(val)
def throw(self, typ, val=None, tb=None):
# TODO Instrument
self._x.throw(typ, val, tb)
def close(self):
# TODO Instrument
self._x.close()
def __await__(self):
_panopticon_marker = self._tracer
it = self._x.__await__()
while True:
self._tracer.call_backtrace(inspect.currentframe())
self._tracer.event(ph=Phase.Duration.START, **self._trace_args)
try:
result = next(it)
yield result
except StopIteration as stop:
self._tracer.event(
ph=Phase.Duration.END,
args={
self._tracer._RETURN_KEY: self._tracer._safe_repr(
self._tracer._RETURN_KEY, stop.value
)
},
**self._trace_args,
)
break
except:
self._tracer.event(
ph=Phase.Duration.END, **self._trace_args,
)
raise
else:
self._tracer.event(
ph=Phase.Duration.END,
args={
self._tracer._RETURN_KEY: self._tracer._safe_repr(
self._tracer._RETURN_KEY, result
)
},
**self._trace_args,
)
finally:
self._tracer.return_backtrace(inspect.currentframe())
def _probe_async_generator(tracer, x):
def wrapper(*args, **kwargs):
_panopticon_marker = tracer # Should be unnecessary
tracer.call_backtrace(inspect.currentframe())
tracer.call_fn(x, args, kwargs)
try:
return _AsyncGeneratorProbe(
tracer,
x(*args, **kwargs),
tracer._fn_name(x),
tracer._fn_cat(x),
)
finally:
tracer.return_fn(x, None)
tracer.return_backtrace(inspect.currentframe())
return wrapper
class _AsyncGeneratorProbe(collections.abc.AsyncGenerator):
def __init__(self, tracer, x, name, cat):
self._tracer = tracer
self._x = x
self._trace_args = {"name": name, "cat": cat}
async def asend(self, value):
return await self._x.asend(value)
async def athrow(self, typ, val=None, tb=None):
return await self._x.athrow(typ, val, tb)
async def aclose(self):
return await self._x.aclose()
async def __anext__(self):
probed = _CoroutineProbe(
self._tracer, self._x.__anext__(), **self._trace_args
)
return await probed
def _probe_function(tracer, x):
def wrapper(*args, **kwargs):
_panopticon_marker = tracer
frame = inspect.currentframe()
tracer.call_backtrace(frame)
tracer.call_fn(x, args, kwargs)
return_value = None
try:
return_value = x(*args, **kwargs)
return return_value
finally:
tracer.return_fn(x, return_value)
tracer.return_backtrace(frame)
return wrapper
class _Tracer(FunctionTracer):
"""TODO Refactor/simplify this class"""
_FN_REGEX = re.compile(r"<function (.*?) at 0x[^ ]+>")
def start(self):
return self
def stop(self):
...
def call_fn(self, fn, args, kwargs):
self.event(
name=self._fn_name(fn),
cat=self._fn_cat(fn),
ph=Phase.Duration.START,
args=self._fn_args(fn, args, kwargs),
)
def return_fn(self, fn, return_value):
self.event(
name=self._fn_name(fn),
cat=self._fn_cat(fn),
ph=Phase.Duration.END,
args={
self._RETURN_KEY: self._safe_repr(
self._RETURN_KEY, return_value
)
},
)
def event(self, name, cat, ph, args=None):
self._trace.add_event(
DurationTraceEvent(name=name, cat=cat, ph=ph, args=args,)
)
@classmethod
def _fn_name(cls, fn):
return f"{fn.__module__}.{fn.__qualname__}"
@staticmethod
def _fn_cat(fn):
if not hasattr(fn, "__code__"):
return "<unknown>"
code = fn.__code__
return f"{code.co_filename}:{code.co_firstlineno}"
@classmethod
def _fn_args(cls, fn, args, kwargs):
bound_arguments = inspect.signature(fn).bind(*args, **kwargs)
result = {}
for key, val in bound_arguments.arguments.items():
result[key] = cls._safe_repr(key, val)
return result
def call_backtrace(self, frame):
# Invert the stack
stack = []
while frame is not None and not self._is_traced_frame(frame):
stack.append(frame)
frame = frame.f_back
while stack:
frame = stack.pop()
self(frame, "call", None)
def return_backtrace(self, frame):
while frame and not self._is_traced_frame(frame):
self(frame, "return", None)
frame = frame.f_back
def _is_traced_frame(self, frame):
return (
frame is not None
and frame.f_back is not None
and frame.f_back.f_locals.get("_panopticon_marker")
and frame.f_back.f_locals.get("_panopticon_marker").get_trace()
== self.get_trace()
)
def _name(self, frame, event, arg):
# Avoiding name cache
return "... " + super()._get_frame_name(frame) + " ..."
Functions
def probe(trace: Trace) ‑> Callable
-
Expand source code
def probe(trace: Trace) -> Callable: tracer = _Tracer(trace) def decorator(x): if inspect.isclass(x): for method_name, method in vars(x).items(): if callable(method): setattr( x, method_name, decorator(method), ) return x if inspect.isgeneratorfunction(x): replacement = _probe_generator(tracer, x) elif inspect.iscoroutinefunction(x): replacement = _probe_coroutine(tracer, x) elif inspect.isasyncgenfunction(x): replacement = _probe_async_generator(tracer, x) else: replacement = _probe_function(tracer, x) return update_wrapper(replacement, x) return decorator