diff options
Diffstat (limited to 'test/utils.py')
-rw-r--r-- | test/utils.py | 258 |
1 files changed, 129 insertions, 129 deletions
diff --git a/test/utils.py b/test/utils.py index 00ece562..bdb36e6e 100644 --- a/test/utils.py +++ b/test/utils.py @@ -32,154 +32,154 @@ SIGNAMES = dict((k, v) for v, k in reversed(sorted(signal.__dict__.items())) class Error(Exception): - pass + pass class Executable(object): - def __init__(self, exe, *before_args, **kwargs): - self.exe = exe - self.before_args = list(before_args) - self.after_args = [] - self.basename = kwargs.get('basename', - os.path.basename(exe)).replace('.exe', '') - self.error_cmdline = kwargs.get('error_cmdline', True) - self.clean_stdout = kwargs.get('clean_stdout') - self.clean_stderr = kwargs.get('clean_stderr') - self.stdout_handle = self._ForwardHandle(kwargs.get('forward_stdout')) - self.stderr_handle = self._ForwardHandle(kwargs.get('forward_stderr')) - self.verbose = False - - def _ForwardHandle(self, forward): - return None if forward else subprocess.PIPE - - def _RunWithArgsInternal(self, *args, **kwargs): - cmd = [self.exe] + self.before_args + list(args) + self.after_args - cmd_str = ' '.join(cmd) - if self.verbose: - print(cmd_str) - - if self.error_cmdline: - err_cmd_str = cmd_str.replace('.exe', '') - else: - err_cmd_str = self.basename - - stdout = '' - stderr = '' - error = None - try: - process = subprocess.Popen(cmd, stdout=self.stdout_handle, - stderr=self.stderr_handle, **kwargs) - stdout, stderr = process.communicate() - if stdout: - stdout = stdout.decode('utf-8', 'ignore') - if stderr: - stderr = stderr.decode('utf-8', 'ignore') - if self.clean_stdout: - stdout = self.clean_stdout(stdout) - if self.clean_stderr: - stderr = self.clean_stderr(stderr) - if process.returncode < 0: - # Terminated by signal - signame = SIGNAMES.get(-process.returncode, '<unknown>') - error = Error('Signal raised running "%s": %s\n%s' % (err_cmd_str, - signame, stderr)) - elif process.returncode > 0: - error = Error('Error running "%s":\n%s' % (err_cmd_str, stderr)) - except OSError as e: - error = Error('Error running "%s": %s' % (err_cmd_str, str(e))) - return stdout, stderr, error - - def RunWithArgsForStdout(self, *args, **kwargs): - stdout, stderr, error = self._RunWithArgsInternal(*args, **kwargs) - if error: - raise error - return stdout - - def RunWithArgs(self, *args, **kwargs): - stdout, stderr, error = self._RunWithArgsInternal(*args, **kwargs) - if stdout: - sys.stdout.write(stdout) - if error: - raise error - - def AppendArg(self, arg): - self.after_args.append(arg) - - def AppendOptionalArgs(self, option_dict): - for option, value in option_dict.items(): - if value: - if value is True: - self.AppendArg(option) + def __init__(self, exe, *before_args, **kwargs): + self.exe = exe + self.before_args = list(before_args) + self.after_args = [] + self.basename = kwargs.get('basename', + os.path.basename(exe)).replace('.exe', '') + self.error_cmdline = kwargs.get('error_cmdline', True) + self.clean_stdout = kwargs.get('clean_stdout') + self.clean_stderr = kwargs.get('clean_stderr') + self.stdout_handle = self._ForwardHandle(kwargs.get('forward_stdout')) + self.stderr_handle = self._ForwardHandle(kwargs.get('forward_stderr')) + self.verbose = False + + def _ForwardHandle(self, forward): + return None if forward else subprocess.PIPE + + def _RunWithArgsInternal(self, *args, **kwargs): + cmd = [self.exe] + self.before_args + list(args) + self.after_args + cmd_str = ' '.join(cmd) + if self.verbose: + print(cmd_str) + + if self.error_cmdline: + err_cmd_str = cmd_str.replace('.exe', '') else: - self.AppendArg('%s=%s' % (option, value)) + err_cmd_str = self.basename + + stdout = '' + stderr = '' + error = None + try: + process = subprocess.Popen(cmd, stdout=self.stdout_handle, + stderr=self.stderr_handle, **kwargs) + stdout, stderr = process.communicate() + if stdout: + stdout = stdout.decode('utf-8', 'ignore') + if stderr: + stderr = stderr.decode('utf-8', 'ignore') + if self.clean_stdout: + stdout = self.clean_stdout(stdout) + if self.clean_stderr: + stderr = self.clean_stderr(stderr) + if process.returncode < 0: + # Terminated by signal + signame = SIGNAMES.get(-process.returncode, '<unknown>') + error = Error('Signal raised running "%s": %s\n%s' % (err_cmd_str, + signame, stderr)) + elif process.returncode > 0: + error = Error('Error running "%s":\n%s' % (err_cmd_str, stderr)) + except OSError as e: + error = Error('Error running "%s": %s' % (err_cmd_str, str(e))) + return stdout, stderr, error + + def RunWithArgsForStdout(self, *args, **kwargs): + stdout, stderr, error = self._RunWithArgsInternal(*args, **kwargs) + if error: + raise error + return stdout + + def RunWithArgs(self, *args, **kwargs): + stdout, stderr, error = self._RunWithArgsInternal(*args, **kwargs) + if stdout: + sys.stdout.write(stdout) + if error: + raise error + + def AppendArg(self, arg): + self.after_args.append(arg) + + def AppendOptionalArgs(self, option_dict): + for option, value in option_dict.items(): + if value: + if value is True: + self.AppendArg(option) + else: + self.AppendArg('%s=%s' % (option, value)) @contextlib.contextmanager def TempDirectory(out_dir, prefix=None): - if out_dir: - out_dir_is_temp = False - if not os.path.exists(out_dir): - os.makedirs(out_dir) - else: - out_dir = tempfile.mkdtemp(prefix=prefix) - out_dir_is_temp = True + if out_dir: + out_dir_is_temp = False + if not os.path.exists(out_dir): + os.makedirs(out_dir) + else: + out_dir = tempfile.mkdtemp(prefix=prefix) + out_dir_is_temp = True - try: - yield out_dir - finally: - if out_dir_is_temp: - shutil.rmtree(out_dir) + try: + yield out_dir + finally: + if out_dir_is_temp: + shutil.rmtree(out_dir) def ChangeExt(path, new_ext): - return os.path.splitext(path)[0] + new_ext + return os.path.splitext(path)[0] + new_ext def ChangeDir(path, new_dir): - return os.path.join(new_dir, os.path.basename(path)) + return os.path.join(new_dir, os.path.basename(path)) def Hexdump(data): - if type(data) is str: - data = bytearray(data, 'ascii') - - DUMP_OCTETS_PER_LINE = 16 - DUMP_OCTETS_PER_GROUP = 2 - - p = 0 - end = len(data) - lines = [] - while p < end: - line_start = p - line_end = p + DUMP_OCTETS_PER_LINE - line = '%07x: ' % p - while p < line_end: - for i in xrange(DUMP_OCTETS_PER_GROUP): - if p < end: - line += '%02x' % data[p] - else: - line += ' ' - p += 1 - line += ' ' - line += ' ' - p = line_start - for i in xrange(DUMP_OCTETS_PER_LINE): - if p >= end: - break - x = data[p] - if x >= 32 and x < 0x7f: - line += '%c' % x - else: - line += '.' - p += 1 - line += '\n' - lines.append(line) - - return lines + if type(data) is str: + data = bytearray(data, 'ascii') + + DUMP_OCTETS_PER_LINE = 16 + DUMP_OCTETS_PER_GROUP = 2 + + p = 0 + end = len(data) + lines = [] + while p < end: + line_start = p + line_end = p + DUMP_OCTETS_PER_LINE + line = '%07x: ' % p + while p < line_end: + for i in xrange(DUMP_OCTETS_PER_GROUP): + if p < end: + line += '%02x' % data[p] + else: + line += ' ' + p += 1 + line += ' ' + line += ' ' + p = line_start + for i in xrange(DUMP_OCTETS_PER_LINE): + if p >= end: + break + x = data[p] + if x >= 32 and x < 0x7f: + line += '%c' % x + else: + line += '.' + p += 1 + line += '\n' + lines.append(line) + + return lines def GetModuleFilenamesFromSpecJSON(json_filename): - with open(json_filename) as json_file: - json_data = json.load(json_file) - return [m['filename'] for m in json_data['commands'] if 'filename' in m] + with open(json_filename) as json_file: + json_data = json.load(json_file) + return [m['filename'] for m in json_data['commands'] if 'filename' in m] |