Module panopticon.tracer
The actual tracer
Expand source code
#!/usr/bin/env python3
"""The actual tracer"""
import abc
import dis
import logging
import os
import sys
import threading
from typing import Any, Dict, Optional
import opcode
from .trace import (
DurationTraceEvent,
FlowBindingPoint,
FlowTraceEvent,
Phase,
Trace,
)
# Module-level logger named after this module; FunctionTracer._safe_repr
# uses it to report values that cannot be converted to text.
logger = logging.getLogger(__name__)
class Tracer(abc.ABC):
    """Abstract base for tracers driven by the interpreter's profile hook.

    An instance installs itself as the profile function through
    ``sys.setprofile`` and ``threading.setprofile`` and is then called with
    ``(frame, event, arg)``.  Events whose code lives in a file located
    directly inside a directory named ``panopticon`` are dropped; every
    other event is forwarded to :meth:`_call`, which subclasses implement.

    Instances work as context managers: tracing starts on entry and stops
    on exit.
    """

    def __init__(self, trace=None):
        """Create a tracer.

        Args:
            trace: Object that collects the recorded events.  A new
                ``Trace`` is created only when this is ``None``.
        """
        # Compare against None instead of relying on truthiness, so that a
        # caller-supplied trace that happens to evaluate falsy (e.g. an
        # empty container-like object) is kept rather than replaced.
        self._trace = Trace() if trace is None else trace

    def start(self):
        """Install this tracer as the profile function and return ``self``."""
        # Avoid noise: presumably the threading hook is registered first so
        # that this call itself is not reported to the tracer.
        threading.setprofile(self)
        sys.setprofile(self)
        return self

    def stop(self):
        """Remove the profile function installed by :meth:`start`."""
        sys.setprofile(None)
        threading.setprofile(None)

    def get_trace(self):
        """Return the trace object that events are recorded into."""
        return self._trace

    def __enter__(self):
        """Start tracing and return the tracer."""
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop tracing; returns ``None`` so exceptions are not suppressed."""
        self.stop()

    def __call__(self, frame, event, arg):
        """Profile-function entry point; forwards non-skipped events."""
        if not self._skip(frame):
            self._call(frame, event, arg)

    @staticmethod
    def _skip(frame):
        """Skip anything belonging to the Panopticon module.

        Returns ``True`` when the file of the frame's code object sits
        directly in a directory named ``panopticon``.

        TODO: Also skip any children triggered from here.
        """
        package_dir = os.path.dirname(frame.f_code.co_filename)
        return os.path.basename(package_dir) == "panopticon"

    @abc.abstractmethod
    def _call(self, frame, event, arg):
        """Handle one profile event; implemented by subclasses."""
        ...
class FunctionTracer(Tracer):
    """Tracer that records a duration event for each function call/return.

    ``call`` / ``c_call`` events produce a ``Phase.Duration.START`` event
    and ``return`` / ``c_return`` events a ``Phase.Duration.END`` event.
    All other profile events are ignored.  Optionally the local variables
    at call time and the return value are attached to the events as text.
    """

    # Key under which a captured return value is stored in the event args.
    _RETURN_KEY = "[return value]"

    def __init__(self, trace=None, capture_args=None):
        """Create a function tracer.

        Args:
            trace: Passed through to ``Tracer``.
            capture_args: Optional predicate called as
                ``capture_args(frame, event, arg)``.  When it returns a
                truthy value the arguments / return value of that event are
                captured.  ``None`` disables capturing.
        """
        super().__init__(trace)
        # Not read anywhere in this class; NOTE(review): possibly used by
        # subclasses - confirm before removing.
        self._state = threading.local()
        self._state.active = None
        self._capture_args = capture_args
        # Maps frame -> display name computed when the frame was entered.
        self._name_cache = {}

    def stop(self):
        """Stop tracing and drop all cached frame names."""
        super().stop()
        # Prevent leaking frames
        self._name_cache.clear()

    def _call(self, frame, event, arg):
        """Translate one profile event into a duration trace event."""
        code = frame.f_code
        if event in ("call", "c_call"):
            ph = Phase.Duration.START
        elif event in ("return", "c_return"):
            ph = Phase.Duration.END
        else:
            ph = None

        # Names on .*return events are superfluous but helpful
        # for debugging and testing.
        if event in ("c_call", "c_return"):
            name = str(arg)
            cat = "c function"
        elif event in ("call", "return"):
            name = self._name(frame, event, arg)
            cat = f"{code.co_filename}:{code.co_firstlineno}"
        else:
            name = None
            cat = None

        if ph:
            self._trace.add_event(
                DurationTraceEvent(
                    name=name,
                    cat=cat,
                    ph=ph,
                    args=self._capture_arguments(frame, event, arg),
                )
            )

    def _capture_arguments(
        self, frame, event, arg
    ) -> Optional[Dict[str, Any]]:
        """Return the textual arguments to attach to an event, if any.

        Returns:
            ``None`` when capturing is disabled, when the ``capture_args``
            predicate rejects the event, or for events other than ``call``
            and ``return``.  For ``call`` a mapping of every local variable
            name to its text form; for ``return`` a one-entry mapping
            holding the return value under ``_RETURN_KEY``.
        """
        if not self._capture_args or not self._capture_args(frame, event, arg):
            return None
        if event == "call":
            return {
                key: self._safe_repr(key, val)
                for key, val in frame.f_locals.items()
            }
        if event == "return":
            return {self._RETURN_KEY: self._safe_repr(self._RETURN_KEY, arg)}
        return None

    @staticmethod
    def _safe_repr(key, val) -> str:
        """Convert ``val`` to text without letting conversion errors escape.

        Tries ``repr(val)`` first and ``str(val)`` second; each failure is
        logged with its traceback.  When both fail the placeholder
        ``"<couldn't convert>"`` is returned.

        Args:
            key: Name of the value, used only in the log messages.
            val: Value to convert.
        """
        try:
            return repr(val)
        except Exception:
            logger.exception("Couldn't represent value for %s", key)
        try:
            return str(val)
        except Exception:
            # Logged inside the handler so the failure is actually reported.
            logger.exception("Couldn't stringify value for %s", key)
            return "<couldn't convert>"

    def _name(self, frame, event, arg):
        """Return the display name for ``frame``.

        The name is computed on first sight of the frame and cached; the
        cache entry is removed again on a return event so the frame is not
        kept alive by the cache.
        """
        name = self._name_cache.get(frame)
        if not name:
            name = self._get_frame_name(frame)
        if event in ("c_return", "return"):
            self._name_cache.pop(frame, None)
        else:
            self._name_cache[frame] = name
        return name

    @classmethod
    def _get_frame_name(cls, frame):
        """Build ``module[.Class].function`` for the code run by ``frame``."""
        code = frame.f_code
        classname = cls._get_class_name(frame)
        class_part = f".{classname}" if classname else ""
        module = cls._get_module_name(frame)
        return f"{module}{class_part}.{code.co_name}"

    @classmethod
    def _get_class_name(cls, frame) -> Optional[str]:
        """Heuristics to extract classname for a method.

        The frame is treated as a method when it has a local named ``self``
        that is not ``None`` and that exposes a callable attribute with the
        same name as the running code object.
        """
        code_name = frame.f_code.co_name
        local_self = frame.f_locals.get("self")
        if local_self is None:
            return None
        try:
            attr = getattr(local_self, code_name, None)
        except Exception:
            # A property or __getattr__ on the traced object raised; an
            # exception must not escape from the profile hook because of a
            # naming heuristic.
            return None
        if callable(attr):
            return type(local_self).__name__
        return None

    @classmethod
    def _get_module_name(cls, frame) -> str:
        """Some heuristics to get useful names for modules.

        Uses the file name without extension; for ``__init__`` and
        ``__main__`` files the containing directory name is prepended.
        """
        filename = frame.f_code.co_filename
        module, _ = os.path.splitext(os.path.basename(filename))
        if module in ("__init__", "__main__"):
            parent = os.path.basename(os.path.split(filename)[0])
            module = parent + "." + module
        return module
# Reverse lookup of dis.COMPILER_FLAG_NAMES: flag name -> flag bit.
_CODE_FLAGS = {}
for flag in dis.COMPILER_FLAG_NAMES:
    name = dis.COMPILER_FLAG_NAMES[flag]
    _CODE_FLAGS[name] = flag
class AsyncioTracer(FunctionTracer):
    """Function tracer that also links suspensions and resumptions.

    On top of the duration events of ``FunctionTracer``, a flow START event
    is emitted whenever a generator / coroutine frame returns without
    having finished, and a flow END event when that same frame is entered
    again.  Both events carry ``id(frame)`` as their id.
    """

    RETURN_OPCODE = opcode.opmap["RETURN_VALUE"]
    # Every opcode that ends a frame with a return.  CPython 3.12 and 3.13
    # compile constant returns (such as the implicit ``return None``) to
    # RETURN_CONST, which does not exist on other versions.
    RETURN_OPCODES = frozenset(
        opcode.opmap[op_name]
        for op_name in ("RETURN_VALUE", "RETURN_CONST")
        if op_name in opcode.opmap
    )
    CONTINUABLE_CODE_TYPES = [
        "GENERATOR",
        "ASYNC_GENERATOR",
        "COROUTINE",
        "ITERABLE_COROUTINE",
    ]
    # Bit mask combining the code flags of all continuable code types.
    # The loop variable ``flag`` stays bound as a class attribute.
    CONTINUABLE_CODE_FLAGS = 0
    for flag in CONTINUABLE_CODE_TYPES:
        CONTINUABLE_CODE_FLAGS |= _CODE_FLAGS[flag]

    def __init__(self, trace=None, capture_args=None):
        """Create the tracer; arguments are passed to ``FunctionTracer``."""
        super().__init__(trace, capture_args)
        # ids of frames that returned without finishing (i.e. suspended).
        self._ids = set()

    def _call(self, frame, event, arg):
        """Record the duration event plus flow events for suspended frames."""
        code = frame.f_code
        frame_id = id(frame)
        if event == "return" and self._is_continuable_code(code):
            if self._is_frame_finished(frame, arg):
                self._ids.discard(frame_id)
            else:
                # The frame is only suspended: remember it and open a flow.
                self._ids.add(frame_id)
                self._trace.add_event(
                    FlowTraceEvent(
                        name=code.co_name,
                        cat=self._code_category(code),
                        ph=Phase.Flow.START,
                        bp=FlowBindingPoint.ENCLOSING,
                        id=frame_id,
                    )
                )
        super()._call(frame, event, arg)
        # Emit the end point after starting the run
        if event == "call" and frame_id in self._ids:
            self._trace.add_event(
                FlowTraceEvent(
                    name=code.co_name,
                    cat=self._code_category(code),
                    ph=Phase.Flow.END,
                    bp=FlowBindingPoint.ENCLOSING,
                    id=frame_id,
                )
            )

    def _name(self, frame, event, arg):
        """Return the display name for ``frame``.

        The cached name is dropped only once the frame has finished, so a
        frame that is merely suspended keeps its cache entry.
        """
        name = self._name_cache.get(frame)
        if not name:
            name = self._get_frame_name(frame)
        if self._is_frame_finished(frame, arg):
            self._name_cache.pop(frame, None)
        else:
            self._name_cache[frame] = name
        return name

    @classmethod
    def _is_frame_finished(cls, frame, arg):
        """Return whether the last executed instruction was a return.

        Args:
            frame: Frame to inspect.
            arg: Unused; kept for signature compatibility.
        """
        offset = frame.f_lasti
        if offset < 0:
            # Nothing has been executed yet, so the frame cannot have
            # returned; a negative index would wrap to the end of co_code.
            return False
        last_op = frame.f_code.co_code[offset]
        return last_op == cls.RETURN_OPCODE or last_op in cls.RETURN_OPCODES

    @classmethod
    def _is_continuable_code(cls, code):
        """Return whether ``code`` has any continuable code-type flag set."""
        return (code.co_flags & cls.CONTINUABLE_CODE_FLAGS) > 0

    @classmethod
    def _code_category(cls, code):
        """Return the first matching continuable type name or ``"UNKNOWN"``."""
        for flag in cls.CONTINUABLE_CODE_TYPES:
            if (_CODE_FLAGS[flag] & code.co_flags) > 0:
                return flag
        return "UNKNOWN"
Classes
class AsyncioTracer (trace=None, capture_args=None)
-
Function tracer that additionally emits flow events linking the point where a generator or coroutine frame is suspended to the point where that frame is resumed.
Expand source code
class AsyncioTracer(FunctionTracer):
    """Function tracer that also links suspensions and resumptions.

    On top of the duration events of ``FunctionTracer``, a flow START event
    is emitted whenever a generator / coroutine frame returns without
    having finished, and a flow END event when that same frame is entered
    again.  Both events carry ``id(frame)`` as their id.
    """

    RETURN_OPCODE = opcode.opmap["RETURN_VALUE"]
    # Every opcode that ends a frame with a return.  CPython 3.12 and 3.13
    # compile constant returns (such as the implicit ``return None``) to
    # RETURN_CONST, which does not exist on other versions.
    RETURN_OPCODES = frozenset(
        opcode.opmap[op_name]
        for op_name in ("RETURN_VALUE", "RETURN_CONST")
        if op_name in opcode.opmap
    )
    CONTINUABLE_CODE_TYPES = [
        "GENERATOR",
        "ASYNC_GENERATOR",
        "COROUTINE",
        "ITERABLE_COROUTINE",
    ]
    # Bit mask combining the code flags of all continuable code types.
    # The loop variable ``flag`` stays bound as a class attribute.
    CONTINUABLE_CODE_FLAGS = 0
    for flag in CONTINUABLE_CODE_TYPES:
        CONTINUABLE_CODE_FLAGS |= _CODE_FLAGS[flag]

    def __init__(self, trace=None, capture_args=None):
        """Create the tracer; arguments are passed to ``FunctionTracer``."""
        super().__init__(trace, capture_args)
        # ids of frames that returned without finishing (i.e. suspended).
        self._ids = set()

    def _call(self, frame, event, arg):
        """Record the duration event plus flow events for suspended frames."""
        code = frame.f_code
        frame_id = id(frame)
        if event == "return" and self._is_continuable_code(code):
            if self._is_frame_finished(frame, arg):
                self._ids.discard(frame_id)
            else:
                # The frame is only suspended: remember it and open a flow.
                self._ids.add(frame_id)
                self._trace.add_event(
                    FlowTraceEvent(
                        name=code.co_name,
                        cat=self._code_category(code),
                        ph=Phase.Flow.START,
                        bp=FlowBindingPoint.ENCLOSING,
                        id=frame_id,
                    )
                )
        super()._call(frame, event, arg)
        # Emit the end point after starting the run
        if event == "call" and frame_id in self._ids:
            self._trace.add_event(
                FlowTraceEvent(
                    name=code.co_name,
                    cat=self._code_category(code),
                    ph=Phase.Flow.END,
                    bp=FlowBindingPoint.ENCLOSING,
                    id=frame_id,
                )
            )

    def _name(self, frame, event, arg):
        """Return the display name for ``frame``.

        The cached name is dropped only once the frame has finished, so a
        frame that is merely suspended keeps its cache entry.
        """
        name = self._name_cache.get(frame)
        if not name:
            name = self._get_frame_name(frame)
        if self._is_frame_finished(frame, arg):
            self._name_cache.pop(frame, None)
        else:
            self._name_cache[frame] = name
        return name

    @classmethod
    def _is_frame_finished(cls, frame, arg):
        """Return whether the last executed instruction was a return.

        Args:
            frame: Frame to inspect.
            arg: Unused; kept for signature compatibility.
        """
        offset = frame.f_lasti
        if offset < 0:
            # Nothing has been executed yet, so the frame cannot have
            # returned; a negative index would wrap to the end of co_code.
            return False
        last_op = frame.f_code.co_code[offset]
        return last_op == cls.RETURN_OPCODE or last_op in cls.RETURN_OPCODES

    @classmethod
    def _is_continuable_code(cls, code):
        """Return whether ``code`` has any continuable code-type flag set."""
        return (code.co_flags & cls.CONTINUABLE_CODE_FLAGS) > 0

    @classmethod
    def _code_category(cls, code):
        """Return the first matching continuable type name or ``"UNKNOWN"``."""
        for flag in cls.CONTINUABLE_CODE_TYPES:
            if (_CODE_FLAGS[flag] & code.co_flags) > 0:
                return flag
        return "UNKNOWN"
Ancestors
- FunctionTracer
- Tracer
- abc.ABC
Class variables
var CONTINUABLE_CODE_FLAGS
var CONTINUABLE_CODE_TYPES
var RETURN_OPCODE
var flag
class FunctionTracer (trace=None, capture_args=None)
-
Tracer that records a duration event for every Python and C function call and return, optionally capturing call arguments and return values as text.
Expand source code
class FunctionTracer(Tracer):
    """Tracer that records a duration event for each function call/return.

    ``call`` / ``c_call`` events produce a ``Phase.Duration.START`` event
    and ``return`` / ``c_return`` events a ``Phase.Duration.END`` event.
    All other profile events are ignored.  Optionally the local variables
    at call time and the return value are attached to the events as text.
    """

    # Key under which a captured return value is stored in the event args.
    _RETURN_KEY = "[return value]"

    def __init__(self, trace=None, capture_args=None):
        """Create a function tracer.

        Args:
            trace: Passed through to ``Tracer``.
            capture_args: Optional predicate called as
                ``capture_args(frame, event, arg)``.  When it returns a
                truthy value the arguments / return value of that event are
                captured.  ``None`` disables capturing.
        """
        super().__init__(trace)
        # Not read anywhere in this class; NOTE(review): possibly used by
        # subclasses - confirm before removing.
        self._state = threading.local()
        self._state.active = None
        self._capture_args = capture_args
        # Maps frame -> display name computed when the frame was entered.
        self._name_cache = {}

    def stop(self):
        """Stop tracing and drop all cached frame names."""
        super().stop()
        # Prevent leaking frames
        self._name_cache.clear()

    def _call(self, frame, event, arg):
        """Translate one profile event into a duration trace event."""
        code = frame.f_code
        if event in ("call", "c_call"):
            ph = Phase.Duration.START
        elif event in ("return", "c_return"):
            ph = Phase.Duration.END
        else:
            ph = None

        # Names on .*return events are superfluous but helpful
        # for debugging and testing.
        if event in ("c_call", "c_return"):
            name = str(arg)
            cat = "c function"
        elif event in ("call", "return"):
            name = self._name(frame, event, arg)
            cat = f"{code.co_filename}:{code.co_firstlineno}"
        else:
            name = None
            cat = None

        if ph:
            self._trace.add_event(
                DurationTraceEvent(
                    name=name,
                    cat=cat,
                    ph=ph,
                    args=self._capture_arguments(frame, event, arg),
                )
            )

    def _capture_arguments(
        self, frame, event, arg
    ) -> Optional[Dict[str, Any]]:
        """Return the textual arguments to attach to an event, if any.

        Returns:
            ``None`` when capturing is disabled, when the ``capture_args``
            predicate rejects the event, or for events other than ``call``
            and ``return``.  For ``call`` a mapping of every local variable
            name to its text form; for ``return`` a one-entry mapping
            holding the return value under ``_RETURN_KEY``.
        """
        if not self._capture_args or not self._capture_args(frame, event, arg):
            return None
        if event == "call":
            return {
                key: self._safe_repr(key, val)
                for key, val in frame.f_locals.items()
            }
        if event == "return":
            return {self._RETURN_KEY: self._safe_repr(self._RETURN_KEY, arg)}
        return None

    @staticmethod
    def _safe_repr(key, val) -> str:
        """Convert ``val`` to text without letting conversion errors escape.

        Tries ``repr(val)`` first and ``str(val)`` second; each failure is
        logged with its traceback.  When both fail the placeholder
        ``"<couldn't convert>"`` is returned.

        Args:
            key: Name of the value, used only in the log messages.
            val: Value to convert.
        """
        try:
            return repr(val)
        except Exception:
            logger.exception("Couldn't represent value for %s", key)
        try:
            return str(val)
        except Exception:
            # Logged inside the handler so the failure is actually reported.
            logger.exception("Couldn't stringify value for %s", key)
            return "<couldn't convert>"

    def _name(self, frame, event, arg):
        """Return the display name for ``frame``.

        The name is computed on first sight of the frame and cached; the
        cache entry is removed again on a return event so the frame is not
        kept alive by the cache.
        """
        name = self._name_cache.get(frame)
        if not name:
            name = self._get_frame_name(frame)
        if event in ("c_return", "return"):
            self._name_cache.pop(frame, None)
        else:
            self._name_cache[frame] = name
        return name

    @classmethod
    def _get_frame_name(cls, frame):
        """Build ``module[.Class].function`` for the code run by ``frame``."""
        code = frame.f_code
        classname = cls._get_class_name(frame)
        class_part = f".{classname}" if classname else ""
        module = cls._get_module_name(frame)
        return f"{module}{class_part}.{code.co_name}"

    @classmethod
    def _get_class_name(cls, frame) -> Optional[str]:
        """Heuristics to extract classname for a method.

        The frame is treated as a method when it has a local named ``self``
        that is not ``None`` and that exposes a callable attribute with the
        same name as the running code object.
        """
        code_name = frame.f_code.co_name
        local_self = frame.f_locals.get("self")
        if local_self is None:
            return None
        try:
            attr = getattr(local_self, code_name, None)
        except Exception:
            # A property or __getattr__ on the traced object raised; an
            # exception must not escape from the profile hook because of a
            # naming heuristic.
            return None
        if callable(attr):
            return type(local_self).__name__
        return None

    @classmethod
    def _get_module_name(cls, frame) -> str:
        """Some heuristics to get useful names for modules.

        Uses the file name without extension; for ``__init__`` and
        ``__main__`` files the containing directory name is prepended.
        """
        filename = frame.f_code.co_filename
        module, _ = os.path.splitext(os.path.basename(filename))
        if module in ("__init__", "__main__"):
            parent = os.path.basename(os.path.split(filename)[0])
            module = parent + "." + module
        return module
Ancestors
- Tracer
- abc.ABC
Subclasses
- panopticon.probe._Tracer
- AsyncioTracer
Methods
def stop(self)
-
Expand source code
def stop(self): super().stop() # Prevent leaking frames self._name_cache.clear()
class Tracer (trace=None)
-
Abstract base class for tracers that install themselves as the interpreter's profile function; subclasses implement `_call` to handle each profile event.
Expand source code
class Tracer(abc.ABC):
    """Abstract base for tracers driven by the interpreter's profile hook.

    An instance installs itself as the profile function through
    ``sys.setprofile`` and ``threading.setprofile`` and is then called with
    ``(frame, event, arg)``.  Events whose code lives in a file located
    directly inside a directory named ``panopticon`` are dropped; every
    other event is forwarded to :meth:`_call`, which subclasses implement.

    Instances work as context managers: tracing starts on entry and stops
    on exit.
    """

    def __init__(self, trace=None):
        """Create a tracer.

        Args:
            trace: Object that collects the recorded events.  A new
                ``Trace`` is created only when this is ``None``.
        """
        # Compare against None instead of relying on truthiness, so that a
        # caller-supplied trace that happens to evaluate falsy (e.g. an
        # empty container-like object) is kept rather than replaced.
        self._trace = Trace() if trace is None else trace

    def start(self):
        """Install this tracer as the profile function and return ``self``."""
        # Avoid noise: presumably the threading hook is registered first so
        # that this call itself is not reported to the tracer.
        threading.setprofile(self)
        sys.setprofile(self)
        return self

    def stop(self):
        """Remove the profile function installed by :meth:`start`."""
        sys.setprofile(None)
        threading.setprofile(None)

    def get_trace(self):
        """Return the trace object that events are recorded into."""
        return self._trace

    def __enter__(self):
        """Start tracing and return the tracer."""
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop tracing; returns ``None`` so exceptions are not suppressed."""
        self.stop()

    def __call__(self, frame, event, arg):
        """Profile-function entry point; forwards non-skipped events."""
        if not self._skip(frame):
            self._call(frame, event, arg)

    @staticmethod
    def _skip(frame):
        """Skip anything belonging to the Panopticon module.

        Returns ``True`` when the file of the frame's code object sits
        directly in a directory named ``panopticon``.

        TODO: Also skip any children triggered from here.
        """
        package_dir = os.path.dirname(frame.f_code.co_filename)
        return os.path.basename(package_dir) == "panopticon"

    @abc.abstractmethod
    def _call(self, frame, event, arg):
        """Handle one profile event; implemented by subclasses."""
        ...
Ancestors
- abc.ABC
Subclasses
- FunctionTracer
Methods
def get_trace(self)
-
Expand source code
def get_trace(self):
    """Return the trace object held by this tracer."""
    return self._trace
def start(self)
-
Expand source code
def start(self): threading.setprofile(self) # Avoid noise sys.setprofile(self) return self
def stop(self)
-
Expand source code
def stop(self):
    """Unregister the profile function from ``sys`` and ``threading``."""
    sys.setprofile(None)
    threading.setprofile(None)