diff --git a/tests/recipes/test_recipes.py b/tests/recipes/test_recipes.py index 0d6851b0..0fe05724 100644 --- a/tests/recipes/test_recipes.py +++ b/tests/recipes/test_recipes.py @@ -28,7 +28,9 @@ class TestRecipes: - recipe_class_list = [SeismologyRecipe, + recipe_class_list = [ + GenomeRecipe, + SeismologyRecipe, MontageRecipe, RnaseqRecipe, BwaRecipe, @@ -48,7 +50,9 @@ def test_recipes(self, recipe_class) -> None: recipe = recipe_class.from_num_tasks(num_tasks=200, runtime_factor=1.1, input_file_size_factor=1.5, output_file_size_factor=0.8) - workflow = WorkflowGenerator(recipe).build_workflow() + workflows = WorkflowGenerator(recipe).build_workflows(1) + assert len(workflows) == 1 + @pytest.mark.unit diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 00000000..18d0983f --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,91 @@ +import pathlib +import shutil +import tarfile +import os +import io +import sys +import docker +from docker.errors import ImageNotFound + +def _create_fresh_local_dir(path: str) -> pathlib.Path: + dirpath = pathlib.Path(path) + if dirpath.exists(): + shutil.rmtree(dirpath) + dirpath.mkdir(parents=True, exist_ok=True) + return dirpath + +def _remove_local_dir_if_it_exists(path: str) -> None: + dirpath = pathlib.Path(path) + if dirpath.exists(): + shutil.rmtree(dirpath) + + +def _make_tarfile_of_wfcommons(): + source_dir = os.getcwd() # This assumes the testing is run from the root + tar_stream = io.BytesIO() + with tarfile.open(fileobj=tar_stream, mode='w') as tar: + tar.add(source_dir, arcname=os.path.basename(source_dir)) + tar_stream.seek(0) + return tar_stream + +def _install_WfCommons_on_container(container): + # sys.stderr.write("Installing WfCommons on the container...\n") + # Copy the WfCommons code to it (removing stuff that should be removed) + target_path = '/tmp/' # inside container + tar_data = _make_tarfile_of_wfcommons() + container.put_archive(target_path, tar_data) + exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True) + exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True) + exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True, + stderr=True) + exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True, + stderr=True) + + # Install WfCommons on the container (to install wfbench and cpu-benchmark really) + exit_code, output = container.exec_run("sudo python3 -m pip install . 
--break-system-packages", + workdir="/tmp/WfCommons", stdout=True, stderr=True) + +def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None): + if command is None: + command = ["sleep", "infinity"] + # Pulling the Docker image + client = docker.from_env() + image_name = f"wfcommons/wfcommons-testing-{backend}" + + try: + image = client.images.get(image_name) + sys.stderr.write(f"Image '{image_name}' is available locally\n") + except ImageNotFound: + sys.stderr.write(f"Pulling image '{image_name}'...\n") + client.images.pull(image_name) + + # Launch the docker container to actually run the translated workflow + sys.stderr.write("Starting Docker container...\n") + container = client.containers.run( + image=image_name, + command=command, + volumes={mounted_dir: {'bind': mounted_dir, 'mode': 'rw'}}, + working_dir=working_dir, + tty=True, + detach=True + ) + + # Installing WfCommons on container + _install_WfCommons_on_container(container) + + # Copy over the wfbench and cpu-benchmark executables to where they should go on the container + if bin_dir: + exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir], + stdout=True, stderr=True) + exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir], + stdout=True, stderr=True) + + return container + +def _get_total_size_of_directory(directory_path: str): + total_size = 0 + for dirpath, dirnames, filenames in os.walk(directory_path): + for filename in filenames: + filepath = os.path.join(dirpath, filename) + total_size += os.path.getsize(filepath) + return total_size \ No newline at end of file diff --git a/tests/translators/test_translators.py b/tests/translators/test_translators.py index 145cab94..895d51d1 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -11,15 +11,13 @@ import pathlib import pytest import shutil -import docker -from docker.errors import ImageNotFound -import io -import tarfile -import os import sys import json import time +from tests.test_helpers import _create_fresh_local_dir +from tests.test_helpers import _remove_local_dir_if_it_exists +from tests.test_helpers import _start_docker_container from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark from wfcommons.wfbench import DaskTranslator @@ -33,67 +31,67 @@ from wfcommons.wfinstances import PegasusLogsParser - - -def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=["sleep", "infinity"]): - # Pulling the Docker image - client = docker.from_env() - image_name = f"wfcommons/wfcommons-testing-{backend}" - - try: - image = client.images.get(image_name) - sys.stderr.write(f"Image '{image_name}' is available locally\n") - except ImageNotFound: - sys.stderr.write(f"Pulling image '{image_name}'...\n") - client.images.pull(image_name) - - # Launch the docker container to actually run the translated workflow - sys.stderr.write("Starting Docker container...\n") - container = client.containers.run( - image_name, - command=command, - volumes={mounted_dir: {'bind': mounted_dir, 'mode': 'rw'}}, - working_dir=working_dir, - tty=True, - detach=True - ) - - # Installing WfCommons on container - _install_WfCommons_on_container(container) - - # Copy over the wfbench and cpu-benchmark executables to where they should go on the container - if bin_dir: - exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir], - stdout=True, stderr=True) - exit_code, output 
= container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir], - stdout=True, stderr=True) - - return container - -def _make_tarfile_of_wfcommons(): - source_dir = os.getcwd() # This assumes the testing is run from the root - tar_stream = io.BytesIO() - with tarfile.open(fileobj=tar_stream, mode='w') as tar: - tar.add(source_dir, arcname=os.path.basename(source_dir)) - tar_stream.seek(0) - return tar_stream - -def _install_WfCommons_on_container(container): - # sys.stderr.write("Installing WfCommons on the container...\n") - # Copy the WfCommons code to it (removing stuff that should be removed) - target_path = '/tmp/' # inside container - tar_data = _make_tarfile_of_wfcommons() - container.put_archive(target_path, tar_data) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True, - stderr=True) - exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True, - stderr=True) - - # Install WfCommons on the container (to install wfbench and cpu-benchmark really) - exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages", - workdir="/tmp/WfCommons", stdout=True, stderr=True) +# def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None): +# if command is None: +# command = ["sleep", "infinity"] +# # Pulling the Docker image +# client = docker.from_env() +# image_name = f"wfcommons/wfcommons-testing-{backend}" +# +# try: +# image = client.images.get(image_name) +# sys.stderr.write(f"Image '{image_name}' is available locally\n") +# except ImageNotFound: +# sys.stderr.write(f"Pulling image '{image_name}'...\n") +# client.images.pull(image_name) +# +# # Launch the docker container to actually run the translated workflow +# sys.stderr.write("Starting Docker container...\n") +# container = client.containers.run( +# image=image_name, +# command=command, +# volumes={mounted_dir: {'bind': mounted_dir, 'mode': 'rw'}}, +# working_dir=working_dir, +# tty=True, +# detach=True +# ) +# +# # Installing WfCommons on container +# _install_WfCommons_on_container(container) +# +# # Copy over the wfbench and cpu-benchmark executables to where they should go on the container +# if bin_dir: +# exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir], +# stdout=True, stderr=True) +# exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir], +# stdout=True, stderr=True) +# +# return container + +# def _make_tarfile_of_wfcommons(): +# source_dir = os.getcwd() # This assumes the testing is run from the root +# tar_stream = io.BytesIO() +# with tarfile.open(fileobj=tar_stream, mode='w') as tar: +# tar.add(source_dir, arcname=os.path.basename(source_dir)) +# tar_stream.seek(0) +# return tar_stream +# +# def _install_WfCommons_on_container(container): +# # sys.stderr.write("Installing WfCommons on the container...\n") +# # Copy the WfCommons code to it (removing stuff that should be removed) +# target_path = '/tmp/' # inside container +# tar_data = _make_tarfile_of_wfcommons() +# container.put_archive(target_path, tar_data) +# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True) +# exit_code, output = 
container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True) +# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True, +# stderr=True) +# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True, +# stderr=True) +# +# # Install WfCommons on the container (to install wfbench and cpu-benchmark really) +# exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages", +# workdir="/tmp/WfCommons", stdout=True, stderr=True) def _create_workflow_benchmark(): # Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever) @@ -281,8 +279,8 @@ def test_translator(self, backend) -> None: # Create a local translation directory str_dirpath = "/tmp/" + backend + "_translated_workflow/" dirpath = pathlib.Path(str_dirpath) - if dirpath.exists(): - shutil.rmtree(dirpath) + # dirpath = _create_fresh_local_dir(str_dirpath) + _remove_local_dir_if_it_exists(str_dirpath) # Perform the translation sys.stderr.write("\nTranslating workflow...\n") diff --git a/tests/unit/common/test_workflow.py b/tests/unit/common/test_workflow.py index cc1e5d0e..257acd86 100644 --- a/tests/unit/common/test_workflow.py +++ b/tests/unit/common/test_workflow.py @@ -116,6 +116,11 @@ def test_workflow_json_generation(self): # Create an instance from the JSON File and write it back to a JSON instance = Instance(pathlib.Path("/tmp") / local_file_name) + + # Testing the "iterator" capability + assert(len(list(instance)) == len(instance.instance["workflow"]["specification"]["tasks"])) + + # Test writing to JSON instance.workflow.write_json(pathlib.Path("/tmp/written_workflow.json")) # Get the two jsons as objects diff --git a/tests/wfbench/test_wfbench.py b/tests/wfbench/test_wfbench.py index b45fb17f..6de238bd 100644 --- a/tests/wfbench/test_wfbench.py +++ b/tests/wfbench/test_wfbench.py @@ -11,117 +11,162 @@ import pytest import pathlib -import shutil +import sys import json +from tests.test_helpers import _create_fresh_local_dir +from tests.test_helpers import _start_docker_container +from tests.test_helpers import _remove_local_dir_if_it_exists +from tests.test_helpers import _get_total_size_of_directory from wfcommons import BlastRecipe from wfcommons.common import Workflow -from wfcommons.wfbench import WorkflowBenchmark - +from wfcommons.wfbench import WorkflowBenchmark, BashTranslator + +def _directory_content_as_expected(dirpath: pathlib.Path, + workflow: Workflow, + num_tasks: int, + cpu_work: int, + percent_cpu: float): + workflow_file_name = f"{workflow.name.lower()}-{num_tasks}.json" + workflow_file_is_there = (dirpath / workflow_file_name).exists() + to_create_file_is_there = (dirpath / "to_create.txt").exists() + return workflow_file_is_there and to_create_file_is_there + + +def _workflow_as_expected(dirpath: pathlib.Path, + workflow: Workflow, + num_tasks: int, + cpu_work: int, + percent_cpu: float): + # Get the generated JSON + json_path = dirpath / f"{workflow.name.lower()}-{num_tasks}.json" + with json_path.open("r") as f: + generated_json = json.load(f) + + # Check the number of tasks + assert(len(workflow.tasks) == len(generated_json['workflow']['specification']['tasks'])) + + # For each task check sanity + for generated_task in generated_json['workflow']['specification']['tasks']: + assert(generated_task['id'] in workflow.tasks) + workflow_task = workflow.tasks[generated_task['id']] + # Check input files + 
assert(len(generated_task['inputFiles']) == len(workflow_task.input_files))
+        for file in workflow_task.input_files:
+            assert(file.file_id in generated_task['inputFiles'])
+        # Check output files
+        assert(len(generated_task['outputFiles']) == len(workflow_task.output_files))
+        for file in workflow_task.output_files:
+            assert(file.file_id in generated_task['outputFiles'])
+
+    # TODO: Implement more sanity checks
+
+    return True
+
+
+def _to_create_file_as_expected(dirpath: pathlib.Path,
+                                workflow: Workflow,
+                                num_tasks: int,
+                                cpu_work: int,
+                                percent_cpu: float):
+
+    # Build a set of output files
+    outputfile_set = set()
+    for task_name in workflow.tasks:
+        for f in workflow.tasks[task_name].output_files:
+            outputfile_set.add(f.file_id)
+
+    # Build a dict that maps each input file that must be created to its size
+    inputfile_dict = {}
+    for task_name in workflow.tasks:
+        for f in workflow.tasks[task_name].input_files:
+            if f.file_id not in outputfile_set:
+                inputfile_dict[f.file_id] = f.size
+
+    # Go through the "to_create.txt" file line by line and check names/sizes
+    to_create_path = dirpath / "to_create.txt"
+    with to_create_path.open("r") as f:
+        for line in f.readlines():
+            filename, size = line.strip().split(" ")
+            assert(filename in inputfile_dict)
+            assert(inputfile_dict[filename] == int(size))
+
+    return True
+
+def _actual_data_files_as_expected(dirpath: pathlib.Path,
+                                   workflow: Workflow,
+                                   data_spec):
+    # Inspect the generated data footprint
+    if isinstance(data_spec, int):
+        total_bytes = _get_total_size_of_directory(str(dirpath / "data"))
+        sys.stderr.write(f"Total observed data footprint is {total_bytes} bytes\n")
+        assert (abs(data_spec * 1000 * 1000 - total_bytes) < 100)
+    else:
+        sys.stderr.write("Unsupported data spec FOR NOW\n")
+
+    # Check that every input/output file exists
+    for task in workflow.tasks.values():
+        for f in task.input_files:
+            filename = f.file_id.split("/")[-1]
+            assert ((dirpath / "data" / filename).exists())
+        for f in task.output_files:
+            filename = f.file_id.split("/")[-1]
+            assert ((dirpath / "data" / filename).exists())
 
 class TestWfBench:
 
     @pytest.mark.unit
+    # @pytest.mark.skip
     def test_create_from_recipe(self) -> None:
         """
         Very minimal testing here for creating from recipe
         """
         # create a workflow benchmark object to generate specifications based on a recipe
-        benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500)
+        benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=49)
+
+        # Create the data_specification options
+        fixed_total_footprint_in_mb = 5
+        # TODO: This seems really broken right now
+        # per_type_footprint = {}
+        # for task_type in ["blastall", "split_fasta", None]:
+        #     per_type_footprint[task_type] = "1"  # string???
+ + for data_spec in [fixed_total_footprint_in_mb]: + benchmark.create_benchmark(_create_fresh_local_dir(f"/tmp/benchmark"), cpu_work=1, data=data_spec, percent_cpu=0.6) + + # Run the benchmark with the Bash translator + # Create a local translation directory + str_dirpath = "/tmp/bash_translated_benchmark/" + _remove_local_dir_if_it_exists(str_dirpath) + dirpath = pathlib.Path(str_dirpath) + + # Perform the translation + sys.stderr.write("\nTranslating workflow...\n") + translator = BashTranslator(benchmark.workflow) + translator.translate(output_folder=dirpath) + + # Start the Docker container + sys.stderr.write("Starting Docker container...\n") + container = _start_docker_container("bash", str_dirpath, str_dirpath, str_dirpath + "bin/") + + # Run the workflow + sys.stderr.write("Running workflow...\n") + exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True) + + # Kill the container + container.remove(force=True) - # generate a specification based on performance characteristics - dirpath = pathlib.Path("/tmp/benchmark/") - if dirpath.exists(): - shutil.rmtree(dirpath) - dirpath.mkdir(parents=True, exist_ok=True) - path = benchmark.create_benchmark(dirpath, cpu_work=100, data=10, percent_cpu=0.6) - - - @staticmethod - def _directory_content_as_expected(dirpath: pathlib.Path, - workflow: Workflow, - num_tasks: int, - cpu_work: int, - percent_cpu: float): - workflow_file_name = f"{workflow.name.lower()}-{num_tasks}.json" - workflow_file_is_there = (dirpath / workflow_file_name).exists() - to_create_file_is_there = (dirpath / "to_create.txt").exists() - return workflow_file_is_there and to_create_file_is_there - - - @staticmethod - def _workflow_as_expected(dirpath: pathlib.Path, - workflow: Workflow, - num_tasks: int, - cpu_work: int, - percent_cpu: float): - # Get the generated JSON - json_path = dirpath / f"{workflow.name.lower()}-{num_tasks}.json" - with json_path.open("r") as f: - generated_json = json.load(f) - - # Check the number of tasks - assert(len(workflow.tasks) == len(generated_json['workflow']['specification']['tasks'])) - - # For each task check sanity - for generated_task in generated_json['workflow']['specification']['tasks']: - assert(generated_task['id'] in workflow.tasks) - workflow_task = workflow.tasks[generated_task['id']] - # Check input files - assert(len(generated_task['inputFiles']) == len(workflow_task.input_files)) - for file in workflow_task.input_files: - assert(file.file_id in generated_task['inputFiles']) - # Check output files - assert(len(generated_task['outputFiles']) == len(workflow_task.output_files)) - for file in workflow_task.input_files: - assert(file.file_id in generated_task['inputFiles']) - - # TODO: Implement more sanity checks - - return True - - - @staticmethod - def _to_create_file_as_expected(dirpath: pathlib.Path, - workflow: Workflow, - num_tasks: int, - cpu_work: int, - percent_cpu: float): - - # Build a set of output files - outputfile_set = set({}) - for task_name in workflow.tasks: - for f in workflow.tasks[task_name].output_files: - outputfile_set.add(f.file_id) - - # Build a dict of input files that must be created files with sizes - inputfile_dict = dict({}) - for task_name in workflow.tasks: - for f in workflow.tasks[task_name].input_files: - if f.file_id not in outputfile_set: - inputfile_dict[f.file_id] = f.size - - # Open the "to_create.txt" file and go line by line and check names/sizes - json_path = dirpath / "to_create.txt" - with json_path.open("r") as f: - for line in 
f.readlines(): - [filename, size] = line.strip().split(" ") - assert(filename in inputfile_dict) - assert(inputfile_dict[filename] == int(size)) - - return True + # Inspect the data after execution + _actual_data_files_as_expected(dirpath, benchmark.workflow, data_spec) @pytest.mark.unit def test_create_from_instance(self) -> None: workflow = BlastRecipe.from_num_tasks(500).build_workflow() benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) - dirpath = pathlib.Path("/tmp/benchmark/") - if dirpath.exists(): - shutil.rmtree(dirpath) - dirpath.mkdir(parents=True, exist_ok=True) + dirpath = _create_fresh_local_dir("/tmp/benchmark/") path = benchmark.create_benchmark_from_synthetic_workflow(dirpath, workflow, cpu_work=100, percent_cpu=0.6) - assert(self._directory_content_as_expected(dirpath, workflow, 500, 100, 0.6)) - assert(self._workflow_as_expected(dirpath, workflow, 500, 100, 0.6)) - assert(self._to_create_file_as_expected(dirpath, workflow, 500, 100, 0.6)) + assert(_directory_content_as_expected(dirpath, workflow, 500, 100, 0.6)) + assert(_workflow_as_expected(dirpath, workflow, 500, 100, 0.6)) + assert(_to_create_file_as_expected(dirpath, workflow, 500, 100, 0.6)) diff --git a/tests/wfchef/test_wfchef.py b/tests/wfchef/test_wfchef.py index 2fc1668d..c3311836 100644 --- a/tests/wfchef/test_wfchef.py +++ b/tests/wfchef/test_wfchef.py @@ -15,6 +15,7 @@ import subprocess import sys +from tests.test_helpers import _create_fresh_local_dir from wfcommons.wfchef.chef import create_recipe from wfcommons.wfchef.chef import uninstall_recipe from wfcommons.wfchef.chef import ls_recipe @@ -28,10 +29,7 @@ def test_create_recipe(self) -> None: Just calling the create_recipe function from chef.py directly (i.e., bypassing main()) """ - dirpath = pathlib.Path("/tmp/recipe/") - if dirpath.exists(): - shutil.rmtree(dirpath) - dirpath.mkdir(parents=True, exist_ok=True) + dirpath = _create_fresh_local_dir("/tmp/recipe/") # Put a few JSON workflows in /tmp urls = ["https://raw.githubusercontent.com/wfcommons/WfInstances/refs/heads/main/makeflow/blast/blast-chameleon-small-001.json", diff --git a/wfcommons/common/workflow.py b/wfcommons/common/workflow.py index dbaafb87..c0d3a39e 100644 --- a/wfcommons/common/workflow.py +++ b/wfcommons/common/workflow.py @@ -41,7 +41,7 @@ class Workflow(nx.DiGraph): :param executed_at: Workflow start timestamp in the ISO 8601 format. :type executed_at: Optional[str] :param makespan: Workflow makespan in seconds. - :type makespan: Optional[int] + :type makespan: Optional[float] :param author_name: Author name. :type author_name: Optional[str] :param author_email: Author email. 
@@ -59,7 +59,7 @@ def __init__(self,
                  runtime_system_version: Optional[str] = None,
                  runtime_system_url: Optional[str] = None,
                  executed_at: Optional[str] = None,
-                 makespan: Optional[int] = 0.0,
+                 makespan: Optional[float] = 0.0,
                  author_name: Optional[str] = None,
                  author_email: Optional[str] = None,
                  author_institution: Optional[str] = None,
@@ -74,15 +74,15 @@ def __init__(self,
         self.runtime_system_version: Optional[str] = str(__version__) if not runtime_system_version else runtime_system_version
         self.runtime_system_url: Optional[str] = f"https://docs.wfcommons.org/en/v{__version__}/" if not runtime_system_url else runtime_system_url
         self.executed_at: Optional[str] = str(datetime.now().astimezone().isoformat()) if not executed_at else executed_at
-        self.makespan: Optional[int] = makespan
+        self.makespan: Optional[float] = makespan
         self.author_name: Optional[str] = author_name if author_name else str(getpass.getuser())
         self.author_email: Optional[str] = author_email if author_email else "support@wfcommons.org"
         self.author_institution: Optional[str] = None
         self.author_country: Optional[str] = None
-        self.tasks: Task = {}
+        self.tasks: dict[str, Task] = {}
         self.tasks_parents = {}
         self.tasks_children = {}
-        self.workflow_id: str = None
+        self.workflow_id: Optional[str] = None
         self.workflow_json = {}
         super().__init__(name=name, makespan=self.makespan, executed_at=self.executed_at)
@@ -127,7 +127,7 @@ def write_json(self, json_file_path: Optional[pathlib.Path] = None) -> None:
         with open(json_file_path, "w") as outfile:
             outfile.write(json.dumps(self.workflow_json, indent=4))
 
-    def generate_json(self) -> dict:
+    def generate_json(self) -> None:
         """
         Generate a JSON representation of the workflow instance.
 
@@ -219,7 +219,7 @@ def write_dot(self, dot_file_path: Optional[pathlib.Path] = None) -> None:
         """
         if not dot_file_path:
             dot_file_path = pathlib.Path(f"{self.name.lower()}.dot")
-        nx.nx_agraph.write_dot(self, dot_file_path)
+        nx.nx_agraph.write_dot(self, str(dot_file_path))
 
     def read_dot(self, dot_file_path: Optional[pathlib.Path] = None) -> None:
         """
diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py
index 5388a11c..b5961fc2 100644
--- a/wfcommons/wfbench/bench.py
+++ b/wfcommons/wfbench/bench.py
@@ -52,10 +52,10 @@ def __init__(self,
         """Create an object that represents a workflow benchmark generator."""
         self.logger: Logger = logging.getLogger(
             __name__) if logger is None else logger
-        self.recipe = recipe
+        self.recipe: Type[WfChefWorkflowRecipe] = recipe
         self.num_tasks = num_tasks
         self.with_flowcept = with_flowcept
-        self.workflow: Workflow = None
+        self.workflow: Optional[Workflow] = None
 
     def create_benchmark_from_input_file(self,
                                          save_dir: pathlib.Path,
@@ -341,7 +341,7 @@ def create_benchmark(self,
 
         return json_path
 
-    def _creating_lock_files(self, lock_files_folder: Optional[pathlib.Path]) -> Tuple[pathlib.Path, pathlib.Path]:
+    def _creating_lock_files(self, lock_files_folder: Optional[pathlib.Path]) -> Tuple[pathlib.Path | None, pathlib.Path | None]:
         """
         Creating the lock files
         """
@@ -509,7 +509,7 @@ def _output_files(self, data: Dict[str, str]) -> Dict[str, Dict[str, int]]:
     def _calculate_input_files(self):
         """
         Calculate total number of files needed.
-        This mehtod is used if the user provides total datafootprint.
+        This method is used if the user provides total data footprint.
""" tasks_need_input = 0 tasks_dont_need_input = 0 @@ -635,7 +635,7 @@ def generate_input_file(self, path: pathlib.Path) -> None: input("Please fill up the input file and press ENTER to continue...") -def generate_sys_data(num_files: int, tasks: Dict[str, int], save_dir: pathlib.Path) -> None: +def generate_sys_data(num_files: int, tasks: Dict[str, int], save_dir: pathlib.Path) -> List[str]: """Generate workflow's input data :param num_files: number of each file to be generated. @@ -658,12 +658,12 @@ def generate_sys_data(num_files: int, tasks: Dict[str, int], save_dir: pathlib.P return names -def assigning_correct_files(task: Dict[str, str]) -> List[str]: - files = [] - for file in task["files"]: - if file["link"] == "input": - files.append(file["name"]) - return files +# def assigning_correct_files(task: Dict[str, str]) -> List[str]: +# files = [] +# for file in task["files"]: +# if file["link"] == "input": +# files.append(file["name"]) +# return files def cleanup_sys_files() -> None: diff --git a/wfcommons/wfbench/translator/bash.py b/wfcommons/wfbench/translator/bash.py index 66d8f351..9a28b839 100644 --- a/wfcommons/wfbench/translator/bash.py +++ b/wfcommons/wfbench/translator/bash.py @@ -85,7 +85,7 @@ def translate(self, output_folder: pathlib.Path) -> None: self._copy_binary_files(output_folder) self._generate_input_files(output_folder) - def _bash_wftasks_codelines(self) -> None: + def _bash_wftasks_codelines(self) -> list[str]: codelines = [] # Parsing each steps by Workflow levels diff --git a/wfcommons/wfbench/translator/dask.py b/wfcommons/wfbench/translator/dask.py index 361ec32c..1b3da62d 100644 --- a/wfcommons/wfbench/translator/dask.py +++ b/wfcommons/wfbench/translator/dask.py @@ -142,6 +142,7 @@ def _parse_tasks(self, task_name: str) -> list[str]: :return: The :rtype: list[str] """ + noindent_python_codelines = [] if task_name not in self.parsed_tasks: # check for dependencies for parent in self.task_parents[task_name]: diff --git a/wfcommons/wfbench/translator/parsl.py b/wfcommons/wfbench/translator/parsl.py index 4b147a6e..11f7c7b7 100644 --- a/wfcommons/wfbench/translator/parsl.py +++ b/wfcommons/wfbench/translator/parsl.py @@ -96,7 +96,7 @@ def translate(self, output_folder: pathlib.Path) -> None: self._copy_binary_files(output_folder) self._generate_input_files(output_folder) - def _parsl_wftasks_codelines(self) -> None: + def _parsl_wftasks_codelines(self) -> list[str]: codelines = ["task_arr = []\n"] # Parsing each steps by Workflow levels diff --git a/wfcommons/wfchef/chef.py b/wfcommons/wfchef/chef.py index 453383dc..31b3e6a5 100644 --- a/wfcommons/wfchef/chef.py +++ b/wfcommons/wfchef/chef.py @@ -84,9 +84,9 @@ def find_err(workflow: pathlib.Path, :param workflow: name (for samples available in WfCommons) or path to the real workflow instances. :type workflow: pathlib.Path :param err_savepath: path to save the err (rmse) of all instances available into a csv. - :type real_graph: Optional[pathlib.Path] + :type err_savepath: Optional[pathlib.Path] :param always_update: flag to set if the err needs to be updated or not (True: if new instances are added, False: otherwise). - :type real_graph: Optional[bool] + :type always_update: Optional[bool] :param runs: number of times to repeat the err calculation process (due to randomization). 
:type runs: Optional[bool]
@@ -154,7 +154,7 @@ def analyzer_summary(path_to_instances: pathlib.Path) -> Dict:
     task_types = set()
     for path in path_to_instances.glob("*.json"):
-        instance = Instance(input_instance=str(path))
+        instance = Instance(input_instance=path)
         analyzer.append_instance(instance)
         graph = create_graph(path)
         for node in graph.nodes:
diff --git a/wfcommons/wfchef/find_microstructures.py b/wfcommons/wfchef/find_microstructures.py
index 46e784aa..fbe24383 100644
--- a/wfcommons/wfchef/find_microstructures.py
+++ b/wfcommons/wfchef/find_microstructures.py
@@ -25,7 +25,7 @@
 this_dir = pathlib.Path(__file__).resolve().parent
 
 
-def comb(n: int, k: int) -> int:
+def comb(n: int, k: int) -> float:
     """
     Calculates the combination of two integers.
 
@@ -35,7 +35,7 @@ def comb(n: int, k: int) -> int:
     :type k: int.
 
     :return: combination of two integers.
-    :rtype: int.
+    :rtype: float.
     """
     return math.factorial(n) / (math.factorial(k) * math.factorial(n - k))
 
@@ -49,7 +49,7 @@ def get_children(graph: nx.DiGraph, node: str) -> List[str]:
     Gets the children of a node.
 
     :param graph: graph that contains the node.
-    :type graph: netwrokX DiGraph.
+    :type graph: networkX DiGraph.
     :param node: a node.
     :type node: str.
 
@@ -64,7 +64,7 @@ def get_parents(graph: nx.DiGraph, node: str) -> List[str]:
     Gets the parents of a node.
 
     :param graph: graph that contains the node.
-    :type graph: netwrokX DiGraph.
+    :type graph: networkX DiGraph.
     :param node: a node.
     :type node: str.
 
@@ -79,7 +79,7 @@ def get_relatives(graph: nx.DiGraph, node: str) -> Set[str]:
     Gets all node's relatives (children and parents).
 
     :param graph: graph that contains the node.
-    :type graph: netwrokX DiGraph.
+    :type graph: networkX DiGraph.
     :param node: a node.
     :type node: str.
 
@@ -94,11 +94,11 @@ def find_microstructure(graph: nx.DiGraph, n1: str, n2: str):
     Detects a pattern (microstructure).
 
     :param graph: graph.
-    :type graph: netwrokX DiGraph.
+    :type graph: networkX DiGraph.
     :param n1: a node in graph.
-    :type node: str.
-    :param n1: a different node in graph.
-    :type node: str.
+    :type n1: str.
+    :param n2: a different node in graph.
+    :type n2: str.
 
     :return: sets of n1 related nodes, n2 related nodes, the nodes in common between n1 and n2
         and all the nodes involved in the process.
@@ -138,9 +138,9 @@ def find_microstructures(graph: nx.DiGraph, verbose: bool = False):
     Detects the patterns (microstructures) that are used for replication and graph expansion.
 
     :param graph: graph.
-    :type graph: netwrokX DiGraph.
+    :type graph: networkX DiGraph.
     :param verbose: if set, prints status messages.
-    :type verbose: netwrokX DiGraph.
+    :type verbose: bool.
 
     :return: patterns (microstructures)
     :rtype: Set[str].
@@ -186,7 +186,7 @@ def sort_graphs(workflow_path: Union[pathlib.Path],
     :param workflow_path: path to the JSON instances.
     :type workflow_path: pathlib.Path.
     :param verbose: if set, prints status messages.
-    :type verbose: netwrokX DiGraph.
+    :type verbose: bool.
 
     :return: sorted graphs
     :rtype: List[networkX.DiGraph].
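Note on the `comb` hunk above: the return annotation changes to `float` because `/` is true division in Python 3. A minimal standalone sketch (standard library only) of that behavior, and of `math.comb` (Python 3.8+), which computes the same value exactly as an `int`:

```python
import math

# Factorial-based comb() uses "/", so it always yields a float,
# even when the binomial coefficient is a whole number.
n, k = 5, 2
as_float = math.factorial(n) / (math.factorial(k) * math.factorial(n - k))
assert as_float == 10.0 and isinstance(as_float, float)

# math.comb returns an exact int for the same computation.
assert math.comb(n, k) == 10 and isinstance(math.comb(n, k), int)
```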
diff --git a/wfcommons/wfchef/recipes/blast/recipe.py b/wfcommons/wfchef/recipes/blast/recipe.py index 1e78355a..6fd4f34b 100644 --- a/wfcommons/wfchef/recipes/blast/recipe.py +++ b/wfcommons/wfchef/recipes/blast/recipe.py @@ -39,13 +39,15 @@ class BlastRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Blast", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/bwa/recipe.py b/wfcommons/wfchef/recipes/bwa/recipe.py index 4bb6bc8d..5ba0c47c 100644 --- a/wfcommons/wfchef/recipes/bwa/recipe.py +++ b/wfcommons/wfchef/recipes/bwa/recipe.py @@ -39,13 +39,15 @@ class BwaRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Bwa", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/cycles/recipe.py b/wfcommons/wfchef/recipes/cycles/recipe.py index ba68f46e..018cd501 100644 --- a/wfcommons/wfchef/recipes/cycles/recipe.py +++ b/wfcommons/wfchef/recipes/cycles/recipe.py @@ -39,13 +39,15 @@ class CyclesRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Cycles", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/epigenomics/recipe.py b/wfcommons/wfchef/recipes/epigenomics/recipe.py index 361fa5b7..c5e06656 100644 --- a/wfcommons/wfchef/recipes/epigenomics/recipe.py +++ b/wfcommons/wfchef/recipes/epigenomics/recipe.py @@ -39,13 +39,15 @@ class EpigenomicsRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Epigenomics", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/genome/recipe.py b/wfcommons/wfchef/recipes/genome/recipe.py index 9684cb20..4dd106f1 100644 --- a/wfcommons/wfchef/recipes/genome/recipe.py +++ b/wfcommons/wfchef/recipes/genome/recipe.py @@ -39,13 +39,15 @@ class GenomeRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: 
Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Genome", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/montage/recipe.py b/wfcommons/wfchef/recipes/montage/recipe.py index 7279966e..c32d91b1 100644 --- a/wfcommons/wfchef/recipes/montage/recipe.py +++ b/wfcommons/wfchef/recipes/montage/recipe.py @@ -39,13 +39,15 @@ class MontageRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Montage", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/rnaseq/recipe.py b/wfcommons/wfchef/recipes/rnaseq/recipe.py index de42cce8..3fab75ec 100644 --- a/wfcommons/wfchef/recipes/rnaseq/recipe.py +++ b/wfcommons/wfchef/recipes/rnaseq/recipe.py @@ -39,13 +39,15 @@ class RnaseqRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Rnaseq", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/seismology/recipe.py b/wfcommons/wfchef/recipes/seismology/recipe.py index 347046f0..5781f841 100644 --- a/wfcommons/wfchef/recipes/seismology/recipe.py +++ b/wfcommons/wfchef/recipes/seismology/recipe.py @@ -39,13 +39,15 @@ class SeismologyRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Seismology", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/soykb/recipe.py b/wfcommons/wfchef/recipes/soykb/recipe.py index 01fc8eea..eedd5f4e 100644 --- a/wfcommons/wfchef/recipes/soykb/recipe.py +++ b/wfcommons/wfchef/recipes/soykb/recipe.py @@ -39,13 +39,15 @@ class SoykbRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if 
exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Soykb", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/recipes/srasearch/recipe.py b/wfcommons/wfchef/recipes/srasearch/recipe.py index 5d7d0973..c1ec4b10 100644 --- a/wfcommons/wfchef/recipes/srasearch/recipe.py +++ b/wfcommons/wfchef/recipes/srasearch/recipe.py @@ -39,13 +39,15 @@ class SrasearchRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Srasearch", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/skeletons/recipe.py b/wfcommons/wfchef/skeletons/recipe.py index 9288d968..e3789550 100644 --- a/wfcommons/wfchef/skeletons/recipe.py +++ b/wfcommons/wfchef/skeletons/recipe.py @@ -39,13 +39,15 @@ class SkeletonRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, logger: Optional[Logger] = None, base_method: BaseMethod = BaseMethod.ERROR_TABLE, **kwargs) -> None: + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name="Skeleton", data_footprint=data_footprint, diff --git a/wfcommons/wfchef/utils.py b/wfcommons/wfchef/utils.py index 8d21934d..f96379f7 100644 --- a/wfcommons/wfchef/utils.py +++ b/wfcommons/wfchef/utils.py @@ -90,8 +90,8 @@ def annotate(g: nx.DiGraph) -> None: Annotates a networkX DiGraph with metadata such as the tasks top-down type hash, bottom-up type hash, and type-hash. - :param path: name (for samples available in WfCommons) or the path to graphs JSON. - :type path: str or pathlib.Path. + :param g: networkX DiGraph. + :type g: nx.DiGraph. :return: annotated graph. :rtype: networkX DiGraph. @@ -151,7 +151,7 @@ def draw(g: nx.DiGraph, linewidths: int = 5, subgraph: Set[str] = set()) -> Tuple[plt.Figure, plt.Axes]: """ - Plots a netwrokX DiGraph. + Plots a networkX DiGraph. :param g: graph to be plotted. :type g: networkX DiGraph. 
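Each recipe constructor above (and the abstract recipe below) replaces the shared mutable default `exclude_graphs: Set[str] = set()` with a `None` sentinel plus an in-body guard. A minimal sketch of the pitfall being fixed; the function names are illustrative, and a fully typed variant would also widen the annotation to `Optional[Set[str]]`:

```python
from typing import Optional, Set

def leaky(exclude_graphs: Set[str] = set()) -> Set[str]:
    # The default set is built once, at function definition time,
    # so every call that omits the argument mutates the same object.
    exclude_graphs.add("graph-1")
    return exclude_graphs

def safe(exclude_graphs: Optional[Set[str]] = None) -> Set[str]:
    # None sentinel, as in the recipe __init__ hunks: fresh set per call.
    if exclude_graphs is None:
        exclude_graphs = set()
    exclude_graphs.add("graph-1")
    return exclude_graphs

assert leaky() is leaky()    # the same shared default object
assert safe() is not safe()  # a distinct set on each call
```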
diff --git a/wfcommons/wfchef/wfchef_abstract_recipe.py b/wfcommons/wfchef/wfchef_abstract_recipe.py index 7c90075a..3927b749 100644 --- a/wfcommons/wfchef/wfchef_abstract_recipe.py +++ b/wfcommons/wfchef/wfchef_abstract_recipe.py @@ -58,7 +58,7 @@ class WfChefWorkflowRecipe(WorkflowRecipe): def __init__(self, name: str, data_footprint: Optional[int], num_tasks: Optional[int], - exclude_graphs: Set[str] = set(), + exclude_graphs: Set[str] = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, @@ -66,6 +66,8 @@ def __init__(self, name: str, this_dir: Union[str, pathlib.Path] = None, base_method: Optional[Enum] = BaseMethod.ERROR_TABLE) -> None: """Create an object of the workflow recipe.""" + if exclude_graphs is None: + exclude_graphs = set() super().__init__( name=name, data_footprint=data_footprint, @@ -174,7 +176,7 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: :rtype: Workflow """ workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name, - makespan=0) + makespan=0.0) graph = self.generate_nx_graph() task_ids = {} diff --git a/wfcommons/wfinstances/instance.py b/wfcommons/wfinstances/instance.py index ca9f703d..a51d82bc 100644 --- a/wfcommons/wfinstances/instance.py +++ b/wfcommons/wfinstances/instance.py @@ -88,7 +88,7 @@ def __init__(self, input_instance: pathlib.Path, # Global properties self.executed_at: datetime = dateutil.parser.parse( self.instance["workflow"]["execution"]["executedAt"]) - self.makespan: int = self.instance["workflow"]["execution"]["makespanInSeconds"] + self.makespan: float = self.instance["workflow"]["execution"]["makespanInSeconds"] # Machines if "machines" in self.instance["workflow"]["execution"].keys(): diff --git a/wfcommons/wfinstances/logs/makeflow.py b/wfcommons/wfinstances/logs/makeflow.py index 336c8fc4..2b9abe0b 100644 --- a/wfcommons/wfinstances/logs/makeflow.py +++ b/wfcommons/wfinstances/logs/makeflow.py @@ -13,7 +13,7 @@ import math import pathlib -from datetime import datetime +from datetime import datetime, timezone from logging import Logger from typing import List, Optional @@ -84,8 +84,8 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: # create base workflow instance object self.workflow = Workflow(name=self.workflow_name, description=self.description, - wms_name=self.wms_name, - wms_url=self.wms_url) + runtime_system_name=self.wms_name, + runtime_system_url=self.wms_url) # parse workflow file self._parse_workflow_file() @@ -124,6 +124,9 @@ def _parse_workflow_file(self) -> None: list_files.extend(self._create_files(outputs, FileLink.OUTPUT, task_name)) list_files.extend(self._create_files(inputs, FileLink.INPUT, task_name)) + input_files = [f for f in list_files if f.link == FileLink.INPUT] + output_files = [f for f in list_files if f.link == FileLink.OUTPUT] + # create task args = ' '.join(line.replace('LOCAL', '').replace('perl', '').strip().split()) task = Task(name=task_name, @@ -134,7 +137,8 @@ def _parse_workflow_file(self) -> None: program=prefix, args=args.split(), cores=1, - files=list_files, + input_files=input_files, + output_files=output_files, logger=self.logger) self.workflow.add_node(task_name, task=task) self.args_map[args] = task @@ -173,11 +177,11 @@ def _create_files(self, files_list: List[str], link: FileLink, task_name: str) - elif file_path.is_file(): size = int(math.ceil(file_path.stat().st_size / 1000)) # B to KB - file_obj_in = 
File(name=file, + file_obj_in = File(file_id=file, size=size, link=FileLink.INPUT, logger=self.logger) - file_obj_out = File(name=file, + file_obj_out = File(file_id=file, size=size, link=FileLink.OUTPUT, logger=self.logger) @@ -200,9 +204,10 @@ def _parse_makeflow_log_file(self): for line in f: if 'STARTED' in line: start_time = int(line.split()[2]) - self.workflow.executed_at = datetime.utcfromtimestamp(start_time / 1000000).strftime( + self.workflow.executed_at = datetime.fromtimestamp(start_time / 1000000, tz=timezone.utc).strftime( '%Y-%m-%dT%H:%M:%S+00:00') + elif 'COMPLETED' in line: self.workflow.makespan = float('%.2f' % ((int(line.split()[2]) - start_time) / 1000000)) @@ -228,7 +233,7 @@ def _parse_resource_monitor_logs(self): task.bytes_written = int(data['bytes_written'][0] * 1000) # MB to KB task.avg_cpu = float('%.4f' % (float(data['cpu_time'][0]) / float(data['wall_time'][0]) * 100)) task.machine = Machine(name=data['host'], - cpu={'count': int(data['machine_cpus'][0]), 'speed': 0, 'vendor': ''}, + cpu={'coreCount': int(data['machine_cpus'][0]), 'speedInMHz': 0, 'vendor': ''}, logger=self.logger) # workflow diff --git a/wfcommons/wfinstances/logs/nextflow.py b/wfcommons/wfinstances/logs/nextflow.py index a89bc6d5..93a98c0c 100644 --- a/wfcommons/wfinstances/logs/nextflow.py +++ b/wfcommons/wfinstances/logs/nextflow.py @@ -118,8 +118,8 @@ def _parse_execution_timeline_file(self) -> None: for p in tasks_map[index - 1]: self.workflow.add_edge(p, c) - self.workflow.makespan = round( - (int(timeline_data['endingMillis']) - int(timeline_data['beginningMillis'])) / 1024) + self.workflow.makespan = float( + (int(timeline_data['endingMillis']) - int(timeline_data['beginningMillis'])) / 1000) def _read_data(self, file_format: str) -> Dict: """ diff --git a/wfcommons/wfinstances/logs/pegasusrec.py b/wfcommons/wfinstances/logs/pegasusrec.py index ee3ae437..382832d2 100644 --- a/wfcommons/wfinstances/logs/pegasusrec.py +++ b/wfcommons/wfinstances/logs/pegasusrec.py @@ -398,6 +398,7 @@ def _parse_dag(self): self.workflow.add_node(task_name, task = Task( name=task_name, + task_id=task_name, task_type=TaskType.AUXILIARY, runtime=0, args=[],
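A closing note on the `makeflow.py` timestamp hunk: `datetime.utcfromtimestamp` is deprecated as of Python 3.12, and the replacement `datetime.fromtimestamp(..., tz=timezone.utc)` yields a timezone-aware datetime that formats to the same UTC string. A minimal sketch with an illustrative microsecond timestamp (the real value comes from the Makeflow log):

```python
from datetime import datetime, timezone

start_time = 1_700_000_000_000_000  # microseconds since the epoch (illustrative)

# Deprecated since Python 3.12, returns a naive datetime:
#   datetime.utcfromtimestamp(start_time / 1000000)

# Timezone-aware equivalent, as used in the parser:
executed_at = datetime.fromtimestamp(start_time / 1000000, tz=timezone.utc)
print(executed_at.strftime('%Y-%m-%dT%H:%M:%S+00:00'))  # 2023-11-14T22:13:20+00:00
```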