Module panopticon.post

Utilities for post-processing traces for legibility

Expand source code
#!/bin/env/python3

"""Utilities for post-processing traces for legibility"""

import json
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

from .trace import Phase

logger = logging.getLogger(__name__)

_CELL_WIDTH = 100
_CELL_PADDING = 10

_EVENTS_KEY = "traceEvents"


def flatten(
    *,
    trace_file: Optional[str] = None,
    trace_str: Optional[str] = None,
    trace_json: Optional[Union[Dict, List]] = None,
) -> Union[List, Dict]:
    """
    Destroys all the timing information in a trace to make all columns 
    equally sized and minimize empty space. This is useful when the 
    valuable part of the trace is understanding the code execution 
    and not paying attention to timing information whatsoever.

    (Note that the timing information is distorted and misleading
    because of the overhead from panopticon anyways.)

    The trace can be provided as any of a file, raw string, or 
    parsed json blob.
    """

    _validate_highlander(trace_file, trace_str, trace_json)

    if trace_file:
        with open(trace_file) as infile:
            trace_str = infile.read()

    if trace_str:
        try:
            trace_json = json.loads(trace_str)
        except json.JSONDecodeError:
            trace_json = json.loads(trace_str.rstrip().rstrip(",") + "]")

    if isinstance(trace_json, list):
        trace_json = _flatten_events(trace_json)
    else:
        trace_json[_EVENTS_KEY] = _flatten_events(trace_json[_EVENTS_KEY])

    return trace_json


def _flatten_events(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Walks over the events maintaining per-thread stacks and an offset 
    that keeps moving forward to allow for a minimum width.
    """

    stacks = defaultdict(list)
    offsets = defaultdict(lambda: 0)

    for i, event in enumerate(events):
        tid = event["tid"]

        if event["ph"] == Phase.Duration.START:
            events[i]["ts"] = offsets[tid]
            stacks[tid].append(i)

        elif event["ph"] == Phase.Duration.END:
            if not stacks[tid]:
                logger.warn(f"Discarding {event}")
                continue
            top = events[stacks[tid].pop()]

            if offsets[tid] == top["ts"]:
                offsets[tid] += _CELL_WIDTH
            events[i]["ts"] = offsets[tid] - _CELL_PADDING

    return events


def _validate_highlander(*args):
    values = sum(1 if x else 0 for x in args)
    if values != 1:
        raise ValueError(f"Only one of {args} can be specified")

Functions

def flatten(*, trace_file: Union[str, NoneType] = None, trace_str: Union[str, NoneType] = None, trace_json: Union[Dict, List, NoneType] = None) ‑> Union[List, Dict]

Destroys all the timing information in a trace to make all columns equally sized and minimize empty space. This is useful when the valuable part of the trace is understanding the code execution and not paying attention to timing information whatsoever.

(Note that the timing information is distorted and misleading because of the overhead from panopticon anyways.)

The trace can be provided as any of a file, raw string, or parsed json blob.

Expand source code
def flatten(
    *,
    trace_file: Optional[str] = None,
    trace_str: Optional[str] = None,
    trace_json: Optional[Union[Dict, List]] = None,
) -> Union[List, Dict]:
    """
    Destroys all the timing information in a trace to make all columns 
    equally sized and minimize empty space. This is useful when the 
    valuable part of the trace is understanding the code execution 
    and not paying attention to timing information whatsoever.

    (Note that the timing information is distorted and misleading
    because of the overhead from panopticon anyways.)

    The trace can be provided as any of a file, raw string, or 
    parsed json blob.
    """

    _validate_highlander(trace_file, trace_str, trace_json)

    if trace_file:
        with open(trace_file) as infile:
            trace_str = infile.read()

    if trace_str:
        try:
            trace_json = json.loads(trace_str)
        except json.JSONDecodeError:
            trace_json = json.loads(trace_str.rstrip().rstrip(",") + "]")

    if isinstance(trace_json, list):
        trace_json = _flatten_events(trace_json)
    else:
        trace_json[_EVENTS_KEY] = _flatten_events(trace_json[_EVENTS_KEY])

    return trace_json