Home | History | Annotate | Line # | Download | only in analyzer
      1 import os
      2 import sys
      3 import time
      4 
      5 from subprocess import CalledProcessError, check_call
      6 from typing import List, IO, Optional, Tuple
      7 
      8 
def which(command: str, paths: Optional[str] = None) -> Optional[str]:
    """
    Locate an executable-like file for the given command.

    :param command: command name, or a path to the command itself.
    :param paths: search path string separated by os.pathsep; when omitted,
                  the PATH environment variable is used, and when that is
                  empty as well, os.defpath is used.
    :return: `command` itself if it already names an existing path,
             otherwise the first existing candidate found in the search
             path, or None if nothing was found.
    """
    # A command that already points at something on disk wins outright.
    if os.path.exists(command):
        return command

    if paths is None:
        paths = os.environ.get('PATH', '')
    search_path = paths or os.defpath

    # A ';' separator indicates a native Windows environment, where PATHEXT
    # lists the suffixes to try.  Cygwin uses ':' and may define PATHEXT,
    # but it must not be consulted there.
    if os.pathsep == ';':
        suffixes = os.environ.get('PATHEXT', '').split(';')
    else:
        suffixes = ['']

    # Lazily enumerate every directory/suffix combination, in order.
    candidates = (os.path.join(directory, command + suffix)
                  for directory in search_path.split(os.pathsep)
                  for suffix in suffixes)

    return next((candidate for candidate in candidates
                 if os.path.exists(candidate)), None)
     39 
     40 
def has_no_extension(file_name: str) -> bool:
    """
    Tell whether the given file name carries no extension, as determined
    by os.path.splitext.
    """
    return os.path.splitext(file_name)[1] == ""
     44 
     45 
def is_valid_single_input_file(file_name: str) -> bool:
    """
    Tell whether the file name has one of the extensions accepted for a
    single input file: ".i", ".ii", ".c", ".cpp", ".m" or no extension.
    """
    accepted_extensions = (".i", ".ii", ".c", ".cpp", ".m", "")
    extension = os.path.splitext(file_name)[1]
    return extension in accepted_extensions
     49 
     50 
def time_to_str(time: float) -> str:
    """
    Render a duration given in seconds as text with two decimal places
    followed by an "s" suffix.
    """
    return "{:.2f}s".format(time)
     56 
     57 
def memory_to_str(memory: int) -> str:
    """
    Render an amount of memory given in bytes as text.

    A falsy amount (such as 0) means the measurement did not succeed and
    yields "N/A".  Otherwise the optional `humanize` package formats the
    value; without it the raw byte count with a "B" suffix is returned.
    """
    # Zero means that we did not manage to measure anything.
    if not memory:
        return "N/A"

    try:
        import humanize
        return humanize.naturalsize(memory, gnu=True)
    except ImportError:
        # the formatter is not available, so report plain bytes
        return f"{memory}B"
     72 
     73 
def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]:
    """
    Run command with arguments.  Wait for command to complete and measure
    execution time and peak memory consumption.
    If the exit code was zero then return, otherwise raise
    CalledProcessError.  The CalledProcessError object will have the
    return code in the returncode attribute.

    The arguments are the same as for the call and check_call functions.

    Memory is only measured when the optional `psutil` package can be
    imported.  It is sampled by polling roughly every half a second, so
    usage spikes between two samples (or a process that finishes before
    the first sample) are not reflected in the result.  Without `psutil`
    the command is run through check_call and the reported peak memory
    is 0.

    Return a tuple of execution time and peak memory.  The time is the
    wall-clock difference of two time.time() readings and therefore
    includes the polling granularity mentioned above.
    """
    peak_mem = 0
    start_time = time.time()

    try:
        # psutil is optional; its absence is handled by the ImportError
        # branch at the bottom of this function.
        import psutil as ps

        def get_memory(process: ps.Process) -> int:
            """
            Sum the resident set size (rss) reported by psutil for the
            given process and all of its descendants.
            """
            mem = 0

            # we want to gather memory usage from all of the child processes
            descendants = list(process.children(recursive=True))
            descendants.append(process)

            # NOTE: `subprocess` here is just a loop variable holding a
            # psutil process object, not the standard library module.
            for subprocess in descendants:
                try:
                    mem += subprocess.memory_info().rss
                except (ps.NoSuchProcess, ps.AccessDenied):
                    # The process is gone or cannot be inspected; leave it
                    # out of this sample.
                    continue

            return mem

        with ps.Popen(*popenargs, **kwargs) as process:
            # while the process is running calculate resource utilization.
            while (process.is_running() and
                   process.status() != ps.STATUS_ZOMBIE):
                # track the peak utilization of the process
                peak_mem = max(peak_mem, get_memory(process))
                time.sleep(.5)

            # The loop stops either when the process is no longer running
            # or when its status is ZOMBIE; if it is still reported as
            # running here, it is presumably a zombie, so kill it.
            if process.is_running():
                process.kill()

        # NOTE(review): this relies on leaving the `with` block having
        # waited for the process so that `returncode` is populated --
        # confirm against the psutil.Popen documentation.
        if process.returncode != 0:
            # Report the command the same way it was passed in: the "args"
            # keyword if given, otherwise the first positional argument.
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = popenargs[0]
            raise CalledProcessError(process.returncode, cmd)

    except ImportError:
        # back off to subprocess if we don't have psutil installed
        peak_mem = 0
        check_call(*popenargs, **kwargs)

    return time.time() - start_time, peak_mem
    130 
    131 
def run_script(script_path: str, build_log_file: IO, cwd: str,
               out=sys.stdout, err=sys.stderr, verbose: int = 0):
    """
    Run the provided script if it exists.

    The script is first made executable with `chmod +x` and then executed
    through the shell with `cwd` as the working directory.  The output of
    both commands (stdout and stderr) is redirected to `build_log_file`.

    :param script_path: path to the script; nothing happens if it does not
                        exist.
    :param build_log_file: open file receiving the output of the commands.
    :param cwd: working directory for the commands.
    :param out: stream for the progress message (written only when
                `verbose` equals 1).
    :param err: stream for the error message.
    :param verbose: verbosity level.

    If either command exits with a non-zero status, an error message is
    written to `err` and the interpreter exits via sys.exit(-1).
    """
    # Local import, in line with the other function-scope imports of this
    # file, so that this function does not depend on the file header.
    import shlex

    if not os.path.exists(script_path):
        return

    # Quote the path for the shell properly.  Hand-written single quotes
    # break (and allow command injection) as soon as the path itself
    # contains a single quote.
    quoted_path = shlex.quote(script_path)

    try:
        if verbose == 1:
            out.write(f"  Executing: {script_path}\n")

        check_call(f"chmod +x {quoted_path}", cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

        check_call(quoted_path, cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

    except CalledProcessError:
        err.write(f"Error: Running {script_path} failed. "
                  f"See {build_log_file.name} for details.\n")
        sys.exit(-1)
    156 
    157 
def is_comment_csv_line(entries: List[str]) -> bool:
    """
    Tell whether a parsed CSV line is a comment, i.e. whether its first
    entry begins with '#'.  A line without entries is not a comment.
    """
    if not entries:
        return False
    return entries[0].startswith("#")
    163