Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 222 additions & 2 deletions tests/UT/cli/test_workers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys
import os
import pytest
from unittest.mock import patch, MagicMock, call
from unittest.mock import patch, MagicMock, call, mock_open
from collections import defaultdict

# 添加项目根目录到Python路径
Expand All @@ -12,6 +12,7 @@
from ais_bench.benchmark.cli.workers import (
BaseWorker,
Infer,
JudgeInfer,
Eval,
AccViz,
PerfViz,
Expand Down Expand Up @@ -664,11 +665,14 @@ def test_work_flow_dict():
assert 'viz' in WORK_FLOW
assert 'perf' in WORK_FLOW
assert 'perf_viz' in WORK_FLOW
assert 'judge' in WORK_FLOW
assert 'infer_judge' in WORK_FLOW

# 验证工作流内容正确
assert Infer in WORK_FLOW['all']
assert Eval in WORK_FLOW['all']
assert AccViz in WORK_FLOW['all']
assert JudgeInfer in WORK_FLOW['all']

assert Infer in WORK_FLOW['infer']

Expand All @@ -680,4 +684,220 @@ def test_work_flow_dict():
assert Infer in WORK_FLOW['perf']
assert PerfViz in WORK_FLOW['perf']

assert PerfViz in WORK_FLOW['perf_viz']
assert PerfViz in WORK_FLOW['perf_viz']

assert JudgeInfer in WORK_FLOW['judge']
assert Infer in WORK_FLOW['infer_judge']
assert JudgeInfer in WORK_FLOW['infer_judge']


class TestJudgeInfer:
def setup_method(self):
"""设置测试环境"""
self.mock_args = MagicMock()
self.mock_args.max_num_workers = 4
self.mock_args.max_workers_per_gpu = 2
self.mock_args.debug = False
self.judge_infer_worker = JudgeInfer(self.mock_args)

@patch('ais_bench.benchmark.cli.workers.get_config_type')
def test_update_cfg_service_model(self, mock_get_config_type):
"""测试update_cfg方法,使用service模型"""
mock_get_config_type.side_effect = ['MockNaivePartitioner', 'MockOpenICLApiInferTask', 'MockLocalRunner']

cfg = MockConfigDict({
'datasets': [{
'judge_infer_cfg': {
'judge_model': {'attr': 'service', 'abbr': 'judge_model'}
}
}],
'work_dir': '/test/workdir',
'cli_args': MagicMock(debug=False)
})

with patch('os.path.join', return_value='/test/workdir/predictions/'):
result = self.judge_infer_worker.update_cfg(cfg)

assert result == cfg
assert cfg['judge_infer']['partitioner']['type'] == 'MockNaivePartitioner'
assert cfg['judge_infer']['runner']['type'] == 'MockLocalRunner'
assert cfg['judge_infer']['runner']['task']['type'] == 'MockOpenICLApiInferTask'

@patch('ais_bench.benchmark.cli.workers.get_config_type')
def test_update_cfg_local_model(self, mock_get_config_type):
"""测试update_cfg方法,使用local模型"""
mock_get_config_type.side_effect = ['MockNaivePartitioner', 'MockOpenICLInferTask', 'MockLocalRunner']

cfg = MockConfigDict({
'datasets': [{
'judge_infer_cfg': {
'judge_model': {'attr': 'local', 'abbr': 'judge_model'}
}
}],
'work_dir': '/test/workdir',
'cli_args': MagicMock(debug=True)
})

with patch('os.path.join', return_value='/test/workdir/predictions/'):
self.judge_infer_worker.update_cfg(cfg)

assert cfg['judge_infer']['runner']['task']['type'] == 'MockOpenICLInferTask'
assert cfg['judge_infer']['runner']['debug'] == True

def test_cfg_pre_process(self):
"""测试_cfg_pre_process方法"""
cfg = MockConfigDict({
'datasets': [
{
'abbr': 'test_dataset',
'judge_infer_cfg': {
'judge_model': {'abbr': 'judge_model'}
}
}
]
})

self.judge_infer_worker._cfg_pre_process(cfg)

assert cfg['datasets'][0]['abbr'] == 'test_dataset-judge_model'
assert 'test_dataset-judge_model' in self.judge_infer_worker.org_dataset_abbrs

def test_cfg_pre_process_with_model_dataset_combinations(self):
"""测试_cfg_pre_process方法,包含model_dataset_combinations"""
cfg = MockConfigDict({
'model_dataset_combinations': [
{
'datasets': [
{
'abbr': 'combo_dataset',
'judge_infer_cfg': {
'judge_model': {'abbr': 'judge_model'}
}
}
]
}
],
'datasets': []
})

self.judge_infer_worker._cfg_pre_process(cfg)

assert cfg['model_dataset_combinations'][0]['datasets'][0]['abbr'] == 'combo_dataset-judge_model'

def test_merge_datasets(self):
"""测试_merge_datasets方法"""
task1 = {
'models': [{'abbr': 'model1'}],
'datasets': [[{'type': 'dataset_type', 'infer_cfg': {'inferencer': 'inferencer_type'}}]]
}
task2 = {
'models': [{'abbr': 'model1'}],
'datasets': [[{'type': 'dataset_type', 'infer_cfg': {'inferencer': 'inferencer_type'}}]]
}
task3 = {
'models': [{'abbr': 'model2'}],
'datasets': [[{'type': 'dataset_type', 'infer_cfg': {'inferencer': 'inferencer_type'}}]]
}

result = self.judge_infer_worker._merge_datasets([task1, task2, task3])

assert len(result) == 2
assert len(result[0]['datasets'][0]) == 2
assert len(result[1]['datasets'][0]) == 1

@patch('ais_bench.benchmark.cli.workers.PARTITIONERS')
@patch('ais_bench.benchmark.cli.workers.RUNNERS')
@patch('ais_bench.benchmark.cli.workers.logger')
def test_do_work_no_tasks(self, mock_logger, mock_runners, mock_partitioners):
"""测试do_work方法,没有有效任务的情况"""
mock_partitioner = MagicMock()
mock_partitioners.build.return_value = mock_partitioner
mock_tasks = [
{
'datasets': [[{}]] # 没有judge_infer_cfg
}
]
mock_partitioner.return_value = mock_tasks

cfg = MockConfigDict({
'judge_infer': {
'partitioner': {},
'runner': {}
},
'datasets': []
})

with patch.object(self.judge_infer_worker, '_cfg_pre_process'):
with patch.object(self.judge_infer_worker, '_update_tasks_cfg'):
self.judge_infer_worker.do_work(cfg)

mock_runners.build.assert_not_called()

@patch('ais_bench.benchmark.cli.workers.load_jsonl')
@patch('ais_bench.benchmark.cli.workers.dump_jsonl')
@patch('os.path.exists')
@patch('os.remove')
def test_result_post_process(self, mock_remove, mock_exists, mock_dump_jsonl, mock_load_jsonl):
"""测试_result_post_process方法"""
mock_load_jsonl.side_effect = [
[{'uuid': 'uuid1', 'id': 'id1'}], # model_preds
[{'gold': 'uuid1', 'prediction': 'pred1'}] # judge_preds
]
mock_exists.return_value = True

task = {
'datasets': [[{
'predictions_path': '/test/model_pred.jsonl',
'abbr': 'test_dataset-judge_model'
}]],
'models': [{'abbr': 'model1'}]
}
tasks = [task]

cfg = MockConfigDict({
'judge_infer': {
'partitioner': {
'out_dir': '/test/predictions'
}
}
})

self.judge_infer_worker.org_dataset_abbrs = {'test_dataset-judge_model': 'test_dataset'}

self.judge_infer_worker._result_post_process(tasks, cfg)

mock_remove.assert_called_once()
mock_dump_jsonl.assert_called_once()

def test_update_tasks_cfg_with_judge_infer(self):
"""测试_update_tasks_cfg方法,包含judge_infer_cfg"""
self.judge_infer_worker.org_dataset_abbrs = {'test_dataset-judge_model': 'test_dataset'}

task = {
'models': [{'abbr': 'model1'}],
'datasets': [[{
'abbr': 'test_dataset-judge_model',
'judge_infer_cfg': {
'judge_model': {'type': 'judge_model_type'},
'judge_dataset_type': 'judge_dataset',
'judge_reader_cfg': {'test': 'cfg'}
}
}]]
}
tasks = [task]

cfg = MockConfigDict({
'judge_infer': {
'partitioner': {
'out_dir': '/test/predictions'
}
}
})

with patch('os.path.join', return_value='/test/predictions/model1/test_dataset.jsonl'):
with patch('os.path.exists', return_value=True):
self.judge_infer_worker._update_tasks_cfg(tasks, cfg)

assert 'judge_infer_cfg' not in task['datasets'][0][0]
assert task['models'][0]['type'] == 'judge_model_type'
assert task['datasets'][0][0]['type'] == 'judge_dataset'
Loading