Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions tests/recipes/test_recipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@

class TestRecipes:

recipe_class_list = [SeismologyRecipe,
recipe_class_list = [
GenomeRecipe,
SeismologyRecipe,
MontageRecipe,
RnaseqRecipe,
BwaRecipe,
Expand All @@ -48,7 +50,9 @@ def test_recipes(self, recipe_class) -> None:

recipe = recipe_class.from_num_tasks(num_tasks=200, runtime_factor=1.1, input_file_size_factor=1.5,
output_file_size_factor=0.8)
workflow = WorkflowGenerator(recipe).build_workflow()
workflows = WorkflowGenerator(recipe).build_workflows(1)
assert len(workflows) == 1



@pytest.mark.unit
Expand Down
91 changes: 91 additions & 0 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import pathlib
import shutil
import tarfile
import os
import io
import sys
import docker
from docker.errors import ImageNotFound

def _create_fresh_local_dir(path: str) -> pathlib.Path:
dirpath = pathlib.Path(path)
if dirpath.exists():
shutil.rmtree(dirpath)
dirpath.mkdir(parents=True, exist_ok=True)
return dirpath

def _remove_local_dir_if_it_exists(path: str) -> None:
dirpath = pathlib.Path(path)
if dirpath.exists():
shutil.rmtree(dirpath)


def _make_tarfile_of_wfcommons():
source_dir = os.getcwd() # This assumes the testing is run from the root
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode='w') as tar:
tar.add(source_dir, arcname=os.path.basename(source_dir))
tar_stream.seek(0)
return tar_stream

def _install_WfCommons_on_container(container):
# sys.stderr.write("Installing WfCommons on the container...\n")
# Copy the WfCommons code to it (removing stuff that should be removed)
target_path = '/tmp/' # inside container
tar_data = _make_tarfile_of_wfcommons()
container.put_archive(target_path, tar_data)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True,
stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True,
stderr=True)

# Install WfCommons on the container (to install wfbench and cpu-benchmark really)
exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
workdir="/tmp/WfCommons", stdout=True, stderr=True)

def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None):
if command is None:
command = ["sleep", "infinity"]
# Pulling the Docker image
client = docker.from_env()
image_name = f"wfcommons/wfcommons-testing-{backend}"

try:
image = client.images.get(image_name)
sys.stderr.write(f"Image '{image_name}' is available locally\n")
except ImageNotFound:
sys.stderr.write(f"Pulling image '{image_name}'...\n")
client.images.pull(image_name)

# Launch the docker container to actually run the translated workflow
sys.stderr.write("Starting Docker container...\n")
container = client.containers.run(
image=image_name,
command=command,
volumes={mounted_dir: {'bind': mounted_dir, 'mode': 'rw'}},
working_dir=working_dir,
tty=True,
detach=True
)

# Installing WfCommons on container
_install_WfCommons_on_container(container)

# Copy over the wfbench and cpu-benchmark executables to where they should go on the container
if bin_dir:
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir],
stdout=True, stderr=True)
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir],
stdout=True, stderr=True)

return container

def _get_total_size_of_directory(directory_path: str):
total_size = 0
for dirpath, dirnames, filenames in os.walk(directory_path):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
total_size += os.path.getsize(filepath)
return total_size
134 changes: 66 additions & 68 deletions tests/translators/test_translators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
import pathlib
import pytest
import shutil
import docker
from docker.errors import ImageNotFound
import io
import tarfile
import os
import sys
import json
import time

from tests.test_helpers import _create_fresh_local_dir
from tests.test_helpers import _remove_local_dir_if_it_exists
from tests.test_helpers import _start_docker_container
from wfcommons import BlastRecipe
from wfcommons.wfbench import WorkflowBenchmark
from wfcommons.wfbench import DaskTranslator
Expand All @@ -33,67 +31,67 @@

from wfcommons.wfinstances import PegasusLogsParser



def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=["sleep", "infinity"]):
# Pulling the Docker image
client = docker.from_env()
image_name = f"wfcommons/wfcommons-testing-{backend}"

try:
image = client.images.get(image_name)
sys.stderr.write(f"Image '{image_name}' is available locally\n")
except ImageNotFound:
sys.stderr.write(f"Pulling image '{image_name}'...\n")
client.images.pull(image_name)

# Launch the docker container to actually run the translated workflow
sys.stderr.write("Starting Docker container...\n")
container = client.containers.run(
image_name,
command=command,
volumes={mounted_dir: {'bind': mounted_dir, 'mode': 'rw'}},
working_dir=working_dir,
tty=True,
detach=True
)

# Installing WfCommons on container
_install_WfCommons_on_container(container)

# Copy over the wfbench and cpu-benchmark executables to where they should go on the container
if bin_dir:
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir],
stdout=True, stderr=True)
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir],
stdout=True, stderr=True)

return container

def _make_tarfile_of_wfcommons():
source_dir = os.getcwd() # This assumes the testing is run from the root
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode='w') as tar:
tar.add(source_dir, arcname=os.path.basename(source_dir))
tar_stream.seek(0)
return tar_stream

def _install_WfCommons_on_container(container):
# sys.stderr.write("Installing WfCommons on the container...\n")
# Copy the WfCommons code to it (removing stuff that should be removed)
target_path = '/tmp/' # inside container
tar_data = _make_tarfile_of_wfcommons()
container.put_archive(target_path, tar_data)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True,
stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True,
stderr=True)

# Install WfCommons on the container (to install wfbench and cpu-benchmark really)
exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
workdir="/tmp/WfCommons", stdout=True, stderr=True)
# def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None):
# if command is None:
# command = ["sleep", "infinity"]
# # Pulling the Docker image
# client = docker.from_env()
# image_name = f"wfcommons/wfcommons-testing-{backend}"
#
# try:
# image = client.images.get(image_name)
# sys.stderr.write(f"Image '{image_name}' is available locally\n")
# except ImageNotFound:
# sys.stderr.write(f"Pulling image '{image_name}'...\n")
# client.images.pull(image_name)
#
# # Launch the docker container to actually run the translated workflow
# sys.stderr.write("Starting Docker container...\n")
# container = client.containers.run(
# image=image_name,
# command=command,
# volumes={mounted_dir: {'bind': mounted_dir, 'mode': 'rw'}},
# working_dir=working_dir,
# tty=True,
# detach=True
# )
#
# # Installing WfCommons on container
# _install_WfCommons_on_container(container)
#
# # Copy over the wfbench and cpu-benchmark executables to where they should go on the container
# if bin_dir:
# exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir],
# stdout=True, stderr=True)
# exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir],
# stdout=True, stderr=True)
#
# return container

# def _make_tarfile_of_wfcommons():
# source_dir = os.getcwd() # This assumes the testing is run from the root
# tar_stream = io.BytesIO()
# with tarfile.open(fileobj=tar_stream, mode='w') as tar:
# tar.add(source_dir, arcname=os.path.basename(source_dir))
# tar_stream.seek(0)
# return tar_stream
#
# def _install_WfCommons_on_container(container):
# # sys.stderr.write("Installing WfCommons on the container...\n")
# # Copy the WfCommons code to it (removing stuff that should be removed)
# target_path = '/tmp/' # inside container
# tar_data = _make_tarfile_of_wfcommons()
# container.put_archive(target_path, tar_data)
# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True)
# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True)
# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True,
# stderr=True)
# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True,
# stderr=True)
#
# # Install WfCommons on the container (to install wfbench and cpu-benchmark really)
# exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
# workdir="/tmp/WfCommons", stdout=True, stderr=True)

def _create_workflow_benchmark():
# Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever)
Expand Down Expand Up @@ -281,8 +279,8 @@ def test_translator(self, backend) -> None:
# Create a local translation directory
str_dirpath = "/tmp/" + backend + "_translated_workflow/"
dirpath = pathlib.Path(str_dirpath)
if dirpath.exists():
shutil.rmtree(dirpath)
# dirpath = _create_fresh_local_dir(str_dirpath)
_remove_local_dir_if_it_exists(str_dirpath)

# Perform the translation
sys.stderr.write("\nTranslating workflow...\n")
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/common/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ def test_workflow_json_generation(self):

# Create an instance from the JSON File and write it back to a JSON
instance = Instance(pathlib.Path("/tmp") / local_file_name)

# Testing the "iterator" capability
assert(len(list(instance)) == len(instance.instance["workflow"]["specification"]["tasks"]))

# Test writing to JSON
instance.workflow.write_json(pathlib.Path("/tmp/written_workflow.json"))

# Get the two jsons as objects
Expand Down
Loading