class CommandProcess:
    """Encapsulation of the Popen process with functions to use it asynchronously.

    On construction a background thread starts waiting for the process
    (honouring ``timeout``) and posts a ``(return_code, error)`` tuple on an
    internal queue. :meth:`get_return_code` joins that thread and collects
    the result.
    """

    def __init__(
        self,
        popen_object: Popen[bytes],
        output: Output,
        timeout: Optional[int],
        success_code: int = 0,
        ignore_exit_codes: Optional[Iterable[int]] = None,
    ):
        """Wrap ``popen_object`` and immediately start waiting for it.

        Args:
            popen_object: the already started process.
            output: the Output object related to the process.
            timeout: value handed to ``Popen.wait``; ``None`` waits forever.
            success_code: exit code that never counts as an error.
            ignore_exit_codes: additional exit codes that do not count as an
                error. Any iterable is accepted, including one-shot
                generators.
        """
        self.__popen_object: Popen[bytes] = popen_object
        self.__output: Output = output
        self.__timeout: Optional[int] = timeout
        self.__retcode_queue: Queue[Tuple[int, Optional[Exception]]] = Queue()
        self.success_code: int = success_code
        # Materialised once: a generator would be consumed by the first
        # membership test and could then no longer contain the success code.
        self.ignore_exit_codes: Iterable[int] = self._accepted_exit_codes(
            success_code, ignore_exit_codes
        )

        self.retcode: Optional[int] = None
        self.error: Optional[Exception] = None
        self.process: Thread = self.__wait_async()

    @staticmethod
    def _accepted_exit_codes(
        success_code: int,
        ignore_exit_codes: Optional[Iterable[int]],
    ) -> frozenset[int]:
        """Return the set of exit codes that must not raise, always including ``success_code``."""
        codes = () if ignore_exit_codes is None else ignore_exit_codes
        return frozenset(codes) | {success_code}

    @staticmethod
    def __wait_func(
        process: Popen[bytes],
        result_queue: Queue[Tuple[int, Optional[Exception]]],
        timeout: Optional[int],
        ignore_exit_codes: Iterable[int],
    ) -> None:
        """Wait for ``process`` and post ``(return_code, error_or_None)`` on ``result_queue``."""
        try:
            retcode = process.wait(timeout)
        except TimeoutExpired as exc:
            # TODO: we can add some form of logging here to warn the user if something went wrong
            process.terminate()
            try:
                process.wait(1)
            except TimeoutExpired:
                # The process ignored the terminate request. Without this the
                # thread would die here and never post a result, leaving
                # get_return_code() blocked forever.
                process.kill()
                process.wait()
            result_queue.put((-1, exc))
            return

        if retcode in ignore_exit_codes:
            result_queue.put((retcode, None))
        else:
            result_queue.put((retcode, CalledProcessError(retcode, process.args)))

    def __wait_async(self) -> Thread:
        """Start and return the thread that waits for the process."""
        waiting_thread = Thread(
            target=self.__wait_func,
            args=(
                self.__popen_object,
                self.__retcode_queue,
                self.__timeout,
                self.ignore_exit_codes,
            ),
        )
        waiting_thread.start()
        return waiting_thread

    def get_output(self) -> Output:
        """Get the Output object related to this process.

        It can be used as input for other processes.
        """
        return self.__output

    def get_return_code(self) -> int:
        """Halt until the process has a return code and return it.

        The result is cached, so the method can be called repeatedly.

        Raises:
            CalledProcessError: if the return code is not ignored.
            TimeoutExpired: if the wait time was exceeded.
        """
        if self.error is not None:
            raise self.error
        # Compare against None: exit code 0 is falsy but is a valid cached
        # result, and the queue only ever holds a single entry.
        if self.retcode is not None:
            return self.retcode
        self.process.join()
        self.retcode, self.error = self.__retcode_queue.get()
        if self.error is not None:
            raise self.error
        return self.retcode

    # TODO: check how this interacts with ssh
    # THIS DOES NOT SEND IT TO THE RIGHT ONE -> move abstraction higher
    def signal(self, signalcode: int) -> None:
        """Send ``signalcode`` to the wrapped process, then wait up to one second for it.

        Raises:
            TimeoutExpired: if the process is still running after one second.
        """
        self.__popen_object.send_signal(signalcode)
        self.__popen_object.wait(1)
def execute_command(
    # needed for starting the command
    command: List[str],
    # This dir can only be a path on the local machine
    current_dir: Optional[pathlib.Path] = None,
    # Do we want to add os.environ to this?
    environment: Optional[Dict[str, str]] = None,
    # needed for construction and evaluation of output
    timeout: Optional[int] = None,
    ignore_ret_codes: Optional[Iterable[int]] = None,
    success_value: int = 0,
    # working with the IOStreams of the command
    std_input: Optional[ReadableIOStream] = None,
    ordered_input_hooks: Optional[List[IOHook]] = None,
    ordered_output_hooks: Optional[List[OutputHook]] = None,
) -> CommandProcess:
    """Start ``command`` and wire its standard streams through the given hooks.

    Args:
        command: the argument list handed to ``subprocess.Popen``.
        current_dir: working directory of the command (local path only).
        environment: environment of the command; ``None`` is replaced by an
            empty environment.
        timeout: forwarded to the returned ``CommandProcess``.
        ignore_ret_codes: forwarded to the returned ``CommandProcess``.
        success_value: forwarded to the returned ``CommandProcess``.
        std_input: stream written to the stdin of the command.
        ordered_input_hooks: hooks applied, in order, to ``std_input``.
        ordered_output_hooks: hooks applied, in order, to the command output.

    Returns:
        The ``CommandProcess`` that can be used to monitor the command.

    Raises:
        Exception: anything raised while wiring the streams is re-raised
            after the started process has been terminated.
    """
    if environment is None:
        environment = {}

    process = subprocess.Popen(
        command,
        cwd=current_dir,
        env=environment,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdin=subprocess.PIPE,
    )
    try:
        # 1) manipulate the input stream using the ordered input hooks
        if ordered_input_hooks is not None:
            if std_input is None:
                std_input = EmptyIOStream()
            for inhook in ordered_input_hooks:
                inhook.start_hook_function(std_input)
                std_input = inhook.get_outgoing_io_stream()

        # 2) Write the input to the command
        # hook function to write the ReadableIOStream given as input to stdin
        def pasalong(input_stream: ReadableIOStream, _: WritableIOStream) -> None:
            if process.stdin is not None:
                outline = input_stream.read(1)
                while outline:
                    process.stdin.write(outline)
                    process.stdin.flush()
                    outline = input_stream.read(1)
                process.stdin.close()

        if std_input is not None:
            hook = IOWriterHook(pasalong)
            hook.start_hook_function(std_input)

        # The main thread never writes to stdin itself. Close its handle
        # before any output hook is started, so (assuming hooks run in forked
        # processes) none of them keeps the command's stdin open.
        if process.stdin is not None:
            process.stdin.close()

        # 3) manipulate the output stream using the ordered output hooks
        raw_output = popen_get_output(process.stdout, process.stderr)
        command_output = raw_output

        if ordered_output_hooks is not None:
            for outhook in ordered_output_hooks:
                command_output = outhook.attatch(command_output)

        # Close the main thread file descriptors only for streams that a hook
        # has taken over. A stream that is still part of the returned Output
        # must stay open, otherwise callers (e.g. the next command of a
        # pipeline) would read from a closed file.
        if process.stdout is not None and command_output.std_out is not raw_output.std_out:
            process.stdout.close()
        if process.stderr is not None and command_output.std_err is not raw_output.std_err:
            process.stderr.close()

        # 4) construct the object we can use to monitor the process
        return CommandProcess(
            process, command_output, timeout, success_value, ignore_ret_codes
        )

    except Exception:
        # make sure the process is terminated for cleanup
        # TODO: this needs some test cases
        process.terminate()
        process.wait()
        raise
def _copy_stream(
    source: ReadableIOStream, sink: WritableIOStream, chunk_size: int = 10
) -> None:
    """Forward everything readable from ``source`` to ``sink`` until ``source`` is exhausted."""
    chunk = source.read(chunk_size)
    while chunk:
        sink.write(chunk)
        chunk = source.read(chunk_size)


def create_voiding_result_hook() -> IOResultHook:
    """Create a hook that gathers its whole input and puts it on the result queue.

    Nothing is written to the outgoing stream, thus the hook is "voiding".
    """

    def hook_function(
        input_object: ReadableIOStream, _: WritableIOStream, result_queue: Queue[Any]
    ):
        # chunks are joined once at the end instead of growing a bytes object
        chunks = []
        outline = input_object.read(10)
        while outline:
            chunks.append(outline)
            outline = input_object.read(10)
        result_queue.put(b"".join(chunks))

    return IOResultHook(hook_function)


def stream_prepend_hook(stream: StringIOStream):
    """Create a hook that emits the content of ``stream`` before its own input."""

    def hook_function(
        input_object: ReadableIOStream,
        output_object: WritableIOStream,
    ):
        # The prefix has to be drained completely before the real input is
        # forwarded; both are read from their own stream.
        _copy_stream(stream, output_object)
        _copy_stream(input_object, output_object)

    return IOWriterHook(hook_function)


def write_to_file_hook(path: Path, mode: str = "a"):
    """Create a reader hook that writes everything it reads to the file at ``path``.

    ``mode`` is combined with ``"b"``, the file is always opened in binary mode.
    """

    def hook_function(
        input_object: ReadableIOStream,
    ):
        with path.open(mode=f'{mode}b', buffering=0) as file:
            outline = input_object.read(10)
            while outline:
                file.write(outline)
                outline = input_object.read(10)
                file.flush()

    return IOReaderHook(hook_function)


def create_stream_line_logger_hook(formating_string: str) -> IOReaderHook:
    """Create a reader hook that prints every line it reads using ``formating_string``.

    Each line is decoded when possible and substituted into the format string.
    """

    def hook_function_line(input_object: ReadableIOStream):
        byt = input_object.read_line()
        while byt:
            print(
                formating_string.format(f"{try_converting_bystring_to_readable_characters(byt)}"),
                end="",
            )
            byt = input_object.read_line()

    return IOReaderHook(hook_function_line)


# TODO: Voiding can be done better but this will do for now
#   problem: if there are hooks on the output they will wait for input still
#       can be resolved by making use of EmptyIOStream
#       Needs to be done on a higher level than hooks
def void_input(input_object: ReadableIOStream, _: WritableIOStream):
    """Read ``input_object`` until it is exhausted and discard everything."""
    outline = input_object.read(10)
    while outline:
        outline = input_object.read(10)


def logger_line_hook(outformat: str, errformat: str):
    """Create an OutputHook that logs stdout with ``outformat`` and stderr with ``errformat``."""
    return OutputHook(
        create_stream_line_logger_hook(outformat),
        create_stream_line_logger_hook(errformat),
    )


def void_hook():
    """Create an OutputHook that discards both stdout and stderr."""
    return OutputHook(IOWriterHook(void_input), IOWriterHook(void_input))


def std_out_result_void_err():
    """Create an OutputHook that gathers stdout as a result and discards stderr.

    Returns:
        A tuple of the result hook (to fetch the gathered stdout from) and
        the OutputHook that has to be attached to the command.
    """
    output_hook_object = create_voiding_result_hook()

    voiding_result_hook = OutputHook(output_hook_object, IOWriterHook(void_input))

    return (output_hook_object, voiding_result_hook)
class IOHook(ABC):
    """Basic interface that each hook needs to implement.

    Every hook owns a pipe (``_output``) from which the stream it produces
    can be read.
    """

    def __init__(self, name: str):
        self._output = PipeIOStream()
        self.name = name

    @abstractmethod
    def start_hook_function(self, input_stream: ReadableIOStream) -> None:
        """Start processing ``input_stream``."""

    def get_outgoing_io_stream(self) -> ReadableIOStream:
        """Return the stream produced by this hook."""
        return self._output


class IOWriterHook(IOHook):
    """Hook that expects a function of the form Callable[[ReadableIOStream, PipeIOStream], None].

    Intended as a general purpose stream manipulator. The hook is named
    after the function unless a name is given.
    """

    def __init__(
        self,
        hook_function: Callable[[ReadableIOStream, PipeIOStream], None],
        name: Optional[str] = None,
    ):
        self.hook_function = hook_function
        if not name:
            name = self.hook_function.__name__
        super().__init__(name)

    def start_hook_function(self, input_stream: ReadableIOStream) -> None:
        # A process is spawned to keep the hook function running on the stream
        p = Process(
            target=self.hook_function,
            args=(input_stream, self._output),
            name=self.name,
        )
        p.start()

        # Close the file descriptor of the main thread, the one from the process will still be alive
        self._output.end_writing()


class IOReaderHook(IOHook):
    """Hook that expects a function of the form Callable[[ReadableIOStream], None].

    The input stream is duplicated: one copy is passed along unchanged as
    the outgoing stream, the other copy is handed to the hook function.
    """

    def __init__(
        self,
        hook_function: Callable[[ReadableIOStream], None],
        name: Optional[str] = None,
    ):
        self.hook_function = hook_function
        self._stream_duplicate = PipeIOStream()
        if not name:
            name = self.hook_function.__name__
        super().__init__(name)

    @staticmethod
    def __pas_along_original_stream(
        input_stream: ReadableIOStream,
        output1_stream: WritableIOStream,
        output2_stream: WritableIOStream,
    ):
        """Copy everything read from ``input_stream`` to both output streams."""
        data = input_stream.read(1)
        while data:
            output1_stream.write(data)
            output2_stream.write(data)
            data = input_stream.read(1)

    def start_hook_function(self, input_stream: ReadableIOStream) -> None:

        # A process is spawned to duplicate the input stream for the reading function
        duplication_process = Process(
            target=self.__pas_along_original_stream,
            args=(
                input_stream,
                self._output,
                self._stream_duplicate,
            ),
            name=self.name + " pasalong",
        )

        # A process is spawned to keep the hook function running on the stream
        reader_hook_process = Process(
            target=self.hook_function,
            args=(self._stream_duplicate,),
            name=self.name,
        )

        duplication_process.start()
        # Close the file descriptors of the main thread, the ones from the
        # duplication process will still be alive. This has to happen BEFORE
        # the reader is started: assuming the fork start method, a child
        # inherits every descriptor that is open in the parent, and a reader
        # holding the write ends itself would never observe end-of-file
        # (neither would the hooks reading the outgoing stream).
        self._output.end_writing()
        self._stream_duplicate.end_writing()
        reader_hook_process.start()


class IOResultHook(IOHook):
    """Hook that expects a function of the form
    Callable[[ReadableIOStream, PipeIOStream, Queue[Any]], None].

    It can be used as a writer hook with the added functionality of
    being able to use the queue as output.
    """

    def __init__(
        self,
        hook_function: Callable[[ReadableIOStream, PipeIOStream, Queue[Any]], None],
        name: Optional[str] = None,
    ):
        self.__queue: Queue[Any] = Queue()
        self.hook_function = hook_function
        if not name:
            name = self.hook_function.__name__
        super().__init__(name)

    def start_hook_function(self, input_stream: ReadableIOStream) -> None:
        p = Process(
            target=self.hook_function,
            args=(input_stream, self._output, self.__queue),
            name=self.name,
        )
        p.start()

        # Close the file descriptor of the main thread, the one from the process will still be alive
        self._output.end_writing()

    def get_result(self) -> Any:
        """Block until the hook function has put a result on the queue and return it."""
        return self.__queue.get()


class OutputHook:
    """Pair of optional hooks, one for stdout and one for stderr of a command."""

    def __init__(self, std_out_hook: IOHook | None, std_err_hook: IOHook | None):
        self._std_out_hook = std_out_hook
        self._std_err_hook = std_err_hook

    def attatch(self, output: Output) -> Output:
        """Attach the hooks to the IOStreams, or pass them along if there is no hook."""
        std_out = output.std_out
        std_err = output.std_err
        if self._std_out_hook:
            self._std_out_hook.start_hook_function(output.std_out)
            std_out = self._std_out_hook.get_outgoing_io_stream()
        if self._std_err_hook:
            self._std_err_hook.start_hook_function(output.std_err)
            std_err = self._std_err_hook.get_outgoing_io_stream()
        return Output(std_out, std_err)


class MergeErrToOut(OutputHook):
    """OutputHook that merges stderr into stdout, line by line.

    The resulting Output has the merged stream as stdout and an empty stderr.
    """

    def __init__(self) -> None:
        self.std_out = PipeIOStream()
        super().__init__(
            IOWriterHook(self.__mergehookfunction),
            IOWriterHook(self.__mergehookfunction),
        )

    def __mergehookfunction(self, input_object: ReadableIOStream, _: WritableIOStream):
        """Copy ``input_object`` line by line into the shared merged stream."""
        outline = input_object.read_line()
        while outline:
            self.std_out.write(outline)
            outline = input_object.read_line()

    def attatch(self, output: Output) -> Output:
        """Start merging both streams of ``output`` and return the merged Output."""
        # each hook handles the stream it is named after
        self._std_out_hook.start_hook_function(output.std_out)
        self._std_err_hook.start_hook_function(output.std_err)
        # both merging processes own a copy of the write end by now
        self.std_out.end_writing()

        return Output(self.std_out, EmptyIOStream())
class Output(ABC):
    """Interface to communicate with command output on all platforms.

    A missing stream (``None``) is replaced by an ``EmptyIOStream`` so that
    ``std_out`` and ``std_err`` can always be read from.
    """

    def __init__(self, std_out: ReadableIOStream | None, std_err: ReadableIOStream | None):
        self.std_out: ReadableIOStream = EmptyIOStream() if std_out is None else std_out
        self.std_err: ReadableIOStream = EmptyIOStream() if std_err is None else std_err


def popen_get_output(out: IO[bytes] | None, err: IO[bytes] | None) -> Output:
    """Helper function to convert popen handles to an Output."""
    wrapped_out = PopenIOStream(out) if out else None
    wrapped_err = PopenIOStream(err) if err else None
    return Output(wrapped_out, wrapped_err)
class WritableIOStream(ABC):
    """Interface to write any form of data that can be made
    compatible by implementing write and end_writing."""

    @abstractmethod
    def write(self, bytes_to_write: bytes) -> None:
        """Write bytes to the IOStream; the implementation depends on what the stream is."""

    @abstractmethod
    def end_writing(self) -> None:
        """Signal that the IOStream can be closed."""


class ReadableIOStream(ABC):
    """Interface to read from any form of data that can be made
    compatible by implementing _read_bytes."""

    def __init__(self) -> None:
        # bytes that were read past the end of a line by read_line
        self.__buffer: bytes = b""

    @abstractmethod
    def _read_bytes(self, amount_of_bytes: int) -> bytes:
        """Read at most ``amount_of_bytes`` bytes; an empty result means end of stream."""

    def read(self, amount_of_bytes: int) -> bytes:
        """Read at most ``amount_of_bytes`` from the stream.

        If a previous ``read_line`` left bytes in the buffer, the whole
        buffer is returned first (regardless of ``amount_of_bytes``) before
        more bytes are read.
        """
        if self.__buffer:
            ret = self.__buffer
            self.__buffer = b""
            return ret
        return self._read_bytes(amount_of_bytes)

    def read_line(self) -> bytes:
        """Read one line, including its newline; surplus bytes overflow into the buffer.

        Returns the remaining bytes when the stream ends without a newline
        and ``b""`` when the stream is exhausted.
        """
        collected = b""
        chunk = self.read(10)
        while chunk:
            collected += chunk
            line, newline, rest = collected.partition(b"\n")
            if newline:
                self.__buffer = rest
                return line + newline
            chunk = self.read(10)
        return collected


class PopenIOStream(ReadableIOStream):
    """Class that can interact with the stdout type objects given by Popen."""

    def __init__(self, stream: IO[bytes]):
        self.__stream = stream
        super().__init__()

    def _read_bytes(self, amount_of_bytes: int) -> bytes:
        if self.__stream is not None:
            return self.__stream.read(amount_of_bytes)
        # No stream means nothing to read. An empty result marks the end of
        # the stream; any other value would make readers loop forever.
        return b""


class StringIOStream(ReadableIOStream):
    """Class to convert a string to an IOStream so they can be interchanged."""

    def __init__(self, string: str, encoding: str = "utf-8"):
        self.byte_string = string.encode(encoding)
        self.length = len(self.byte_string)
        self.index = 0
        super().__init__()

    def _read_bytes(self, amount_of_bytes: int):
        # slicing past the end is safe and yields the remaining bytes
        end = min(self.index + amount_of_bytes, self.length)
        return_byte_string = self.byte_string[self.index : end]
        self.index = end
        return return_byte_string


class EmptyIOStream(ReadableIOStream):
    """Class to create an empty IOStream."""

    def __init__(self):
        super().__init__()

    def _read_bytes(self, amount_of_bytes: int):
        return b""


class PipeIOStream(ReadableIOStream, WritableIOStream):
    """A readable and writable IOStream that is used to communicate between hooks mostly."""

    def __init__(self) -> None:
        self.reader, self.writer = os.pipe()
        os.set_inheritable(self.reader, True)
        os.set_inheritable(self.writer, True)
        super().__init__()

    def write(self, bytes_to_write: bytes) -> None:
        os.write(self.writer, bytes_to_write)

    def end_writing(self) -> None:
        """Close the write end of the pipe held by the calling process."""
        os.close(self.writer)

    def _read_bytes(self, amount_of_bytes: int) -> bytes:
        return os.read(self.reader, amount_of_bytes)


def try_converting_bystring_to_readable_characters(bytestring: bytes) -> str | bytes:
    """Try to convert a bytestring to a string.

    If decoding as UTF-8 fails the bytestring is returned unchanged so
    nothing is lost.
    """
    try:
        return bytestring.decode("utf-8")
    except UnicodeDecodeError:
        return bytestring
USE_NEW_SHELL = True


def split_on_pipe(sub_commands: List[str]):
    """Split a tokenised command line into one argument list per piped command.

    Every ``"|"`` token starts a new argument list; the token itself is
    dropped. The result always contains at least one (possibly empty) list.
    """
    pipeline: List[List[str]] = [[]]
    for token in sub_commands:
        if token == "|":
            pipeline.append([])
        else:
            pipeline[-1].append(token)
    return pipeline
""" + if USE_NEW_SHELL: + # create a list for all processes that will be run + processes: List[CommandProcess] = [] + + # no input posible in the begining so we create an empty stream for the first command + input_stream: ReadableIOStream = EmptyIOStream() + + # create a hook for the last process to gather the result of the piped command + gather_result_object, gather_result_hook = std_out_result_void_err() + + # break our command in pieces in case there are pipes + command_split = shlex.split(command) if isinstance(command,str) else command + command_string = command_string = shlex.join(command_split) + commands = split_on_pipe(command_split) + + if print_command: + print(f"\033[32m[Full piped command | {command_string}]\033[0m") + + # enumerate all commands and start a process for each of them + for idx, com in enumerate(commands): + # break the command for the execution + command = shlex.split(com) if isinstance(com, str) else com + command_string = shlex.join(command) + + # TODO: this technicaly is not the deafault but it stays here for demonstration reasons for a bit + # log each command + output_hooks: List[OutputHook] = [] + log = logger_line_hook( + f"\033[34m[OUT | {command_string}]\033[0m" + " {}", + f"\033[91m[ERR | {command_string}]\033[0m" + " {}", + ) + output_hooks.append(log) + + if idx == len(commands) - 1: + # for the last command we merge err and out and gather it + output_hooks.append(MergeErrToOut()) + output_hooks.append(gather_result_hook) + else: + # for all other commands we void error and out will be used by next command + void_err = OutputHook(None,IOWriterHook(void_input)) + output_hooks.append(void_err) + + # print the command we are about to start + if print_command: + print(f"\033[32m[START | {command_string}]\033[0m") + process = execute_command( + command=command, + std_input=input_stream, + current_dir=current_dir, + ordered_output_hooks=output_hooks, + # If ignore_ret_codes is empty we swap it over to None instead + 
ignore_ret_codes=ignore_ret_codes if not any(True for _ in ignore_ret_codes) else None, + ) + # remember the process + processes.append(process) + + # Link the output stream to the next by making it the input stream + input_stream = process.get_output().std_out + + # wait for all procces to finish + for p in processes: + p.get_return_code() + + return gather_result_object.get_result().decode("utf-8") arguments = get_args(command) if print_command: print_header( @@ -143,6 +243,60 @@ def shell_out( Returns: str: the output of the shell command that completed successfully. """ + if USE_NEW_SHELL: + command = shlex.split(command) if isinstance(command, str) else command + command_string = shlex.join(command) + + # convert string input to an IOStream + std_input_io = StringIOStream(std_input) if std_input is not None else None + output_hooks: List[OutputHook] = [] + + # if we need to ignore any error code we just add all of them to the ignore list + # the overhead of doing this is minimal and it keeps the code cleaner + if ignore_any_error_code: + ignore_ret_codes = (x for x in range(256)) + + # add hook to log the output of the command + if output_is_log: + log = logger_line_hook( + f"\033[34m[OUT | {command_string}]\033[0m" + " {}", + f"\033[91m[ERR | {command_string}]\033[0m" + " {}", + ) + output_hooks.append(log) + + # Print the input string + if print_input: + print(f"\033[32m[START | {command_string}]\033[0m") + + # Original implementation considered the error to be part of the output, + # we merge them together here (done line wise) + merge = MergeErrToOut() + output_hooks.append(merge) + + # gather the entire stdout stream into a variable + output_hook_object, voiding_result_hook = std_out_result_void_err() + output_hooks.append(voiding_result_hook) + + # this will make sure we clear all our outputs in the end + # otherwise the command might block + output_hooks.append(void_hook()) + + process = execute_command( + command=command, + std_input=std_input_io, + 
current_dir=current_dir, + environment=environment, + timeout=timeout, + ordered_output_hooks=output_hooks, + ignore_ret_codes=ignore_ret_codes, + ) + # this line is here to check if the program failed + process.get_return_code() + + # decode to turn bytestream of into the desired string + # this can fail but is in line with original implementation + return output_hook_object.get_result().decode("utf-8") + arguments = get_args(command) print_header( arguments=arguments, @@ -255,48 +409,72 @@ def shell_interactive( print_file_shell_cmd: bool = True, ignore_ret_codes: Iterable[int] = (), ) -> None: - """ - Run a shell command that is interactive (with prompts, etc.). + if USE_NEW_SHELL: + # ok so, you can not exit this, + # you exit the shell you exit the benchmark - Args: - command (Command): - the command to run. - current_dir (Optional[PathType], optional): - directory where to run the command. If None, the current directory is used. - Defaults to None. - environment (Environment, optional): - environment variables to pass to the command. - Defaults to None. - shell (bool, optional): - whether to run the command in a shell environment (like "bash") or as a real command - given to "exec". - Defaults to False. - print_input (bool, optional): - whether to print the command. TODO should be renamed "print_command" - Defaults to True. - print_env (bool, optional): - whether to print the environment variables when they are defined. - Defaults to True. - print_curdir (bool, optional): - whether to print the current directory if provided. - Defaults to True. - print_shell_cmd (bool, optional): - whether to print the complete shell command, ready to be copy-pasted in a terminal. - Defaults to False. - print_file_shell_cmd (bool, optional): - whether to print the shell command in a log file (`/tmp/benchkit.sh`). - Defaults to True. - ignore_ret_codes (Iterable[int], optional): - collection of error return codes to ignore if they are triggered. 
- This allows to avoid an exception to be raised for commands that do not end with 0 even - if they are successful. - Defaults to (). + # TODO: workaround: we can add make custom hooks for this function that survive the interupt signal once + # using try: except Keyinterupt + # + + command = shlex.split(command) if isinstance(command, str) else command + + # convert string input to an IOStream + std_input_io = PipeIOStream() + output_hooks = [] + + # add hook to log the output of the command + log = logger_line_hook( + "> {}", + "! {}", + ) + output_hooks.append(log) + + output_hooks.append(void_hook()) + + # Print the input string + if print_input: + print(f"\033[32m[START | {shlex.join(command)}]\033[0m") + + process = execute_command( + command=command, + std_input=std_input_io, + current_dir=current_dir, + environment=environment, + ordered_output_hooks=output_hooks, + # If ignore_ret_codes is empty we swap it over to None instead + ignore_ret_codes=ignore_ret_codes if not any(True for _ in ignore_ret_codes) else None, + ) + + # We want that the first interupt signal + # goes to the process we are interacting with + original_sigint_handler = signal.getsignal(signal.SIGINT) + + # def signal_handler(sig, frame): + # process.signal(sig) + # sys.stdin.close() + # signal.signal(signal.SIGINT, original_sigint_handler) + + # signal.signal(signal.SIGINT, signal_handler) + + # use our stdin as the interaction for the process + + try: + outline = sys.stdin.read(1).encode("utf-8") + while outline: + std_input_io.write(outline) + outline = sys.stdin.read(1).encode("utf-8") + # The implementation of sigint will error above code + # This is intended as the exit method + except Exception: + pass + # except KeyboardInterrupt: + # pass + # Cleanly close the input file + std_input_io.endWriting() + + return None - Raises: - subprocess.CalledProcessError: - if the command exited with a non-zero exit code that is not ignored in - `ignore_ret_codes`. 
- """ arguments = get_args(command) print_header( arguments=arguments, diff --git a/tests/command_execution/configure.sh b/tests/command_execution/configure.sh new file mode 100755 index 00000000..63b6c293 --- /dev/null +++ b/tests/command_execution/configure.sh @@ -0,0 +1,4 @@ +#!/bin/sh +set -e + +../../scripts/install_venv.sh diff --git a/tests/command_execution/dependency-paths.txt b/tests/command_execution/dependency-paths.txt new file mode 100644 index 00000000..cca6f8a0 --- /dev/null +++ b/tests/command_execution/dependency-paths.txt @@ -0,0 +1,2 @@ +../../benchkit +./execute_command \ No newline at end of file diff --git a/tests/command_execution/execute_command/other_tests.py b/tests/command_execution/execute_command/other_tests.py new file mode 100644 index 00000000..fdda5e62 --- /dev/null +++ b/tests/command_execution/execute_command/other_tests.py @@ -0,0 +1,200 @@ +# Copyright (C) 2025 Vrije Universiteit Brussel. All rights reserved. +# SPDX-License-Identifier: MIT + +from __future__ import annotations # Otherwise Queue comlains about typing + +from multiprocessing import Queue +from pathlib import Path +import shlex +import subprocess +from time import sleep +from typing import Any + +from benchkit.shell.command_execution.io.hooks.basic_hooks import logger_line_hook, std_out_result_void_err, void_hook +from benchkit.shell.command_execution.io.hooks.hook import MergeErrToOut +from tests.command_execution.execute_command.util import script_path_string + +from benchkit.shell.command_execution.execute import execute_command + +from benchkit.shell.command_execution.io.stream import PipeIOStream, ReadableIOStream, StringIOStream, WritableIOStream +from benchkit.shell.shell import pipe_shell_out, shell_interactive, shell_out, split_on_pipe + + +def shell_test(): + command = [script_path_string("runForever")] + + log_ls = logger_line_hook( + f"\033[34m[OUT | ls]\033[0m" + " {}", + f"\033[91m[ERR | ls]\033[0m" + " {}", + ) + + outobj, outhook = 
std_out_result_void_err() + + merge = MergeErrToOut() + + ls_command = execute_command(command, + timeout=2, + ordered_output_hooks=[ + merge, + log_ls, + # outhook, + void_hook(), + ] + ) + try: + ls_command.get_return_code() + except: + sleep(5) + + + # log_ls = logger_line_hook( + # f"\033[34m[OUT | sudo]\033[0m" + " {}", + # f"\033[91m[ERR | sudo]\033[0m" + " {}", + # ) + # filewriterIO = write_to_file_hook(Path("/tmp/testfile.txt")) + # filewriter = OutputHook(filewriterIO,None) + + # a = execute_command( + # shlex.split("ssh aaronb@soft67.vub.ac.be 'sudo -S -k ls'"), + # ordered_input_hooks=[stream_prepend_hook(StringIOStream("123456789"))], + # ordered_output_hooks=[log_ls,MergeErrToOut(),filewriter,void_hook()] + # ) + + # a.get_return_code() + + # sleep(1) + + + # a = pipe_shell_out('sudo perf stat ls') + # print(a) + # a = pipe_shell_out([script_path_string("runForever"), '|', 'cat']) + # print(a) + + # shell_interactive("ssh aaronb@soft24.vub.ac.be 'sh'") + + +def testhalt(): + + # ------------------------------------------------------- + # pid hook example + # for end + # ------------------------------------------------------- + + def stdout_pid_result_filter(inputStream:ReadableIOStream,OutputStream:WritableIOStream,queue:Queue[Any]): + first_line = inputStream.read_line() + queue.put(first_line) + outline = inputStream.read(10) + while outline: + OutputStream.write(outline) + outline = inputStream.read(10) + + pid_stream_hook = IOResultHook(stdout_pid_result_filter) + + + pid_output_hook = OutputHook(pid_stream_hook,None) + + # ------------------------------------------------------- + # commands work + # first step + # get return codes/errors + # ------------------------------------------------------- + + # process_dir = execute_command(["mkdir", "test"],ordered_output_hooks=[void_hook()]) + # print(process_dir.get_return_code()) + + # execute_command(["command that does not exist"]) + + # wrong_retcode = execute_command(["cat", "wafel"], + # # 
success_value=1, + # # ignore_ret_codes=(1,), + # ordered_output_hooks=[void_hook()]) + # print(wrong_retcode.get_return_code()) + + # ------------------------------------------------------- + # ls + # -> does not work + # -> voidhook + # -> drawio + # -> would be nice to see what it is doing + # -> log_ls + # -> show + # -> drawio + # -> would be nice to get output + # -> output + # -> drawio + # -> move over to pipe cat + # ------------------------------------------------------- + + # command = ["ls"] + command = [script_path_string("runForever")] + + log_ls = logger_line_hook( + f"\033[34m[OUT | ls]\033[0m" + " {}", + f"\033[91m[ERR | ls]\033[0m" + " {}", + ) + + outobj, outhook = std_out_result_void_err() + + merge = MergeErrToOut() + + ls_command = execute_command(command, + ordered_output_hooks=[ + merge, + log_ls, + # outhook, + # void_hook(), + ] + ) + + # print(outobj.get_result()) + # print(ls_command.get_return_code()) + + ls_out_stream = ls_command.get_output().std_out + + log_cat = logger_line_hook( + f"\033[34m[OUT | cat]\033[0m" + " {}", + f"\033[91m[ERR | cat]\033[0m" + " {}", + ) + + # cat_command_string = ["cat"] + # cat_command_string = shlex.split("ssh aaronb@soft24.vub.ac.be 'cat'") + cat_command_string = shlex.split("ssh aaronb@soft24.vub.ac.be 'echo $$; cat'") + + cat_command = execute_command(cat_command_string, + std_input=ls_out_stream, + ordered_output_hooks=[ + pid_output_hook, + log_cat, + # outhook, + void_hook(), + ] + ) + + print(f'-------\n{pid_stream_hook.get_result()}\n----------') + + ls_command.get_return_code() + cat_command.get_return_code() + + + + + # these can not be reused + # log = logger_line_hook( + # f"\033[34m[OUT | ]\033[0m" + " {}", + # f"\033[91m[ERR | ]\033[0m" + " {}", + # ) + + # a = execute_command( + # shlex.split("ssh aaronb@soft24.vub.ac.be 'cd test; echo $$; exec sudo -S env varname=varvalue printenv varname'"), + # ordered_output_hooks=[log] + # ) + + + + # r = a.get_return_code() + # print(r) + + +if 
__name__ == "__main__": + # testhalt() + shell_test() diff --git a/tests/command_execution/execute_command/shell_scripts/fillErrThenOut.sh b/tests/command_execution/execute_command/shell_scripts/fillErrThenOut.sh new file mode 100755 index 00000000..69646dc3 --- /dev/null +++ b/tests/command_execution/execute_command/shell_scripts/fillErrThenOut.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +x=1 + +while [ $x -le 3000 ]; +do + echo "std_out spam - $x - Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat." 1>&2; + (( x++ )) +done +echo "finished" \ No newline at end of file diff --git a/tests/command_execution/execute_command/shell_scripts/fillOutThenErr.sh b/tests/command_execution/execute_command/shell_scripts/fillOutThenErr.sh new file mode 100755 index 00000000..a995bb88 --- /dev/null +++ b/tests/command_execution/execute_command/shell_scripts/fillOutThenErr.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +x=1 + +while [ $x -le 3000 ]; +do + echo "std_out spam - $x - Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. 
Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."; + (( x++ )) +done +echo "finished" 1>&2 \ No newline at end of file diff --git a/tests/command_execution/execute_command/shell_scripts/returnExitCode.sh b/tests/command_execution/execute_command/shell_scripts/returnExitCode.sh new file mode 100755 index 00000000..e53195ed --- /dev/null +++ b/tests/command_execution/execute_command/shell_scripts/returnExitCode.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +read -s -p "" input + +echo "$input" + +exit $input \ No newline at end of file diff --git a/tests/command_execution/execute_command/shell_scripts/runForever.sh b/tests/command_execution/execute_command/shell_scripts/runForever.sh new file mode 100755 index 00000000..4ea637f1 --- /dev/null +++ b/tests/command_execution/execute_command/shell_scripts/runForever.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +while true ; do date -Ins;sleep 1; done \ No newline at end of file diff --git a/tests/command_execution/execute_command/shell_scripts/waitThenPrint.sh b/tests/command_execution/execute_command/shell_scripts/waitThenPrint.sh new file mode 100755 index 00000000..a7713767 --- /dev/null +++ b/tests/command_execution/execute_command/shell_scripts/waitThenPrint.sh @@ -0,0 +1 @@ +sleep 1; echo done sleeping \ No newline at end of file diff --git a/tests/command_execution/execute_command/shell_scripts/writeBack.sh b/tests/command_execution/execute_command/shell_scripts/writeBack.sh new file mode 100755 index 00000000..e4858341 --- /dev/null +++ b/tests/command_execution/execute_command/shell_scripts/writeBack.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +read -s -p "" input + +echo "$input" \ No newline at end of file diff --git a/tests/command_execution/execute_command/test_execute_command_function.py b/tests/command_execution/execute_command/test_execute_command_function.py new file mode 100644 index 00000000..2a543152 --- /dev/null +++ 
b/tests/command_execution/execute_command/test_execute_command_function.py @@ -0,0 +1,306 @@ +import os +import pathlib +from subprocess import CalledProcessError +import unittest + +from benchkit.shell.command_execution.io.stream import StringIOStream +from tests.command_execution.execute_command.util import TestTimeout, generate_test_hook_lists, get_arguments_dict_list, script_path_string, timeout +from benchkit.shell.command_execution.execute import execute_command + +class FunctionalExecutionTests(unittest.TestCase): + + # @unittest.skip("disabled for debugging") + def test_echo(self) -> None: + """Basic tests to see if the command-line executes the command and can return output""" + + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,1,10,99999999999], + 'ignore_ret_codes':[None,(1,),(1,7,),()], + } + ) + for arguments in arguments_list: + + # hook based argumens + hooklist = generate_test_hook_lists(force_output=True) + for input_hooks,output_hooks,result_hook_object in hooklist: + try: + # execution + with timeout(5): + execute_command( + ["echo", "benchkit_echo_test", str(arguments)], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + ) + + # result gathering + output = result_hook_object.get_result() + expected_output = f"benchkit_echo_test {str(arguments)}\n".encode("utf-8") + self.assertEqual( + output, + expected_output, + "shell does not provide the right output in the result", + ) + except TestTimeout: + self.fail( + "execution timed out" + ) + + # @unittest.skip("disabled for debugging") + def test_environment(self) -> None: + """Test to see if the env of the command is correcly set to the given env""" + + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, 
pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,1,10,99999999999], + 'ignore_ret_codes':[None,(1,),(1,7,),()], + } + ) + for arguments in arguments_list: + + # hook based argumens + hooklist = generate_test_hook_lists(force_output=True) + for input_hooks,output_hooks,result_hook_object in hooklist: + try: + # execution + with timeout(5): + execute_command( + ["env"], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + ) + env = result_hook_object.get_result() + if arguments['environment']: + res ="" + for key,val in arguments['environment'].items(): + res += (f'{key}={val}\n') + res_bytes = res.encode('utf-8') + self.assertEqual( + env, + res_bytes, + "shell does not have the right env", + ) + if arguments['environment'] == {} or arguments['environment'] is None: + self.assertEqual( + env, + b'', + "shell does not have the right env", + ) + except TestTimeout: + self.fail( + "execution timed out" + ) + + # @unittest.skip("disabled for debugging") + def test_dir(self) -> None: + """Test to see if the correct directory is used when running commands""" + def expected_full_path(path_lib:pathlib.Path): + expected_path = os.getcwd() + path = str(path_lib) + while True: + if path.startswith('/'): + return path + elif path.startswith('../'): + path = path.removeprefix('../') + expected_path = expected_path[:expected_path.rindex('/')] + elif path.startswith('..'): + path = path.removeprefix('..') + expected_path = expected_path[:expected_path.rindex('/')] + elif path.startswith('./'): + path = path.removeprefix('./') + elif path.startswith('.'): + path = path.removeprefix('.') + else: + if path == '': + return expected_path + expected_path = expected_path + "/" + path + return expected_path + + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, 
pathlib.Path(__file__).parent.resolve(),pathlib.Path('/tmp'),pathlib.Path('./'),pathlib.Path('../../')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,1,10,99999999999], + 'ignore_ret_codes':[None,(1,),(1,7,),()], + } + ) + for arguments in arguments_list: + # hook based argumens + hooklist = generate_test_hook_lists(force_output=True) + for input_hooks,output_hooks,result_hook_object in hooklist: + try: + with timeout(5): + # execution + execute_command( + ["pwd"], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + ) + if arguments['current_dir'] is None: + arguments['current_dir'] = './' + + curr_dir = result_hook_object.get_result() + expected_path = expected_full_path(arguments['current_dir']) + self.assertEqual( + curr_dir, + f'{expected_path}\n'.encode('utf-8'), + "the paths do not match" + ) + except TestTimeout: + self.fail( + "execution timed out" + ) + + + # @unittest.skip("disabled for debugging") + def test_input(self): + """testing the use of the std_input parameter""" + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,1,10,99999999999], + 'ignore_ret_codes':[None,(1,),(1,7,),()], + } + ) + for arguments in arguments_list: + + # hook based argumens + hooklist = generate_test_hook_lists(force_output=True) + for input_hooks,output_hooks,result_hook_object in hooklist: + try: + # execution + with timeout(5): + execute_command( + [script_path_string("writeBack")], + std_input=StringIOStream(f"benchkit input test {str(arguments)}\n"), + ordered_input_hooks=input_hooks, + ordered_output_hooks=output_hooks, + **arguments, + ) + out = result_hook_object.get_result() + + self.assertEqual( + out, + f"benchkit input test {str(arguments)}\n".encode('utf-8'), + f"recieved{out}", + ) + except TestTimeout: + 
self.fail( + "execution timed out" + ) + + + # @unittest.skip("disabled for debugging") + def test_ignore_ret_codes_are_ignored(self) -> None: + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,1,10,99999999999], + "ignore_ret_codes": [ + (), + (1,4,), + (2, 7,), + (4,5,), + (63,), + (0,), + ], + } + ) + for arguments in arguments_list: + # hook based argumens + hooklist = generate_test_hook_lists() + for input_hooks,output_hooks,_ in hooklist: + + try: + with timeout(20): + retcode_to_output = ( + arguments["ignore_ret_codes"][len(arguments["ignore_ret_codes"]) - 1 % 3] + if len(arguments["ignore_ret_codes"]) > 0 + else 0 + ) + command_process = execute_command( + [script_path_string("returnExitCode")], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + std_input=StringIOStream(f"{retcode_to_output}\n"), + ) + retcode = command_process.get_return_code() + self.assertEqual(retcode, + retcode_to_output, + "Mistake in the test wrong return code was returned by programm") + except TestTimeout: + self.fail( + "execution timed out" + ) + except CalledProcessError: + self.fail( + f"process trew an error with retcode {retcode_to_output} and ignored list {arguments['ignore_ret_codes']}" + ) + + # @unittest.skip("disabled for debugging") + def test_ignore_ret_codes_dont_ignore_other(self) -> None: + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,1,10,99999999999], + "ignore_ret_codes": [ + (), + (2, 7,), + (63,), + (0,), + ], + "success_value": [0, 1, 53, 19], + } + ) + for arguments in arguments_list: + # hook based argumens + hooklist = generate_test_hook_lists() + for 
input_hooks,output_hooks,_ in hooklist: + try: + retcode_to_output = ( + 3 + + arguments["success_value"] + + ( + arguments["ignore_ret_codes"][len(arguments["ignore_ret_codes"]) - 1 % 3] + if len(arguments["ignore_ret_codes"]) > 0 + else arguments["success_value"] + ) + ) + with self.assertRaises(CalledProcessError, + msg=f"process did not trow an error with retcode {retcode_to_output}, ignored list {arguments['ignore_ret_codes']} and succes value {arguments['success_value']}"): + with timeout(20): + command_process = execute_command( + [script_path_string("returnExitCode")], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + std_input=StringIOStream(f"{retcode_to_output}\n"), + ) + retcode = command_process.get_return_code() + self.assertEqual(retcode, + retcode_to_output, + "Mistake in the test wrong return code was returned by programm") + except TestTimeout: + self.fail( + "execution timed out" + ) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/command_execution/execute_command/test_execute_command_runtime.py b/tests/command_execution/execute_command/test_execute_command_runtime.py new file mode 100644 index 00000000..ce95f99a --- /dev/null +++ b/tests/command_execution/execute_command/test_execute_command_runtime.py @@ -0,0 +1,118 @@ + + +import pathlib +from subprocess import TimeoutExpired +import tracemalloc +import unittest +from benchkit.shell.command_execution.execute import execute_command +from tests.command_execution.execute_command.util import TestTimeout, generate_test_hook_lists, get_arguments_dict_list, script_path_string, timeout + +tracemalloc.start() + +class RuntimeExecutionTests(unittest.TestCase): + + # @unittest.skip("disabled for debugging") + def test_timeout(self) -> None: + """test to see if the command times out after the given time period""" + + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, 
pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[2], + 'ignore_ret_codes':[None,(1,),(1,7,),()], + } + ) + for arguments in arguments_list: + # hook based argumens + hooklist = generate_test_hook_lists() + for input_hooks,output_hooks,_ in hooklist: + try: + # execution + with timeout(20): + with self.assertRaises(TimeoutExpired): + p = execute_command( + [script_path_string("runForever")], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + ) + + # result gathering + p.get_return_code() + + except TestTimeout: + self.fail( + "execution timed out, but not by the timeout argument" + ) + + # @unittest.skip("disabled for debugging") + def test_fill_std_err(self) -> None: + """test to see if the command times out after the given time period""" + + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,20,99999999999], + 'ignore_ret_codes':[None,(1,),(1,7,),()], + } + ) + for arguments in arguments_list: + # hook based argumens + hooklist = generate_test_hook_lists() + for input_hooks,output_hooks,_ in hooklist: + try: + # execution + with timeout(22): + p = execute_command( + [script_path_string("fillOutThenErr")], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + ) + + # result gathering + p.get_return_code() + + except TestTimeout: + self.fail( + "execution timed out" + ) + + # @unittest.skip("disabled for debugging") + def test_fill_std_out(self) -> None: + """test to see if the command times out after the given time period""" + + # standard arguments + arguments_list = get_arguments_dict_list( + { + 'current_dir':[None, pathlib.Path(__file__).parent.resolve(),pathlib.Path('/')], + 
'environment':[None,{'test':'test'},{'a':'12','b':'11'},{}], + 'timeout':[None,20,99999999999], + 'ignore_ret_codes':[None,(1,),(1,7,),()], + } + ) + for arguments in arguments_list: + # hook based argumens + hooklist = generate_test_hook_lists() + for input_hooks,output_hooks,_ in hooklist: + try: + # execution + with timeout(22): + p = execute_command( + [script_path_string("fillErrThenOut")], + ordered_output_hooks=output_hooks, + ordered_input_hooks=input_hooks, + **arguments, + ) + + # result gathering + p.get_return_code() + + except TestTimeout: + self.fail( + "execution timed out" + ) diff --git a/tests/command_execution/execute_command/util.py b/tests/command_execution/execute_command/util.py new file mode 100644 index 00000000..05d4a054 --- /dev/null +++ b/tests/command_execution/execute_command/util.py @@ -0,0 +1,114 @@ +# Copyright (C) 2025 Vrije Universiteit Brussel. All rights reserved. +# SPDX-License-Identifier: MIT + +import itertools +import pathlib +import signal +from typing import Any, Dict, List, Optional, Tuple + +from benchkit.shell.command_execution.io.stream import ReadableIOStream, WritableIOStream +from benchkit.shell.command_execution.io.hooks.basic_hooks import create_stream_line_logger_hook, create_voiding_result_hook, logger_line_hook, void_hook +from benchkit.shell.command_execution.io.hooks.hook import IOHook, IOResultHook, IOWriterHook, OutputHook + + +class TestTimeout(Exception): + pass + + +class timeout: + def __init__(self, seconds:int, error_message:Optional[str]=None): + if error_message is None: + error_message = "test timed out after {}s.".format(seconds) + self.seconds = seconds + self.error_message = error_message + + def handle_timeout(self, signum, frame): + raise TestTimeout(self.error_message) + + def __enter__(self): + signal.signal(signal.SIGALRM, self.handle_timeout) + signal.alarm(self.seconds) + + def __exit__(self, exc_type, exc_val, exc_tb): + signal.alarm(0) + + +def script_path_string(script_name: str): + 
folder = pathlib.Path(__file__).parent.resolve() + print(folder) + return str(folder / f"./shell_scripts/{script_name}.sh") + +def get_arguments_dict_list( + overwrite_arguments_dict: Dict[str, Any] | None = None, +): + if overwrite_arguments_dict is None: + overwrite_arguments_dict = {} + + arguments_dict:Dict[str,Any] = {} + + for argument_key in overwrite_arguments_dict: + arguments_dict[argument_key] = overwrite_arguments_dict[argument_key] + + keys:List[str] = [] + arguments:List[Any] = [] + + for key, arugments in arguments_dict.items(): + keys.append(key) + arguments += [arugments] + argument_permutations = itertools.product(*arguments) + result_list:List[Dict[str,Any]] = [] + for argument_permutation in list(argument_permutations): + result_list.append(dict(zip(keys, argument_permutation))) + return result_list + +def generate_test_hook_lists(force_output:bool=False,dont_void_output:bool=False) -> List[Tuple[List[IOHook],List[OutputHook],Optional[IOResultHook]]]: + + def useless_func(input_object: ReadableIOStream, output_object: WritableIOStream): + outline = input_object.read(10) + while outline: + output_object.write(outline) + outline = input_object.read(10) + + def gen_useless_input(): + return IOWriterHook(useless_func) + + def gen_logging_input(): + return create_stream_line_logger_hook("log_input" + " {}") + + def gen_useless_output(): + return OutputHook(IOWriterHook(useless_func),IOWriterHook(useless_func)) + + def gen_logging_output(): + return logger_line_hook( + "\033[34m[OUT | ]\033[0m" + " {}", + "\033[91m[ERR | ]\033[0m" + " {}", + ) + + def gen_result_output(): + output_hook_object = create_voiding_result_hook() + voiding_result_hook = OutputHook(output_hook_object,None) + return voiding_result_hook, output_hook_object + + hooklist:List[Tuple[List[IOHook],List[OutputHook],Optional[IOResultHook]]] = [] + + for option in itertools.product([True,False],[True,False],[True,False],[True,False],[True,False]): + output_hooks:List[OutputHook] = [] + 
input_hooks:List[IOHook] = [] + output_object = None + if option[0]: + input_hooks.append(gen_useless_input()) + if option[1]: + input_hooks.append(gen_logging_input()) + if option[2]: + output_hooks.append(gen_useless_output()) + if option[3]: + output_hooks.append(gen_logging_output()) + if option[4] or force_output: + hook, obj = gen_result_output() + output_object = obj + output_hooks.append(hook) + if not dont_void_output: + output_hooks.append(void_hook()) + + hooklist.append((input_hooks, output_hooks, output_object)) + return hooklist \ No newline at end of file diff --git a/tests/command_execution/readme.md b/tests/command_execution/readme.md new file mode 100644 index 00000000..b2f21a57 --- /dev/null +++ b/tests/command_execution/readme.md @@ -0,0 +1,16 @@ +# This folder contains the files for testing the ast-shell implementation of benchkit + +## Configure venv + +There is a `configure.sh` script. If run from this directory, it will create a working venv that can be activated using `. ./venv/bin/activate` + +## Running the tests + +Run `python -m unittest ast_shell -v` in this folder to run the current tests. +It will spam the terminal; to prevent this, add `-b` to the command. + +other_tests.py is a file containing some personal testing code and will be converted to proper test cases at a later date + +## Todo + +- Smaller unit tests for the internal structures \ No newline at end of file