-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebui.py
More file actions
1832 lines (1609 loc) · 96.3 KB
/
webui.py
File metadata and controls
1832 lines (1609 loc) · 96.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
数据质量检测工具 - 简约Web UI界面
基于Gradio构建的简洁图形化界面
"""
import gradio as gr
import os
import json
from pathlib import Path
from typing import List, Dict, Any, Optional
import tempfile
import shutil
from data_quality_checker import DataQualityChecker
from batch_processor import BatchProcessor
class DataQualityWebUI:
"""数据质量检测Web UI界面"""
def __init__(self):
    """Build the checker and cache the directory choices shown by the UI.

    The checker is always constructed from the default ``config.yaml``.
    The directory list is computed once here by
    ``_get_common_directories`` and stored on the instance.
    """
    default_config = "config.yaml"  # the default configuration is always used
    self.checker = DataQualityChecker(default_config)
    common_dirs = self._get_common_directories()
    self.available_directories = common_dirs
def _get_common_directories(self) -> List[str]:
"""获取常用目录列表"""
directories = ["."] # 当前目录
# 检测是否为WSL环境
is_wsl = 'microsoft' in os.uname().release.lower() if hasattr(os, 'uname') else False
# Windows常见目录
if os.name == 'nt': # 原生Windows
user_home = os.path.expanduser("~")
windows_dirs = [
user_home,
os.path.join(user_home, "Desktop"),
os.path.join(user_home, "Documents"),
os.path.join(user_home, "Downloads"),
]
# 添加存在的Windows目录
for d in windows_dirs:
if os.path.exists(d):
directories.append(d)
# 添加常见驱动器
for drive in ['C:', 'D:', 'E:', 'F:']:
if os.path.exists(drive + '\\'):
directories.append(drive + '\\')
elif is_wsl: # WSL环境
# WSL中的Windows驱动器挂载在/mnt/下
for drive in ['c', 'd', 'e', 'f']:
wsl_drive_path = f'/mnt/{drive}'
if os.path.exists(wsl_drive_path):
directories.append(f'{wsl_drive_path} (Windows {drive.upper()}盘)')
# WSL中的Windows用户目录
try:
# 尝试获取Windows用户目录
windows_user_dirs = [
'/mnt/c/Users',
'/mnt/d/Users' if os.path.exists('/mnt/d/Users') else None
]
for user_dir in windows_user_dirs:
if user_dir and os.path.exists(user_dir):
directories.append(f'{user_dir} (Windows用户目录)')
# 尝试找到当前用户的目录
try:
for user_folder in os.listdir(user_dir):
user_path = os.path.join(user_dir, user_folder)
if os.path.isdir(user_path):
desktop_path = os.path.join(user_path, 'Desktop')
documents_path = os.path.join(user_path, 'Documents')
downloads_path = os.path.join(user_path, 'Downloads')
if os.path.exists(desktop_path):
directories.append(f'{desktop_path} (Windows桌面)')
if os.path.exists(documents_path):
directories.append(f'{documents_path} (Windows文档)')
if os.path.exists(downloads_path):
directories.append(f'{downloads_path} (Windows下载)')
except:
pass
except:
pass
# Linux/WSL 常见目录
directories.extend(['/home', '/tmp'])
else:
# Linux/Mac 常见目录
directories.extend(['/home', '/tmp', '/var'])
# 添加一些常见的项目目录
common_dirs = ["docs", "data", "input", "output", "files", "documents", "test"]
for dir_name in common_dirs:
if os.path.exists(dir_name) and os.path.isdir(dir_name):
directories.append(dir_name)
# 添加当前目录下的所有子目录
try:
for item in os.listdir('.'):
if os.path.isdir(item) and not item.startswith('.') and item not in directories:
directories.append(item)
except:
pass
# 去除重复并过滤存在的目录
unique_dirs = []
for d in directories:
# 提取实际路径用于存在性检查
actual_path = d.split(" (Windows")[0] if " (Windows" in d else d
if d == "." or (os.path.exists(actual_path) and os.path.isdir(actual_path)):
if d not in unique_dirs:
unique_dirs.append(d)
return unique_dirs
def _get_subdirectories(self, parent_dir: str) -> List[str]:
"""获取指定目录下的子目录"""
if not parent_dir or not os.path.exists(parent_dir) or not os.path.isdir(parent_dir):
return []
subdirs = []
try:
for item in os.listdir(parent_dir):
item_path = os.path.join(parent_dir, item)
if os.path.isdir(item_path) and not item.startswith('.'):
subdirs.append(item_path)
except PermissionError:
pass
except Exception:
pass
return sorted(subdirs)
def _browse_directory(self, current_dir: str, action: str) -> List[str]:
"""浏览目录"""
if action == "refresh" or not current_dir:
return self._get_common_directories()
if action == "browse_subdirs":
subdirs = self._get_subdirectories(current_dir)
# 添加父目录选项
parent_dir = os.path.dirname(current_dir)
if parent_dir and parent_dir != current_dir:
subdirs.insert(0, f"📁 {parent_dir} (上级目录)")
# 添加当前目录选项
subdirs.insert(0, f"✅ {current_dir} (选择此目录)")
return subdirs
return self._get_common_directories()
def _get_files_in_directory(self, directory: str, pattern: str = "*.txt") -> List[str]:
"""获取目录中的文件列表"""
if not directory or not os.path.exists(directory):
return []
try:
files = []
dir_path = Path(directory)
# 支持不同的文件模式
if pattern == "*.txt":
files = [str(f) for f in dir_path.glob("*.txt")]
elif pattern == "*.md":
files = [str(f) for f in dir_path.glob("*.md")]
elif pattern == "*.docx":
files = [str(f) for f in dir_path.glob("*.docx")]
else:
files = [str(f) for f in dir_path.glob("*.*") if f.is_file()]
return sorted(files)
except:
return []
def update_file_list(self, directory: str, pattern: str) -> gr.Dropdown:
    """Rebuild the file dropdown for the given directory and file pattern.

    ``directory`` may carry a display label; it is reduced to the real
    path by ``_extract_actual_path`` before the files are listed. The
    returned dropdown has no pre-selected value.
    """
    matching_files = self._get_files_in_directory(
        self._extract_actual_path(directory),
        pattern,
    )
    return gr.Dropdown(label="选择文件", value=None, choices=matching_files)
def _create_output_directory(self, base_name: str = "output") -> str:
"""创建输出目录"""
counter = 1
output_dir = base_name
while os.path.exists(output_dir):
output_dir = f"{base_name}_{counter}"
counter += 1
os.makedirs(output_dir, exist_ok=True)
return output_dir
def _get_output_directories(self) -> List[str]:
"""获取输出目录选项"""
dirs = ["(自动创建)"]
# 常见输出目录
common_output_dirs = ["output", "results", "reports", "processed"]
dirs.extend(common_output_dirs)
# Windows用户目录
if os.name == 'nt':
user_home = os.path.expanduser("~")
user_dirs = [
os.path.join(user_home, "Desktop"),
os.path.join(user_home, "Documents"),
]
for d in user_dirs:
if os.path.exists(d):
dirs.append(d)
# 添加现有的输出目录
try:
for item in os.listdir('.'):
if os.path.isdir(item) and (item.startswith('output') or item.startswith('result') or item.startswith('report')):
if item not in dirs:
dirs.append(item)
except:
pass
return dirs
def _extract_actual_path(self, display_path: str) -> str:
"""从显示路径中提取实际路径"""
# 处理带有描述性标签的路径
if " (Windows " in display_path and "盘)" in display_path:
# 如 "/mnt/c (Windows C盘)" -> "/mnt/c"
return display_path.split(" (Windows ")[0]
elif " (Windows" in display_path and ")" in display_path:
# 如 "/mnt/c/Users (Windows用户目录)" -> "/mnt/c/Users"
return display_path.split(" (Windows")[0]
else:
return display_path
def handle_directory_browse(self, selected_dir: str) -> tuple:
    """React to a selection made in the directory dropdown.

    Three kinds of entries are recognised:

    * ``"✅ <dir> (选择此目录)"`` - the user confirmed ``<dir>``; its
      ``*.txt`` files are listed and the browse button is hidden.
    * ``"📁 <dir> (上级目录)"`` - the user moves up to ``<dir>``.
    * anything else - the entry itself is the directory to move into.

    For the two navigation cases the file list is emptied and the browse
    button is shown.

    Args:
        selected_dir: The dropdown entry that was picked.

    Returns:
        ``(file_dropdown, directory_path, browse_button)``.
    """
    if selected_dir.startswith("✅"):
        chosen = selected_dir.replace("✅ ", "").replace(" (选择此目录)", "")
        chosen = self._extract_actual_path(chosen)
        files = self._get_files_in_directory(chosen, "*.txt")
        return gr.Dropdown(choices=files, value=None), chosen, gr.Button(visible=False)

    if selected_dir.startswith("📁"):
        target = selected_dir.replace("📁 ", "").replace(" (上级目录)", "")
    else:
        target = selected_dir
    target = self._extract_actual_path(target)

    # The sub-directory listing that used to be computed here was never
    # used in the returned values, so the directory scan is skipped.
    return gr.Dropdown(choices=[], value=None), target, gr.Button(visible=True)
def _create_custom_config(self,
                          truncation_check, completeness_check,
                          traditional_simplified_check, format_consistency_check,
                          duplicate_filter_check,
                          special_chars_check, special_chars_action, special_chars_replacement,
                          bracket_matching_check,
                          emoji_detection_check, emoji_action, emoji_replacement,
                          escape_chars_check, escape_action,
                          abnormal_chars_check, abnormal_action, abnormal_replacement):
    """Write a temporary YAML config reflecting the user's rule choices.

    The default ``config.yaml`` is loaded, the switches and actions below
    are overwritten in it, and the result is dumped to a new temporary
    ``.yaml`` file. Garbled-character filtering is always disabled and
    special-symbol detection is always enabled, reusing the
    special-character action and replacement text.

    Returns:
        Path of the temporary file. The caller is responsible for
        deleting it.

    Raises:
        OSError: If ``config.yaml`` cannot be read or the temporary file
            cannot be written.
        KeyError: If ``config.yaml`` lacks one of the sections updated here.
    """
    import yaml

    # Load the default configuration.
    with open("config.yaml", 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # Rule groups toggled as a whole.
    config['text_integrity']['truncation_detection']['enabled'] = truncation_check
    config['text_integrity']['completeness_validation']['enabled'] = completeness_check
    config['text_consistency']['traditional_simplified_mix']['enabled'] = traditional_simplified_check
    config['text_consistency']['chinese_english_format']['enabled'] = format_consistency_check
    config['content_duplication']['duplicate_filter']['enabled'] = duplicate_filter_check

    format_rules = config['format_validation']
    special = format_rules['special_characters']

    special['enabled'] = special_chars_check
    special['action'] = special_chars_action
    special['replacement_text'] = special_chars_replacement
    format_rules['json_format_validation']['enabled'] = bracket_matching_check
    format_rules['garbled_characters']['enabled'] = False  # garbled-symbol filtering stays off

    # Emoji detection.
    emoji = special['emoji_detection']
    emoji['enabled'] = emoji_detection_check
    emoji['action'] = emoji_action
    emoji['replacement_text'] = emoji_replacement

    # Special-symbol detection is always on and mirrors the
    # special-character settings.
    symbols = special['special_symbol_detection']
    symbols['enabled'] = True
    symbols['action'] = special_chars_action
    symbols['replacement_text'] = special_chars_replacement

    # Escape-character handling.
    escapes = special['escape_characters']
    escapes['enabled'] = escape_chars_check
    escapes['action'] = escape_action

    # Abnormal characters.
    abnormal = special['abnormal_chars']
    abnormal['enabled'] = abnormal_chars_check
    abnormal['action'] = abnormal_action
    abnormal['replacement_text'] = abnormal_replacement

    # delete=False: the file must outlive this call so the checker can read it.
    temp_config = tempfile.NamedTemporaryFile(
        mode='w', suffix='.yaml', delete=False, encoding='utf-8'
    )
    try:
        with temp_config:
            yaml.dump(config, temp_config, default_flow_style=False, allow_unicode=True)
    except Exception:
        # Do not leave a half-written config behind if dumping fails.
        try:
            os.unlink(temp_config.name)
        except OSError:
            pass
        raise
    return temp_config.name
def _custom_batch_process(self, processor, input_dir, pattern, output_dir,
generate_report, output_format, generate_cleaned):
"""自定义批量处理,支持可选报告生成"""
import glob
import os
from pathlib import Path
# 获取文件列表
search_pattern = os.path.join(input_dir, "**", pattern) if pattern != "*.*" else os.path.join(input_dir, "**", "*.*")
file_paths = glob.glob(search_pattern, recursive=True)
file_paths = [f for f in file_paths if os.path.isfile(f)]
results = {
"total_files": len(file_paths),
"processed_files": 0,
"failed_files": 0,
"summary": {},
"details": {}
}
for file_path in file_paths:
try:
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
# 执行质量检测
check_results = processor.checker.check_text_quality(text)
# 保存到结果中
results["details"][file_path] = {
"passed": all(r.passed for r in check_results.values()),
"results": {k: {"passed": v.passed, "issues": len(v.issues)}
for k, v in check_results.items()}
}
# 生成报告文件(如果需要)
if generate_report and output_dir:
file_stem = Path(file_path).stem
if output_format == "JSON":
report = processor.checker.generate_report(check_results, "json")
report_file = os.path.join(output_dir, f"{file_stem}_report.json")
elif output_format == "YAML":
report = processor.checker.generate_report(check_results, "yaml")
report_file = os.path.join(output_dir, f"{file_stem}_report.yaml")
else:
report = processor.checker.generate_report(check_results, "txt")
report_file = os.path.join(output_dir, f"{file_stem}_report.txt")
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
# 生成清洗文件(如果需要)
if generate_cleaned and output_dir:
cleaned_text = text
for result in check_results.values():
if result.cleaned_text:
cleaned_text = result.cleaned_text
break
if cleaned_text != text:
file_stem = Path(file_path).stem
file_suffix = Path(file_path).suffix
cleaned_file = os.path.join(output_dir, f"{file_stem}_cleaned{file_suffix}")
with open(cleaned_file, 'w', encoding='utf-8') as f:
f.write(cleaned_text)
results["processed_files"] += 1
except Exception as e:
results["failed_files"] += 1
results["details"][file_path] = {"error": str(e)}
# 生成汇总统计
rule_stats = {}
for file_details in results["details"].values():
if "results" in file_details:
for rule_name, rule_result in file_details["results"].items():
if rule_name not in rule_stats:
rule_stats[rule_name] = {"total_files": 0, "passed_files": 0}
rule_stats[rule_name]["total_files"] += 1
if rule_result["passed"]:
rule_stats[rule_name]["passed_files"] += 1
results["summary"] = rule_stats
return results
def process_single_file(self, input_file, current_dir, generate_report, output_format, output_dir, generate_cleaned,
                        special_chars_check, special_chars_action, special_chars_replacement,
                        bracket_matching_check,
                        emoji_detection_check, emoji_action, emoji_replacement,
                        escape_chars_check, escape_action,
                        abnormal_chars_check, abnormal_action, abnormal_replacement):
    """Run the quality checks on one file and optionally write the outputs.

    A temporary configuration is built from the rule settings (the
    integrity, consistency and duplication rules are always disabled
    here), the file is read as UTF-8 and checked, and a report is
    produced. The temporary configuration is removed on every exit path.

    Args:
        input_file: Path of the file to check.
        current_dir: Not used by this method.
        generate_report: Write the report to ``<stem>_report.<ext>``.
        output_format: ``"json"``, ``"yaml"`` or anything else for plain
            text; compared case-insensitively.
        output_dir: Target directory; ``"(自动创建)"`` or an empty value
            makes the method create a fresh one when files are written.
        generate_cleaned: Write ``<stem>_cleaned<suffix>`` when a rule
            produced cleaned text that differs from the input.
        special_chars_check ... abnormal_replacement: Rule switches,
            actions and replacement texts forwarded to
            ``_create_custom_config``.

    Returns:
        ``(status_message, report_text, cleaned_text, output_dir)``. On
        failure the status message describes the error and the other
        three items are empty strings.
    """
    custom_config_file = None
    try:
        if not input_file or not os.path.exists(input_file):
            return "❌ 请选择有效的输入文件", "", "", ""

        # Build the per-run configuration. The first five switches
        # (truncation, completeness, traditional/simplified mix, format
        # consistency, duplicate filter) are always off.
        custom_config_file = self._create_custom_config(
            False, False, False,
            False, False,
            special_chars_check, special_chars_action, special_chars_replacement,
            bracket_matching_check,
            emoji_detection_check, emoji_action, emoji_replacement,
            escape_chars_check, escape_action,
            abnormal_chars_check, abnormal_action, abnormal_replacement,
        )
        custom_checker = DataQualityChecker(custom_config_file)

        with open(input_file, 'r', encoding='utf-8') as f:
            text = f.read()

        results = custom_checker.check_text_quality(text)

        # The UI supplies lowercase format names while older callers used
        # uppercase ones, so normalise before choosing the report type.
        requested = str(output_format or "").lower()
        report_kind = requested if requested in ("json", "yaml") else "txt"
        report = custom_checker.generate_report(results, report_kind)

        # An output directory is only needed when something is written.
        if generate_report or generate_cleaned:
            if not output_dir or output_dir == "(自动创建)":
                output_dir = self._create_output_directory()
            else:
                os.makedirs(output_dir, exist_ok=True)

        source = Path(input_file)

        report_file = ""
        if generate_report:
            report_file = os.path.join(output_dir, f"{source.stem}_report.{report_kind}")
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(report)

        cleaned_text = ""
        cleaned_file = ""
        if generate_cleaned:
            # The first rule that produced cleaned text wins.
            cleaned_text = next(
                (r.cleaned_text for r in results.values() if r.cleaned_text),
                text,
            )
            if cleaned_text != text:
                cleaned_file = os.path.join(output_dir, f"{source.stem}_cleaned{source.suffix}")
                with open(cleaned_file, 'w', encoding='utf-8') as f:
                    f.write(cleaned_text)
            else:
                cleaned_text = "文本无需清洗"

        # Assemble the status message shown to the user.
        total_rules = len(results)
        rules_with_issues = sum(1 for r in results.values() if not r.passed)
        parts = [
            "✅ 处理完成!\n\n",
            "📊 检测结果:\n",
            f" • 检测规则数: {total_rules}\n",
            f" • 发现问题的规则: {rules_with_issues}\n\n",
        ]
        if generate_report or generate_cleaned:
            parts.append("📁 输出位置:\n")
            if output_dir:
                parts.append(f" • 输出目录: {output_dir}\n")
            if report_file:
                parts.append(f" • 报告文件: {os.path.basename(report_file)}\n")
            if cleaned_file:
                parts.append(f" • 清洗文件: {os.path.basename(cleaned_file)}")
        else:
            parts.append("📋 仅显示检测结果,未生成文件")
        status_msg = "".join(parts)

        return status_msg, report, cleaned_text, output_dir or ""
    except Exception as e:
        return f"❌ 处理失败: {str(e)}", "", "", ""
    finally:
        # Remove the temporary configuration whether or not the run succeeded.
        if custom_config_file:
            try:
                os.unlink(custom_config_file)
            except OSError:
                pass
def process_batch(
    self,
    current_input_dir: str,
    file_pattern: str,
    generate_report: bool,
    output_format: str,
    output_directory: str,
    generate_cleaned: bool,
    special_chars_check, special_chars_action, special_chars_replacement,
    bracket_matching_check,
    emoji_detection_check, emoji_action, emoji_replacement,
    escape_chars_check, escape_action,
    abnormal_chars_check, abnormal_action, abnormal_replacement
) -> tuple:
    """Run the quality checks on every matching file under a directory.

    A temporary configuration is built from the rule settings (the
    integrity, consistency and duplication rules are always disabled
    here) and handed to a ``BatchProcessor``; the files are then
    processed by ``_custom_batch_process``. The temporary configuration
    is removed on every exit path.

    Args:
        current_input_dir: Input directory, possibly carrying a display
            label that ``_extract_actual_path`` removes.
        file_pattern: ``"*.txt"``, ``"*.md"`` or ``"*.docx"``; any other
            value means ``"*.*"``.
        generate_report: Write a report file per input file.
        output_format: Report format forwarded to the batch step and
            echoed in the summary.
        output_directory: Target directory; ``"(自动创建)"`` or an empty
            value makes the method create a fresh ``batch_output``
            directory when files are written.
        generate_cleaned: Write cleaned copies where cleaning changed
            the text.
        special_chars_check ... abnormal_replacement: Rule switches,
            actions and replacement texts forwarded to
            ``_create_custom_config``.

    Returns:
        ``(summary_text, detailed_json_report, output_dir)``.
        ``output_dir`` is ``None`` when nothing was written. On failure
        the summary describes the error and the other two items are
        empty strings.
    """
    custom_config_file = None
    try:
        # A cleared dropdown delivers None/"": report it as an invalid
        # directory instead of failing inside the label stripping.
        if not current_input_dir:
            return "❌ 请选择有效的输入目录", "", ""
        actual_input_dir = self._extract_actual_path(current_input_dir)
        if not actual_input_dir or not os.path.exists(actual_input_dir):
            return "❌ 请选择有效的输入目录", "", ""

        # Build the per-run configuration. The first five switches
        # (truncation, completeness, traditional/simplified mix, format
        # consistency, duplicate filter) are always off.
        custom_config_file = self._create_custom_config(
            False, False, False,
            False, False,
            special_chars_check, special_chars_action, special_chars_replacement,
            bracket_matching_check,
            emoji_detection_check, emoji_action, emoji_replacement,
            escape_chars_check, escape_action,
            abnormal_chars_check, abnormal_action, abnormal_replacement,
        )
        processor = BatchProcessor(custom_config_file)

        # An output directory is only needed when something is written.
        output_dir = None
        if generate_report or generate_cleaned:
            if not output_directory or output_directory == "(自动创建)":
                output_dir = self._create_output_directory("batch_output")
            else:
                output_dir = output_directory
                os.makedirs(output_dir, exist_ok=True)

        # Only the patterns offered by the UI are honoured.
        pattern = file_pattern if file_pattern in ("*.txt", "*.md", "*.docx") else "*.*"

        # Processed here, not by the processor, so that report
        # generation stays optional.
        results = self._custom_batch_process(
            processor, actual_input_dir, pattern, output_dir,
            generate_report, output_format, generate_cleaned
        )

        # Human-readable summary.
        parts = [
            "🎯 批量处理完成!\n\n",
            "📊 处理统计:\n",
            f" • 总文件数: {results['total_files']}\n",
            f" • 成功处理: {results['processed_files']}\n",
            f" • 处理失败: {results['failed_files']}\n\n",
        ]
        if results['summary']:
            parts.append("📋 质量检测统计:\n")
            for rule_name, stats in results['summary'].items():
                issue_files = stats['total_files'] - stats['passed_files']
                parts.append(f" • {rule_name}: {issue_files} 个文件检测到问题\n")
        if generate_report or generate_cleaned:
            parts.append(f"\n📁 输出目录: {output_dir}")
            if generate_report:
                parts.append(f"\n📄 已生成报告文件 (格式: {output_format})")
            if generate_cleaned:
                parts.append("\n🧹 已生成清洗文件")
        else:
            parts.append("\n📋 仅显示检测结果,未生成文件")
        summary = "".join(parts)

        # Full machine-readable details.
        detailed_report = json.dumps(results, ensure_ascii=False, indent=2)
        return summary, detailed_report, output_dir
    except Exception as e:
        return f"❌ 批量处理失败: {str(e)}", "", ""
    finally:
        # Remove the temporary configuration whether or not the run succeeded.
        if custom_config_file:
            try:
                os.unlink(custom_config_file)
            except OSError:
                pass
def create_interface(self):
"""创建Gradio界面"""
# 自定义CSS样式
custom_css = """
.gradio-container {
max-width: 1400px !important;
margin: 0 auto !important;
}
.gr-group {
border-radius: 8px !important;
border: none !important;
background: #f8f9fa !important;
padding: 12px !important;
margin: 8px 0 !important;
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.05) !important;
}
.gr-row {
gap: 8px !important;
}
.gr-column {
gap: 8px !important;
}
.compact-checkbox {
margin: 4px 0 !important;
}
.compact-dropdown {
margin: 2px 0 !important;
}
.section-header {
font-size: 16px !important;
font-weight: 600 !important;
color: #2c3e50 !important;
margin: 8px 0 !important;
padding: 6px 0 !important;
border-bottom: 2px solid #3498db !important;
}
.config-section {
border-left: 3px solid #3498db !important;
padding-left: 12px !important;
margin: 8px 0 !important;
}
/* 目录选择相关样式美化 */
.directory-browser {
background: linear-gradient(145deg, #f0f8ff 0%, #e6f3ff 100%) !important;
border: none !important;
border-radius: 12px !important;
padding: 16px !important;
box-shadow: 0 1px 3px rgba(74, 144, 226, 0.08) !important;
margin: 8px 0 !important;
}
.file-input-section {
background: linear-gradient(145deg, #fff5f5 0%, #ffe6e6 100%) !important;
border: none !important;
border-radius: 12px !important;
padding: 16px !important;
box-shadow: 0 1px 3px rgba(255, 107, 107, 0.08) !important;
margin: 8px 0 !important;
}
.output-config-section {
background: linear-gradient(145deg, #f0fff0 0%, #e6ffe6 100%) !important;
border: none !important;
border-radius: 12px !important;
padding: 16px !important;
box-shadow: 0 1px 3px rgba(82, 196, 26, 0.08) !important;
margin: 8px 0 !important;
}
/* 按钮美化 */
.browse-button {
background: linear-gradient(45deg, #667eea, #764ba2) !important;
border: none !important;
color: white !important;
font-weight: 600 !important;
border-radius: 8px !important;
padding: 8px 16px !important;
box-shadow: 0 2px 4px rgba(102, 126, 234, 0.2) !important;
}
.browse-button:hover {
background: linear-gradient(45deg, #5a6fd8, #6a42a0) !important;
box-shadow: 0 3px 6px rgba(102, 126, 234, 0.3) !important;
}
/* 下拉框美化 */
.directory-dropdown {
border: 1px solid #e0e6ed !important;
border-radius: 8px !important;
background: white !important;
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.05) !important;
}
.directory-dropdown:focus {
border-color: #667eea !important;
box-shadow: 0 0 0 2px rgba(102, 126, 234, 0.1) !important;
}
/* 文本框美化 */
.directory-textbox {
border: 1px solid #e0e6ed !important;
border-radius: 8px !important;
background: white !important;
font-family: 'Monaco', 'Consolas', monospace !important;
}
.directory-textbox:focus {
border-color: #667eea !important;
box-shadow: 0 0 0 2px rgba(102, 126, 234, 0.1) !important;
}
/* 图标样式 */
.icon-text {
display: inline-flex !important;
align-items: center !important;
gap: 6px !important;
}
"""
with gr.Blocks(title="数据质量检测工具", theme=gr.themes.Soft(), css=custom_css) as iface:
gr.HTML("""
<div style="text-align: center; padding: 20px 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 10px; margin-bottom: 20px;">
<h1 style="color: white; margin: 0; font-size: 2.5em;">🔍 数据质量检测工具</h1>
<p style="color: rgba(255,255,255,0.9); margin: 10px 0 0 0; font-size: 1.1em;">智能文本数据质量检测与清洗平台</p>
</div>
""")
with gr.Tabs():
# 单文件处理标签页
with gr.TabItem("📄 单文件处理"):
with gr.Row():
# 左侧:文件选择和输出配置
with gr.Column(scale=2):
with gr.Group(elem_classes=["file-input-section"]):
gr.HTML('<h3 class="section-header icon-text">📁 文件选择</h3>')
with gr.Row():
input_dir_dropdown = gr.Dropdown(
choices=self.available_directories,
value=".",
label="📂 浏览目录",
scale=3,
elem_classes=["directory-dropdown"]
)
browse_btn = gr.Button("🔍 浏览", scale=1, size="sm",
visible=False, elem_classes=["browse-button"])
with gr.Row():
current_input_dir = gr.Textbox(
label="当前目录",
value=".",
interactive=False,
scale=2,
elem_classes=["directory-textbox"]
)
file_pattern_radio = gr.Radio(
choices=["*.txt", "*.md", "*.docx", "*.*"],
value="*.txt",
label="文件类型",
scale=1
)
input_file_dropdown = gr.Dropdown(
choices=self._get_files_in_directory(".", "*.txt"),
label="📄 选择文件",
interactive=True,
elem_classes=["directory-dropdown"]
)
with gr.Group(elem_classes=["output-config-section"]):
gr.HTML('<h3 class="section-header icon-text">⚙️ 输出配置</h3>')
with gr.Row():
generate_report_check = gr.Checkbox(
label="生成检测报告",
value=True,
elem_classes=["compact-checkbox"]
)
generate_cleaned_check = gr.Checkbox(
label="生成清洗文件",
value=True,
elem_classes=["compact-checkbox"]
)
with gr.Row():
output_format_radio = gr.Radio(
choices=["json", "txt"],
value="json",
label="报告格式",
scale=1
)
with gr.Row():
output_dir_dropdown = gr.Dropdown(
choices=["./results"] + self.available_directories,
value="./results",
label="📁 输出目录",
scale=3,
elem_classes=["directory-dropdown"]
)
output_browse_btn = gr.Button("🔍 浏览", scale=1, size="sm",
elem_classes=["browse-button"])
with gr.Row():
current_output_dir = gr.Textbox(
label="当前输出目录",
value="./results",
interactive=False,
scale=2,
elem_classes=["directory-textbox"]
)
create_output_dir_btn = gr.Button("📂 创建目录", scale=1, size="sm")
# 右侧:检测规则配置
with gr.Column(scale=2):
with gr.Group():
gr.HTML('<h3 class="section-header">�️ 检测规则配置</h3>')
# 压缩配置项布局
with gr.Accordion("📐 格式规范检测", open=True):
with gr.Row():
special_chars_check = gr.Checkbox(
label="🔤 特殊字符",
value=True,
elem_classes=["compact-checkbox"]
)
special_chars_action = gr.Dropdown(
choices=["mark", "remove", "replace"],
value="mark",
label="处理方式",
scale=1,
elem_classes=["compact-dropdown"]
)
special_chars_replacement = gr.Textbox(
value="[特殊字符]",
label="替换文本",
scale=1,
visible=False
)
with gr.Row():
emoji_detection_check = gr.Checkbox(
label="😀 表情符号",
value=True,
elem_classes=["compact-checkbox"]
)
emoji_action = gr.Dropdown(
choices=["mark", "remove", "replace"],
value="mark",
label="处理方式",
scale=1,
elem_classes=["compact-dropdown"]
)
emoji_replacement = gr.Textbox(
value="[表情]",
label="替换文本",
scale=1,
visible=False
)
with gr.Row():
escape_chars_check = gr.Checkbox(
label="⚡ 转义字符",
value=True,
elem_classes=["compact-checkbox"]
)
escape_action = gr.Dropdown(
choices=["convert", "normalize", "mark"],
value="convert",
label="处理方式",
scale=1,
elem_classes=["compact-dropdown"]
)
with gr.Row():
abnormal_chars_check = gr.Checkbox(
label="⚠️ 异常字符",
value=True,
elem_classes=["compact-checkbox"]
)
abnormal_action = gr.Dropdown(
choices=["remove", "replace", "mark"],
value="remove",
label="处理方式",
scale=1,
elem_classes=["compact-dropdown"]
)
abnormal_replacement = gr.Textbox(
value="[异常字符]",
label="替换文本",
scale=1,
visible=False
)
with gr.Row():
bracket_matching_check = gr.Checkbox(
label="🔧 JSON/JSONL格式验证",
value=True,
info="验证JSON格式并自动修复常见问题",
elem_classes=["compact-checkbox"]
)
process_single_btn = gr.Button(
"🚀 开始检测",
variant="primary",
size="lg",
scale=1
)
# 动态显示/隐藏替换文本框的事件处理函数
def handle_special_chars_action_change(action):
return gr.Textbox(visible=(action == "replace"))
def handle_emoji_action_change(action):
return gr.Textbox(visible=(action == "replace"))
def handle_abnormal_action_change(action):
return gr.Textbox(visible=(action == "replace"))
def handle_dir_selection(selected_dir, pattern):
print(f"[DEBUG] handle_dir_selection called: selected_dir='{selected_dir}', pattern='{pattern}'")
# 增强的None检查,包括空字符串和各种形式的None
if (selected_dir is None or
selected_dir == "" or
selected_dir == "None" or
str(selected_dir).lower() == "none"):
print(f"[DEBUG] No directory selected (empty/None), returning to root")
return (
gr.Dropdown(choices=self.available_directories, value=None, interactive=True),
".",
gr.Dropdown(choices=[], value=None, interactive=True),
gr.Button(visible=True)
)
# 防止处理相同的选择(避免循环)
if hasattr(self, '_last_selected_dir') and selected_dir == self._last_selected_dir:
print(f"[DEBUG] Same directory as last selection, skipping")
return gr.update(), gr.update(), gr.update(), gr.update()
# 记录当前选择
self._last_selected_dir = selected_dir
if selected_dir == "🏠 返回根目录列表":
print(f"[DEBUG] Returning to root directory list")
self._last_selected_dir = None # 重置
return (
gr.Dropdown(choices=self.available_directories, value=None, interactive=True),
".",
gr.Dropdown(choices=self._get_files_in_directory(".", pattern), value=None, interactive=True),
gr.Button(visible=True)
)
elif selected_dir.startswith("✅"):
print(f"[DEBUG] User confirmed directory selection")
actual_dir = selected_dir.replace("✅ ", "").replace(" (选择此目录)", "")
actual_dir = self._extract_actual_path(actual_dir)
files = self._get_files_in_directory(actual_dir, pattern)
current_subdirs = self._browse_directory(actual_dir, "browse_subdirs")
print(f"[DEBUG] Confirmed directory: {actual_dir}, files: {len(files)}")
return (
gr.Dropdown(choices=current_subdirs, value=None, interactive=True),
actual_dir,
gr.Dropdown(choices=files, value=None, interactive=True),
gr.Button(visible=True)
)
elif selected_dir.startswith("📁"):
print(f"[DEBUG] Navigating to parent directory")
parent_dir = selected_dir.replace("📁 ", "").replace(" (上级目录)", "")
parent_dir = self._extract_actual_path(parent_dir)
subdirs = self._browse_directory(parent_dir, "browse_subdirs")