100 changes: 90 additions & 10 deletions benchmark/librbdfio.py
@@ -9,6 +9,7 @@
import settings
import monitoring

from typing import Optional
from .benchmark import Benchmark

logger = logging.getLogger("cbt")
@@ -50,6 +51,14 @@ def __init__(self, archive_dir, cluster, config):
self.rate_iops = config.get('rate_iops', None)
self.fio_out_format = config.get('fio_out_format', 'json,normal')
self.data_pool = None

self._ioddepth_per_volume: dict[int, int] = {}
total_iodepth: Optional[str] = config.get("total_iodepth", None)
if total_iodepth is not None:
self._ioddepth_per_volume = self._calculate_iodepth_per_volume(
int(self.volumes_per_client), int(total_iodepth)
)

Contributor
It is not very clear why this is needed as a string rather than a numeric value (which could be validated quickly). As a good practice, it might deserve its own method with a short piece of documentation in the code.
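For example, a helper along these lines (just a sketch; the name and the ValueError behaviour are illustrative, not something this PR defines -- it reuses the Optional import the PR already adds):

def parse_total_iodepth(raw: Optional[str]) -> Optional[int]:
    """
    Validate and convert the total_iodepth setting from the YAML config.

    Returns None when the option is absent, otherwise a positive int.
    Raises ValueError for non-numeric or non-positive values.
    """
    if raw is None:
        return None
    value = int(raw)  # raises ValueError for non-numeric input
    if value < 1:
        raise ValueError(f"total_iodepth must be >= 1, got {value}")
    return value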

# use_existing_volumes needs to be true to set the pool and rbd names
self.use_existing_volumes = bool(config.get('use_existing_volumes', False))
self.no_sudo = bool(config.get('no_sudo', False))
@@ -90,6 +99,7 @@ def backup_global_fio_options(self):
Backup/copy the FIO global options into a dictionary
"""
self.global_fio_options['time_based'] = self.time_based
self.global_fio_options['time'] = self.time
Contributor
Yes, this is the correct way to add new global FIO options so that they can be inherited by the workload sections.

self.global_fio_options['ramp'] = self.ramp
self.global_fio_options['iodepth'] = self.iodepth
self.global_fio_options['numjobs'] = self.numjobs
@@ -115,7 +125,7 @@ def restore_global_fio_options(self):
self.log_avg_msec = self.global_fio_options['log_avg_msec']
self.op_size = self.global_fio_options['op_size']
self.time_based = self.global_fio_options['time_based']

self.time = self.global_fio_options['time']

def exists(self):
"""
@@ -163,18 +173,46 @@ def run_workloads(self):
enable_monitor = bool(test['monitor'])
# TODO: simplify this loop to have a single iterator for general queue depth
for job in test['numjobs']:
for iod in test['iodepth']:
iodepth: list[str] = []
use_total_iodepth: bool = False
if "total_iodepth" in test.keys():
iodepth = test["total_iodepth"]
use_total_iodepth = True
else:
iodepth = test["iodepth"]
for iod in iodepth:
if use_total_iodepth:
self._ioddepth_per_volume = self._calculate_iodepth_per_volume(
int(self.volumes_per_client), int(iod)
)

Contributor
@perezjosibm perezjosibm Feb 27, 2025
This might sound like criticism, but this code makes the logic more obscure rather than improving it. I'd suggest defining a new method with a meaningful name to calculate this, for example:

def get_iodepth(self, test) -> list[str]:
    """
    Returns the appropriate iodepth list of values
    """
    iodepth: list[str] = []
    # your code here, i.e. with the proper indentation
    self.use_total_iodepth: bool = False
    if "total_iodepth" in test.keys():
        iodepth = test["total_iodepth"]
        self.use_total_iodepth = True
    else:
        iodepth = test["iodepth"]
    return iodepth

Now _calculate_iodepth_per_volume() can check self.use_total_iodepth to see whether it is enabled, which makes the code easier to understand.
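To illustrate the whole idea as a standalone sketch (the function name, the min() clamp and the empty-dict fallback are only illustrative, mirroring what this PR does elsewhere):

def calculate_iodepth_per_volume(use_total_iodepth: bool,
                                 number_of_volumes: int,
                                 total_iodepth: int) -> dict[int, int]:
    """Distribute total_iodepth across the volumes only when the flag is enabled."""
    if not use_total_iodepth:
        return {}  # fall back to the plain per-volume iodepth setting
    # Never hand a volume an iodepth of 0: reduce the volume count instead.
    number_of_volumes = min(number_of_volumes, total_iodepth)
    base, remainder = divmod(total_iodepth, number_of_volumes)
    return {vol: base + (1 if vol < remainder else 0)
            for vol in range(number_of_volumes)}

For example, calculate_iodepth_per_volume(True, 4, 10) gives {0: 3, 1: 3, 2: 2, 3: 2}.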

self.time = test['time']
self.mode = test['mode']
if 'op_size' in test:
self.op_size = test['op_size']
self.mode = test['mode']
self.numjobs = job
self.iodepth = iod
self.run_dir = ( f'{self.base_run_dir}/{self.mode}_{int(self.op_size)}/'
f'iodepth-{int(self.iodepth):03d}/numjobs-{int(self.numjobs):03d}' )

# Needed to allow different mixed-ratio results with the same block size: we
# store the ratio within the directory name. Otherwise workloads would only support
# one mixed workload for a given block size. 100% read and 100% write don't need to
# store the read/write ratio.

if self.mode == 'randrw':
self.rwmixread = test['rwmixread']
self.rwmixwrite = 100 - self.rwmixread
self.run_dir = ( f'{self.base_run_dir}/{self.mode}{self.rwmixread}{self.rwmixwrite}_{int(self.op_size)}/'
f'iodepth-{int(self.iodepth):03d}/numjobs-{int(self.numjobs):03d}' )
else:
self.run_dir = ( f'{self.base_run_dir}/{self.mode}_{int(self.op_size)}/'
f'iodepth-{int(self.iodepth):03d}/numjobs-{int(self.numjobs):03d}' )
Comment on lines +196 to +208
Contributor
This is also crying out for its own method, following Chris' suggestion.
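Something along these lines, as a rough sketch (the helper name and signature are only illustrative):

def build_run_dir(base_run_dir: str, mode: str, op_size: int,
                  iodepth: int, numjobs: int, rwmixread: int = 0) -> str:
    """Build the run directory, embedding the read/write ratio for mixed workloads."""
    if mode == 'randrw':
        workload = f'{mode}{rwmixread}{100 - rwmixread}_{op_size}'
    else:
        workload = f'{mode}_{op_size}'
    return f'{base_run_dir}/{workload}/iodepth-{iodepth:03d}/numjobs-{numjobs:03d}'

e.g. build_run_dir('/tmp/cbt', 'randrw', 4096, 16, 1, rwmixread=70) returns
'/tmp/cbt/randrw7030_4096/iodepth-016/numjobs-001'.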


common.make_remote_dir(self.run_dir)

for i in range(self.volumes_per_client):
number_of_volumes: int = int(self.volumes_per_client)
if use_total_iodepth:
number_of_volumes = len(self._ioddepth_per_volume.keys())
for i in range(number_of_volumes):
Comment on lines +213 to +215
Contributor
If you follow the suggestion above to make the flag self.use_total_iodepth an object attribute, this condition can be encapsulated in a small method as well.
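Roughly something like this (a sketch that assumes the self.use_total_iodepth attribute from the earlier suggestion; the method name is just a placeholder):

def _get_number_of_volumes(self) -> int:
    """Return how many volumes to drive in this run."""
    # When total_iodepth is in use, the per-volume mapping decides the count.
    if self.use_total_iodepth:
        return len(self._ioddepth_per_volume)
    return int(self.volumes_per_client)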

fio_cmd = self.mkfiocmd(i)
p = common.pdsh(settings.getnodes('clients'), fio_cmd)
ps.append(p)
@@ -226,7 +264,10 @@ def run(self):
monitoring.start(self.run_dir)
logger.info('Running rbd fio %s test.', self.mode)
ps = []
for i in range(self.volumes_per_client):
number_of_volumes: int = int(self.volumes_per_client)
if self._ioddepth_per_volume != {}:
number_of_volumes = len(self._ioddepth_per_volume.keys())
for i in range(number_of_volumes):
fio_cmd = self.mkfiocmd(i)
p = common.pdsh(settings.getnodes('clients'), fio_cmd)
ps.append(p)
@@ -244,7 +285,7 @@ def run(self):
self.analyze(self.out_dir)


def mkfiocmd(self, volnum):
def mkfiocmd(self, volnum: int) -> str:
"""
Construct a FIO cmd (note the shell interpolation for the host
executing FIO).
@@ -257,7 +298,7 @@ def mkfiocmd(self, volnum):
logger.debug('Using rbdname %s', rbdname)
out_file = f'{self.run_dir}/output.{volnum:d}'

fio_cmd = ''
fio_cmd: str = ''
if not self.no_sudo:
fio_cmd = 'sudo '
fio_cmd += '%s --ioengine=rbd --clientname=admin --pool=%s --rbdname=%s --invalidate=0' % (self.cmd_path, self.pool_name, rbdname)
@@ -274,7 +315,12 @@ def mkfiocmd(self, volnum):
fio_cmd += ' --numjobs=%s' % self.numjobs
fio_cmd += ' --direct=1'
fio_cmd += ' --bs=%dB' % self.op_size
fio_cmd += ' --iodepth=%d' % self.iodepth

iodepth: str = f"{self.iodepth}"
if self._ioddepth_per_volume != {}:
iodepth = f"{self._ioddepth_per_volume[volnum]}"
Comment on lines +320 to +321
Contributor
Probably a typo: "ioddepth" -- why the double 'd'? It is used consistently, though.


fio_cmd += ' --iodepth=%s' % iodepth
fio_cmd += ' --end_fsync=%d' % self.end_fsync
# if self.vol_size:
# fio_cmd += ' -- size=%dM' % self.vol_size
@@ -401,6 +447,40 @@ def analyze(self, out_dir):
logger.info('Convert results to json format.')
self.parse(out_dir)

def _calculate_iodepth_per_volume(self, number_of_volumes: int, total_desired_iodepth: int) -> dict[int, int]:
"""
Given the total desired iodepth and the number of volumes from the
configuration yaml file, calculate the iodepth for each volume

If the iodepth specified in total_iodepth is too small to allow
an iodepth of 1 per volume, then reduce the number of volumes
used to allow an iodepth of 1 per volume.
"""
queue_depths: dict[int, int] = {}

if number_of_volumes > total_desired_iodepth:
logger.warning(
"The total iodepth requested: %s is less than 1 per volume (%s)",
total_desired_iodepth,
number_of_volumes,
)
logger.warning(
"Number of volumes per client will be reduced from %s to %s", number_of_volumes, total_desired_iodepth
)
number_of_volumes = total_desired_iodepth

iodepth_per_volume: int = total_desired_iodepth // number_of_volumes
remainder: int = total_desired_iodepth % number_of_volumes

for volume_id in range(number_of_volumes):
iodepth: int = iodepth_per_volume

if remainder > 0:
iodepth += 1
remainder -= 1
queue_depths[volume_id] = iodepth

return queue_depths
Contributor
@perezjosibm perezjosibm Feb 27, 2025
Since queue_depths[] is always assigned to the attribute self._ioddepth_per_volume, you might set it here instead. Updating the object attribute directly (like the cumulative-parameter technique in functional programming) makes the intention clearer, IMO.
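For clarity, a sketch of that variant (equivalent distribution logic to the PR, written more compactly; only the assignment target and return type change):

def _calculate_iodepth_per_volume(self, number_of_volumes: int, total_desired_iodepth: int) -> None:
    """Populate self._ioddepth_per_volume instead of returning a new dict."""
    # Keep the PR's behaviour of never assigning an iodepth of 0 to a volume.
    number_of_volumes = min(number_of_volumes, total_desired_iodepth)
    base, remainder = divmod(total_desired_iodepth, number_of_volumes)
    self._ioddepth_per_volume = {
        vol: base + (1 if vol < remainder else 0)
        for vol in range(number_of_volumes)
    }

Callers would then drop the assignment and simply call the method.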


def __str__(self):
return "%s\n%s\n%s" % (self.run_dir, self.out_dir, super(LibrbdFio, self).__str__())