import re
import os
import shutil
import subprocess
import time
from typing import Any
[docs]
def print_with_cache(msg: str, prev_msg: str) -> str:
    """
    Print ``msg`` only when it is non-empty and differs from ``prev_msg``.

    The previously shown message acts as a one-entry cache, so repeating the
    same message does not produce redundant output.  The message is written
    without a trailing newline and stdout is flushed right away.

    Returns the message that should be treated as the cache from now on:
    ``msg`` if it was just printed, otherwise ``prev_msg`` unchanged.
    """
    is_new = bool(msg) and msg != prev_msg
    if not is_new:
        # nothing given, or same as what is already on screen
        return prev_msg
    print(msg, flush=True, end="")
    return msg
[docs]
def watch_files(files, timeout=1000, check_dt=1):
    """
    Block until every path in ``files`` exists.

    Args:
        files: a single path (str / bytes / os.PathLike) or an iterable of
            paths (list, tuple, set, ...).
        timeout: number of waited seconds after which the wait is abandoned.
        check_dt: seconds to sleep between two existence checks.

    Raises:
        RuntimeError: if some files are still missing once more than
            ``timeout`` seconds have been spent sleeping.
    """
    # A lone path must not be iterated character by character.
    if isinstance(files, (str, bytes, os.PathLike)):
        pending = [files]
    else:
        pending = list(files)

    elapsed_t = 0
    while True:
        pending = [f for f in pending if not os.path.exists(f)]
        if not pending:
            # Return as soon as everything is there: no trailing sleep and
            # no timeout error for files that have in fact appeared.
            return
        if elapsed_t > timeout:
            raise RuntimeError(f"watch_files: timed out waiting for files {pending}")
        time.sleep(check_dt)
        elapsed_t += check_dt
[docs]
def watch_log(logfile: str, keyword: str, timeout: int=1000, check_dt: int=1) -> None:
    """
    Block until ``keyword`` shows up in ``logfile`` (indicating completion).

    The file is polled every ``check_dt`` seconds.  Whenever its line count
    has grown since the previous poll (some active output is happening) the
    stagnation timer is reset, so only a log that stays silent for more than
    ``timeout`` seconds is reported.

    Raises:
        RuntimeError: if the log stays stagnant for more than ``timeout``
            seconds while the keyword is still absent (errors raised by
            ``count_lines_in_file`` / ``find_keyword_in_file`` propagate).
    """
    elapsed_t = 0
    n0 = count_lines_in_file(logfile)
    while not find_keyword_in_file(logfile, keyword):
        time.sleep(check_dt)
        elapsed_t += check_dt
        n1 = count_lines_in_file(logfile)
        if n1 > n0:
            elapsed_t = 0
        # Always compare against the most recent count: if the log was
        # truncated or rotated, later growth must still count as activity.
        n0 = n1
        if elapsed_t > timeout:
            raise RuntimeError(f"watch_log: {logfile} remain stagnant for {timeout} seconds, while waiting for keyword '{keyword}'")
[docs]
def find_keyword_in_file(file: str, keyword: str) -> bool:
    """
    Return True if ``grep`` finds ``keyword`` in ``file``, False otherwise.

    ``keyword`` is interpreted by grep as a (basic) regular expression.
    ``file`` is taken as one literal path; it is not expanded by a shell.

    Raises:
        RuntimeError: with grep's stderr text if grep reports a problem
            (e.g. the file does not exist).
    """
    # Pass an argument list instead of a shell string so that quotes, spaces
    # or shell metacharacters in the keyword or path cannot break (or inject
    # into) the command.  ``-e`` protects keywords starting with '-', ``--``
    # protects such file names, ``-q`` stops at the first match.
    p = subprocess.run(["grep", "-q", "-e", keyword, "--", file],
                       capture_output=True, text=True)
    if p.stderr:
        raise RuntimeError(p.stderr)
    # grep exit status: 0 = at least one line matched, 1 = no match
    return p.returncode == 0
[docs]
def count_lines_in_file(file: str) -> int:
    """
    Return the number of lines in ``file`` as reported by ``wc -l``.

    ``file`` is taken as one literal path; it is not expanded by a shell.

    Raises:
        RuntimeError: with wc's stderr text if wc reports a problem
            (e.g. the file does not exist).
    """
    # Argument list instead of a shell string: paths containing spaces or
    # shell metacharacters are passed through untouched.
    p = subprocess.run(["wc", "-l", "--", file], capture_output=True, text=True)
    if p.stderr:
        raise RuntimeError(p.stderr)
    # split() without an argument skips the leading padding that some wc
    # implementations put in front of the count.
    return int(p.stdout.split()[0])
[docs]
class Progress:
    """
    Progress tracker and displayer. Used by Context.logger to show runtime progress.

    The tracker keeps a stack of node dicts (see ``new_node``), one per
    function currently being tracked.  ``push``, ``pop``, ``update`` and
    ``log`` each return the text to display for the corresponding event.
    """
    # True: lines are redrawn in place (clear-line + truncation);
    # False: plain sequential output, and call_stack_max_level is forced to None.
    interactive: bool
    # True: push/pop return plain "ENTERING"/"EXITING" lines and update returns "".
    debug: bool
    # Stack of node dicts; the last element is the node currently active.
    call_stack: list[dict[str, Any]]
    # Deepest stack level displayed on a line of its own (None = no limit).
    # Levels 1 and 2 always count as within the limit (see within_max_level);
    # a value below 2 makes push/pop/update return "" (push/pop only for nested levels).
    call_stack_max_level: int|None
    # NOTE(review): __init__ stores the Formatter instance as ``self.fmt``;
    # nothing in the visible code assigns an attribute named ``formatter``.
    formatter: Formatter
def __init__(self, interactive: bool=True,
             is_notebook: bool=False,
             cols: int=80,
             debug: bool=False,
             call_stack: list[dict]|None=None,
             call_stack_max_level: int|None=None,
             anchor: int=50,
             tabspace: int=4,
             progress_bar_width: int=10,
             io_interval: float=0.1) -> None:
    """
    Set up the tracker state and its Formatter.

    Args:
        interactive: redraw lines in place when True; plain output when False.
        is_notebook, cols, anchor, tabspace, progress_bar_width: passed
            through unchanged to ``Formatter``.
        debug: make push/pop return plain ENTERING/EXITING lines.
        call_stack: existing stack of node dicts to continue from; a missing
            or empty value starts from a fresh empty list.
        call_stack_max_level: display depth limit; ignored (set to None)
            when not interactive.
        io_interval: minimum number of seconds between two throttled
            ``update`` outputs.
    """
    # display mode
    self.interactive = interactive
    self.debug = debug

    # throttling state used by update()
    self.io_interval = io_interval
    self._last_updated = time.time()

    # an empty or missing stack is replaced by a new list owned by this instance
    self.call_stack = call_stack if call_stack else []

    # the depth limit only applies to interactive output
    self.call_stack_max_level = call_stack_max_level if interactive else None

    self.fmt = Formatter(interactive, is_notebook, cols, anchor, tabspace, progress_bar_width)
[docs]
def new_node(self, func_name: str|None=None) -> dict:
node = {
'name': func_name,
'substeps': 0,
'header': '',
'flag': 'waiting',
'current_task': 0,
'total_tasks': 1,
'message': '',
'elapsed_time': None,
}
return node
@property
def node(self) -> dict:
    """
    The node on top of the call stack.

    With an empty stack a new placeholder node (empty name) is created on
    every access; it is not stored, so changes made to it are not kept.
    """
    return self.call_stack[-1] if self.call_stack else self.new_node('')
[docs]
def within_max_level(self, level: int) -> bool:
    """
    Tell whether a node at stack depth ``level`` is within the display limit.

    No limit (``call_stack_max_level`` is None) accepts every level, and
    levels 1 and 2 are always accepted regardless of the limit.
    """
    limit = self.call_stack_max_level
    unlimited = limit is None
    return unlimited or level <= 2 or level <= limit
[docs]
def is_leaf(self, node) -> bool:
    """
    Tell whether ``node`` is to be treated as a leaf (no child was pushed).

    A node with substeps is never a leaf.  A node without substeps is a
    leaf unless the display limit is set to a non-zero value below 2.
    """
    if node['substeps'] != 0:
        return False
    limit = self.call_stack_max_level
    nested_hidden = bool(limit) and limit < 2
    return not nested_hidden
@property
def level(self) -> int:
    """Current depth of the call stack (0 when nothing is being tracked)."""
    depth = len(self.call_stack)
    return depth
[docs]
def set_flag(self, flag: str):
    """
    Set the status flag of the current node (``self.node``).

    With an empty call stack ``self.node`` is a throwaway placeholder, so
    the flag is not kept anywhere in that case.
    """
    current = self.node
    current['flag'] = flag
[docs]
def get_timer_msg(self, node):
    """
    Format the node's ``elapsed_time`` as e.g. ``"   1.50s"`` (width 7, two
    decimals, followed by ``s``); return ``""`` when no time is recorded.
    """
    seconds = node.get('elapsed_time')
    if seconds is None:
        return ""
    return f"{seconds:7.2f}s"
def _format_line(self, node: dict,
                 level: int,
                 include_indent: bool=True,
                 include_name: bool=True,
                 include_padding: bool=True,
                 is_branch: bool=True,
                 trailer: str="") -> str:
    """
    Assemble the display line for ``node`` at stack depth ``level``.

    Maintains order: [Indent] [Name] [Padding] [Flag] [Trailer (PBar/Time)] [Message]

    Within the display limit the line starts with indent + name followed by
    the formatter's padding (each part can be switched off through the
    ``include_*`` arguments).  Beyond the limit the node's stored ``header``
    followed by ``"... "`` is used instead.  The flag text comes from the
    formatter's ``stat_flag`` table, falling back to the upper-cased flag.
    """
    indent = ""
    if include_indent:
        indent = self.fmt.indent(level, branch=is_branch)
    name = ""
    if include_name:
        name = node['name']

    if not self.within_max_level(level):
        # collapsed node: reuse the header recorded when it was pushed
        header, padding = node['header'], "... "
    else:
        header = indent + name
        padding = ""
        if include_padding:
            padding = self.fmt.padding(level, name)

    flag = node['flag']
    stat_flag = self.fmt.stat_flag.get(flag, flag.upper())
    note = node['message']
    message = f"({note})" if note else ""
    return f"{header}{padding}{stat_flag} {trailer} {message}"
[docs]
def push(self, func_name: str):
    """
    Enter a new node named ``func_name`` and return the text to display.

    The node is always appended to the call stack.  The returned text is:
    - ``""`` for nested nodes when the display limit is a non-zero value below 2;
    - an ``ENTERING`` line in debug mode;
    - otherwise the node's header followed by ``": "`` (truncated by the
      formatter in interactive mode), preceded by a newline for the first
      child of its parent, or by the formatter's clear-line sequence when
      the node lies beyond the display limit.

    The parent's ``substeps`` counter is incremented only when the new node
    is within the display limit (and neither of the two early returns
    applies).
    """
    # the node that is current *before* the new one is added
    parent = self.node
    self.call_stack.append(self.new_node(func_name))

    limit = self.call_stack_max_level
    if limit and limit < 2 and self.level > 1:
        return ""
    if self.debug:
        return f"\nENTERING: {func_name}\n"

    current = self.node
    if self.within_max_level(self.level):
        # the first child starts on a new line below its parent
        prefix = '\n' if parent['substeps'] == 0 else ''
        parent['substeps'] += 1
        current['header'] = f"{self.fmt.indent(self.level)}{func_name}"
    else:
        # too deep for a line of its own: overwrite the current line with
        # a "parent > child" breadcrumb
        prefix = self.fmt.clear_line
        current['header'] = f"{parent['header']} > {func_name}"

    header = f"{current['header']}: "
    if self.interactive:
        header = self.fmt.truncate(header)
    return prefix + header
[docs]
def pop(self):
    """
    Leave the current node and return the text that finalizes its display.

    Returns ``''`` when the stack is empty.  Otherwise the node is removed
    from the stack and:
    - if it lay beyond the display limit, its message is appended to the
      parent's message (joined with ``' | '``);
    - nested nodes return ``""`` when the display limit is a non-zero value below 2;
    - debug mode returns an ``EXITING`` line;
    - non-interactive mode returns flag/time/message without name;
    - interactive mode redraws the whole line for a leaf node, or appends
      flag/time/message for a node that had children.
    """
    if not self.call_stack:
        return ''

    node = self.node
    level = self.level
    visible = self.within_max_level(level)
    self.call_stack.pop()

    if not visible and node['message']:
        # a collapsed node hands its message over to its parent
        parent = self.node
        joiner = ' | ' if parent['message'] else ''
        parent['message'] += joiner+node['message']

    timer_msg = self.get_timer_msg(node)
    stat_flag = self.fmt.stat_flag.get(node['flag'], node['flag'].upper())

    limit = self.call_stack_max_level
    if limit and limit < 2 and level > 1:
        return ""
    if self.debug:
        return f"EXITING: {node['name']} {stat_flag} {timer_msg}\n\n"

    # Handle the vertical branch line for parents
    addline = ""
    if node['substeps'] > 0 and visible:
        addline = f"{self.fmt.indent(level, branch=False)}\n"

    if not self.interactive:
        is_branch = (node['substeps'] == 0)
        res = self._format_line(node, level,
                                include_name=False,
                                include_padding=False,
                                include_indent=not is_branch,
                                is_branch=is_branch,
                                trailer=timer_msg)
        return res+f"\n{addline}"

    if not self.is_leaf(node):
        # Parent node: don't clear, don't show name/padding
        res = self._format_line(node, level,
                                include_name=False,
                                include_padding=False,
                                is_branch=False,
                                trailer=timer_msg)
        return self.fmt.truncate(res)+f"\n{addline}"

    # Leaf node: clear line, show full name + result
    res = self._format_line(node, level, include_name=True, is_branch=True, trailer=timer_msg)
    res = self.fmt.truncate(res)
    if visible:
        res += f"\n{addline}"
    return f"{self.fmt.clear_line}{res}"
[docs]
def update(self) -> str:
    """
    Return the text that refreshes the progress display of the current node.

    Returns ``""`` in debug mode and when the display limit is a non-zero value below 2.

    Non-interactive mode: returns ``"<N>%..."`` each time ``current_task``
    crosses into a new 10% bin of ``total_tasks`` and ``''`` otherwise.
    These markers are not rate-limited (there are at most eleven of them
    per node), so none is lost; a node with ``total_tasks <= 0`` has no
    meaningful percentage and yields ``""``.

    Interactive mode: returns clear-line + the redrawn line (with a progress
    bar while the node's flag is ``'running'``).  Output is throttled to one
    redraw per ``io_interval`` seconds, except for the first
    (``current_task == 0``) and final (``current_task >= total_tasks``)
    states, which are always shown.
    """
    limit = self.call_stack_max_level
    if limit and limit < 2:
        return ""
    if self.debug:
        return ""

    node, level = self.node, self.level
    total, current = node['total_tasks'], node['current_task']

    if not self.interactive:
        if total <= 0:
            # avoid dividing by zero for an empty task list
            return ""
        # The bin comparison already limits the output; throttling here
        # would drop the marker whenever the crossing call came too soon
        # after the previous one.
        prev_bin = (100 * (current - 1) // total) // 10
        curr_bin = (100 * current // total) // 10
        return f"{10 * curr_bin}%..." if curr_bin > prev_bin else ''

    # throttle output to remain within a fixed rate
    now = time.time()
    is_important = (current == 0) or (current >= total)
    if not is_important and (now - self._last_updated) < self.io_interval:
        return ""
    self._last_updated = now

    # Prepare the 'trailer' (Progress Bar)
    pbar = self.fmt.progress_bar(current, total) if node['flag'] == 'running' else ""
    res = self._format_line(node, level, include_name=True, trailer=pbar)
    res = self.fmt.truncate(res)
    return f"{self.fmt.clear_line}{res}"
[docs]
def log(self, msg: str, flag: str) -> str:
    """
    Safely injects a global message without breaking the tree.

    Returns the text for an event line ``"<pin> <msg>"`` where the pin is
    taken from the formatter's ``event_flag`` table (fallback: upper-cased
    ``flag`` plus ``':'``).  Outside debug mode the event is preceded by an
    indent line one level below the current node (or at the display limit
    when the current node lies beyond it).  Leading/trailing newlines
    depend on whether the current node is a leaf and whether it is running.
    """
    limit = self.call_stack_max_level
    if self.within_max_level(self.level):
        indent = self.fmt.indent(self.level+1, branch=False)
    elif limit:
        indent = self.fmt.indent(limit, branch=False)
    else:
        indent = ''

    event_pin = self.fmt.event_flag.get(flag, flag.upper()+':')
    entry = f"{event_pin} {msg}"
    if self.debug:
        return f"{entry}\n"

    current = self.node
    if not self.is_leaf(current):
        return f"{indent}\n{entry}\n"
    # leaf node: start on a new line; end the line only while it is running
    tail = "\n" if current['flag'] == 'running' else ""
    return f"\n{indent}\n{entry}{tail}"