Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ais_bench/benchmark/cli/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _base_parser(self):
help='Running mode. Choose "perf" for performance evaluation, "infer" to run inference only, '
'"eval" to evaluate existing inference results, or "viz" to visualize the results. '
'The default mode is "all", which runs all steps.',
choices=['all', 'infer', 'eval', 'viz', 'perf', 'perf_viz'],
choices=['all', 'infer', 'eval', 'viz', 'perf', 'perf_viz', 'judge', 'infer_judge'],
default='all',
type=str
)
Expand Down
3 changes: 1 addition & 2 deletions ais_bench/benchmark/cli/config_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import os
import os.path as osp
import tabulate
Expand Down Expand Up @@ -104,7 +103,7 @@ def load_config(self, workflow):
self._update_cfg_of_workflow(workflow)
self._dump_and_reload_config()
return self.cfg

def _fill_dataset_configs(self):
for dataset_cfg in self.cfg["datasets"]:
fill_test_range_use_num_prompts(self.cfg["cli_args"].get("num_prompts"), dataset_cfg)
Expand Down
176 changes: 170 additions & 6 deletions ais_bench/benchmark/cli/workers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import os.path as osp
import copy
import shutil
from abc import ABC, abstractmethod
from collections import defaultdict

Expand All @@ -8,12 +10,15 @@
from ais_bench.benchmark.registry import PARTITIONERS, RUNNERS, build_from_cfg
from ais_bench.benchmark.utils.config.run import get_config_type
from ais_bench.benchmark.utils.logging.logger import AISLogger
from ais_bench.benchmark.utils.logging.exceptions import PredictionInvalidException
from ais_bench.benchmark.utils.logging.error_codes import TMAN_CODES
from ais_bench.benchmark.partitioners import NaivePartitioner
from ais_bench.benchmark.runners import LocalRunner
from ais_bench.benchmark.tasks import OpenICLEvalTask, OpenICLApiInferTask, OpenICLInferTask
from ais_bench.benchmark.summarizers import DefaultSummarizer, DefaultPerfSummarizer
from ais_bench.benchmark.calculators import DefaultPerfMetricCalculator
from ais_bench.benchmark.cli.utils import fill_model_path_if_datasets_need
from ais_bench.benchmark.utils.file.file import load_jsonl, dump_jsonl

logger = AISLogger()

Expand Down Expand Up @@ -108,6 +113,133 @@ def _update_tasks_cfg(self, tasks, cfg: ConfigDict):
task.attack = cfg.attack


class JudgeInfer(BaseWorker):
def update_cfg(self, cfg: ConfigDict) -> None:
def get_task_type() -> str:
if cfg["datasets"][0]["judge_infer_cfg"]["judge_model"]["attr"] == "service":
return get_config_type(OpenICLApiInferTask)
else:
return get_config_type(OpenICLInferTask)

new_cfg = dict(
judge_infer=dict(
partitioner=dict(type=get_config_type(NaivePartitioner)),
runner=dict(
max_num_workers=self.args.max_num_workers,
max_workers_per_gpu=self.args.max_workers_per_gpu,
debug=self.args.debug,
task=dict(type=get_task_type()),
type=get_config_type(LocalRunner),
),
),
)

cfg.merge_from_dict(new_cfg)
if cfg.cli_args.debug:
cfg.judge_infer.runner.debug = True
cfg.judge_infer.partitioner["out_dir"] = osp.join(cfg["work_dir"], "predictions/")
return cfg

    def do_work(self, cfg: ConfigDict):
        """Run the judge-inference stage: partition, filter, optionally merge, execute.

        Only tasks whose first dataset carries a ``judge_infer_cfg`` are kept;
        if none remain the stage is a no-op.

        Args:
            cfg (ConfigDict): Global config with a ``judge_infer`` section
                (added earlier by ``update_cfg``).
        """
        partitioner = PARTITIONERS.build(cfg.judge_infer.partitioner)
        logger.info("Starting inference tasks...")
        # Rename judge datasets first so partitioned outputs get unique paths.
        self._cfg_pre_process(cfg)
        tasks = partitioner(cfg)

        # delete the tasks without judge_infer_cfg
        new_tasks = []
        for task in tasks:
            if task["datasets"][0][0].get("judge_infer_cfg"):
                new_tasks.append(task)
        tasks = new_tasks
        if len(tasks) == 0:
            return

        # update tasks cfg before run
        self._update_tasks_cfg(tasks, cfg)

        if (
            cfg.get("cli_args", {}).get("merge_ds", False)
            or cfg.get("cli_args", {}).get("mode") == "perf"  # performance mode will enable merge datasets by default
        ):
            logger.info("Merging datasets with the same model and inferencer...")
            tasks = self._merge_datasets(tasks)

        runner = RUNNERS.build(cfg.judge_infer.runner)
        runner(tasks)
        # Re-key judge predictions back to the original sample ids.
        self._result_post_process(tasks, cfg)
        logger.info("Inference tasks completed.")

def _merge_datasets(self, tasks):
# merge datasets with the same model, dataset type and inferencer
task_groups = defaultdict(list)
for task in tasks:
key = (
task["models"][0]["abbr"] # same model
+ "_"
+ str(task['datasets'][0][0]['type']) # same dataset type
+ "_"
+ str(task["datasets"][0][0]["infer_cfg"]["inferencer"]) # same inferencer with the same args
)
Comment on lines +177 to +183
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The key for grouping tasks is generated by converting the inferencer dictionary to a string using str(). The string representation of a dictionary is not guaranteed to be consistent across different Python versions or even runs, which could lead to tasks not being merged correctly. A more robust approach would be to create a canonical representation, for example by serializing to a sorted JSON string. You will need to import json for this suggestion to work.

Suggested change
key = (
task["models"][0]["abbr"] # same model
+ "_"
+ str(task['datasets'][0][0]['type']) # same dataset type
+ "_"
+ str(task["datasets"][0][0]["infer_cfg"]["inferencer"]) # same inferencer with the same args
)
key = (
task["models"][0]["abbr"] # same model
+ "_"
+ str(task['datasets'][0][0]['type']) # same dataset type
+ "_"
+ json.dumps(task["datasets"][0][0]["infer_cfg"]["inferencer"], sort_keys=True) # same inferencer with the same args
)

task_groups[key].append(task)
new_tasks = []
for key, task_group in task_groups.items():
new_task = copy.deepcopy(task_group[0])
if len(task_group) > 1:
for t in task_group[1:]:
new_task["datasets"][0].extend(t["datasets"][0])
new_tasks.append(new_task)
return new_tasks
Comment on lines +173 to +192
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This _merge_datasets method is identical to the one in the Infer class. To improve maintainability and reduce redundancy, consider moving this method to the BaseWorker class or a shared utility module.


def _cfg_pre_process(self, cfg: ConfigDict) -> None:
self.org_dataset_abbrs = {}
def change_judge_dataset_abbr(item):
if item.get("judge_infer_cfg"):
org_dataset_abbr = item["abbr"]
new_dataset_abbr = f'{item["abbr"]}-{item["judge_infer_cfg"]["judge_model"]["abbr"]}'
item["abbr"] = new_dataset_abbr
self.org_dataset_abbrs[new_dataset_abbr] = org_dataset_abbr
if cfg.get('model_dataset_combinations', None) is not None:
for item in cfg.model_dataset_combinations:
for dataset in item["datasets"]:
change_judge_dataset_abbr(dataset)
for dataset in cfg.datasets:
change_judge_dataset_abbr(dataset)
return cfg

    def _update_tasks_cfg(self, tasks, cfg: ConfigDict):
        """Rewrite each task so it runs the judge model over first-stage predictions.

        For every task: resolve the path of the first-stage predictions file
        (using the original dataset abbr recorded in ``self.org_dataset_abbrs``),
        then swap the model/dataset configs for the judge counterparts taken
        from ``judge_infer_cfg``.

        Args:
            tasks (list): Partitioned task configs (mutated in place).
            cfg (ConfigDict): Global config (attack settings, predictions out_dir).

        Raises:
            PredictionInvalidException: If a first-stage predictions file is missing.
        """
        # update parameters to correct sub cfg
        if hasattr(cfg, "attack"):
            for task in tasks:
                cfg.attack.dataset = task.datasets[0][0].abbr
                task.attack = cfg.attack

        # update judge cfgs to model cfgs and data
        for task in tasks:
            # First-stage predictions live under the ORIGINAL dataset abbr.
            task["datasets"][0][0]["predictions_path"] = osp.join(cfg.judge_infer.partitioner.out_dir, task["models"][0]["abbr"], f'{self.org_dataset_abbrs[task["datasets"][0][0]["abbr"]]}.jsonl')
            if not osp.exists(task["datasets"][0][0]["predictions_path"]):
                raise PredictionInvalidException(TMAN_CODES.UNKNOWN_ERROR, f"Predictions path {task['datasets'][0][0]['predictions_path']} does not exist.")
            # Keep the original model abbr so outputs stay grouped per model,
            # while the judge model actually performs the inference.
            model_abbr = task["models"][0]["abbr"]
            task["models"][0] = task["datasets"][0][0]["judge_infer_cfg"].pop("judge_model")
            task["models"][0]["abbr"] = model_abbr
            task["datasets"][0][0]["type"] = task["datasets"][0][0]["judge_infer_cfg"].pop("judge_dataset_type")
            task["datasets"][0][0]["reader_cfg"] = task["datasets"][0][0]["judge_infer_cfg"].pop("judge_reader_cfg")
            # Whatever remains of judge_infer_cfg becomes the task's infer_cfg.
            task["datasets"][0][0]["infer_cfg"] = task["datasets"][0][0].pop("judge_infer_cfg")

def _result_post_process(self, tasks, cfg: ConfigDict):
# Reconstruct the judge infer predictions to normal predictions format
for task in tasks:
model_org_prediction_path = task["datasets"][0][0]["predictions_path"]
model_preds: dict = {item["uuid"]: item for item in load_jsonl(model_org_prediction_path)}
judge_org_prediction_path = osp.join(cfg.judge_infer.partitioner.out_dir, task["models"][0]["abbr"], f'{task["datasets"][0][0]["abbr"]}.jsonl')
judge_preds: list = load_jsonl(judge_org_prediction_path)
for i, pred in enumerate(judge_preds):
uuid = pred["gold"]
judge_preds[i]["id"] = model_preds[uuid]["id"]
os.remove(judge_org_prediction_path)
dump_jsonl(judge_preds, judge_org_prediction_path)
Comment on lines +239 to +240
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation removes the original prediction file before writing the new one. If the dump_jsonl operation fails for any reason (e.g., disk full, permission error), the original data will be lost. It's safer to write to a temporary file first and then atomically rename it to the final destination.

Suggested change
os.remove(judge_org_prediction_path)
dump_jsonl(judge_preds, judge_org_prediction_path)
tmp_path = f"{judge_org_prediction_path}.tmp"
dump_jsonl(judge_preds, tmp_path)
shutil.move(tmp_path, judge_org_prediction_path)



class Eval(BaseWorker):
def update_cfg(self, cfg: ConfigDict) -> None:
new_cfg = dict(
Expand Down Expand Up @@ -136,9 +268,11 @@ def update_cfg(self, cfg: ConfigDict) -> None:
def do_work(self, cfg: ConfigDict):
partitioner = PARTITIONERS.build(cfg.eval.partitioner)
logger.info("Starting evaluation tasks...")
self._cfg_pre_process(cfg)

tasks = partitioner(cfg)

# update tasks cfg before run
# Update tasks cfg before run
self._update_tasks_cfg(tasks, cfg)

runner = RUNNERS.build(cfg.eval.runner)
Expand All @@ -150,9 +284,28 @@ def do_work(self, cfg: ConfigDict):
runner(tasks)
logger.info("Evaluation tasks completed.")

def _cfg_pre_process(self, cfg: ConfigDict) -> None:
self.org_dataset_abbrs = {}
def change_eval_dataset_abbr(item):
if item.get("judge_infer_cfg"):
org_dataset_abbr = item["abbr"]
new_dataset_abbr = f'{item["abbr"]}-{item["judge_infer_cfg"]["judge_model"]["abbr"]}'
item["abbr"] = new_dataset_abbr
self.org_dataset_abbrs[new_dataset_abbr] = org_dataset_abbr
if cfg.get('model_dataset_combinations', None) is not None:
for item in cfg.model_dataset_combinations:
for dataset in item["datasets"]:
change_eval_dataset_abbr(dataset)
for dataset in cfg.datasets:
change_eval_dataset_abbr(dataset)
return cfg
Comment on lines +287 to +301
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This _cfg_pre_process method is very similar to the one in the JudgeInfer class. This code duplication makes the code harder to maintain. Consider refactoring this logic into a shared method or utility function to avoid redundancy.


def _update_tasks_cfg(self, tasks, cfg: ConfigDict):
# update parameters to correct sub cfg
pass
# Replace default model config to judge model config
self.judge_result_paths = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The instance variable self.judge_result_paths is initialized here but it is never used anywhere else in the class. This appears to be dead code and should be removed to improve clarity.

Suggested change
self.judge_result_paths = {}
# self.judge_result_paths = {}

for task in tasks:
if task["datasets"][0][0].get("judge_infer_cfg"):
task["datasets"][0][0].pop("judge_infer_cfg")


class AccViz(BaseWorker):
Expand All @@ -171,6 +324,7 @@ def update_cfg(self, cfg: ConfigDict) -> None:
def do_work(self, cfg: ConfigDict) -> int:
logger.info("Summarizing evaluation results...")
summarizer_cfg = cfg.get("summarizer", {})
cfg = self._cfg_pre_process(cfg)

# For subjective summarizer
if summarizer_cfg.get("function", None):
Expand Down Expand Up @@ -203,6 +357,13 @@ def do_work(self, cfg: ConfigDict) -> int:
summarizer = build_from_cfg(summarizer_cfg)
summarizer.summarize(time_str=self.args.cfg_time_str)

def _cfg_pre_process(self, cfg: ConfigDict) -> None:
for i, dataset in enumerate(cfg.datasets):
if dataset.get("judge_infer_cfg"):
cfg.datasets[i]["abbr"] = f'{cfg.datasets[i]["abbr"]}-{cfg.datasets[i]["judge_infer_cfg"]["judge_model"]["abbr"]}'
cfg.datasets[i].pop("judge_infer_cfg")
return cfg


class PerfViz(BaseWorker):
def update_cfg(self, cfg: ConfigDict) -> None:
Expand Down Expand Up @@ -233,9 +394,11 @@ def do_work(self, cfg: ConfigDict) -> int:


WORK_FLOW = dict(
all=[Infer, Eval, AccViz],
all=[Infer, JudgeInfer, Eval, AccViz],
infer=[Infer],
eval=[Eval, AccViz],
judge=[JudgeInfer],
infer_judge=[Infer, JudgeInfer],
eval=[JudgeInfer, Eval, AccViz],
viz=[AccViz],
perf=[Infer, PerfViz],
perf_viz=[PerfViz],
Expand All @@ -249,4 +412,5 @@ def __init__(self, cfg, workflow) -> None:

def execute(self) -> None:
for worker in self.workflow:
worker.do_work(self.cfg)
cfg = copy.deepcopy(self.cfg)
worker.do_work(cfg)
Empty file.
22 changes: 10 additions & 12 deletions ais_bench/benchmark/models/local_models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self,
self.is_synthetic = False

@abstractmethod
def _generate(self, input, max_out_len: int) -> List[str]:
def generate(self, inputs, max_out_len: int) -> List[str]:
"""Generate result given a input.

Args:
Expand Down Expand Up @@ -133,17 +133,6 @@ def parse_template(self, prompt_template: PromptType, mode: str) -> str:
"""
return self.template_parser.parse_template(prompt_template, mode)

def generate_from_template(self, templates: List[PromptType], **kwargs):
"""Generate completion from a list of templates.

Args:
templates (List[PromptType]): A list of templates.
max_out_len (int): The maximum length of the output.
"""
inputs = self.parse_template(templates, mode='gen')
max_out_lens = kwargs.get("max_out_lens", [None] * len(templates))
return self.generate(inputs, max_out_lens, **kwargs)

def get_token_len_from_template(
self,
templates: Union[PromptType, List[PromptType]],
Expand Down Expand Up @@ -204,6 +193,15 @@ def sync_inputs(self, inputs: str) -> str:
def to(self, device):
self.model.to(device)

class BaseLMModel(BaseModel):
    """Base class for language models.

    Subclasses override :meth:`generate`; per the review on this interface,
    implementations fill *outputs* in place and return nothing, so the
    method is annotated ``-> None`` rather than ``List[str]``.
    """

    def generate(self, inputs, outputs, **kwargs) -> None:
        """Run generation for *inputs*, writing results into *outputs* in place.

        Raises:
            AISBenchNotImplementedError: Always; subclasses must override.
        """
        raise AISBenchNotImplementedError(
            MODEL_CODES.UNKNOWN_ERROR,
            f'{self.__class__.__name__} does not supported'
            ' to be called in base classes')


class LMTemplateParser:
"""Intermidate prompt template parser, specifically for language models.
Expand Down
Loading
Loading