diff --git a/bkflow/pipeline_converter/constants.py b/bkflow/pipeline_converter/constants.py index b9800b3a9b..2a50f5a41e 100644 --- a/bkflow/pipeline_converter/constants.py +++ b/bkflow/pipeline_converter/constants.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- from enum import Enum +from pipeline.variable_framework.models import VariableModel + class NodeTypes(str, Enum): START_EVENT = "start_event" @@ -19,6 +21,18 @@ class NodeTypes(str, Enum): ] +class ConstantTypes(str, Enum): + # 自定义常量 + CUSTOM_CONSTANT = "custom" + # 节点输入常量 + COMPONENT_INPUTS_CONSTANT = "component_inputs" + # 节点输出常量 + COMPONENT_OUTPUTS_CONSTANT = "component_outputs" + + CUSTOM_CONSTANT_CUSTOMS = [variable.code for variable in VariableModel.objects.filter(status=True)] + CUSTOM_CONSTANT_TAGS = [variable.tag for variable in VariableModel.objects.filter(status=True)] + + class DataTypes(str, Enum): DATA_MODEL = "data_model" JSON = "json" diff --git a/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/component.py b/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/component.py index 4c91adc31d..f27a8182fa 100644 --- a/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/component.py +++ b/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/component.py @@ -12,7 +12,6 @@ def convert(self, *args, **kwargs): for field in fields: self.target_data[field.key] = { "need_render": field.need_render, - "key": field.key, "value": field.value, "hook": False, # TODO: 可配置 } diff --git a/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/constant.py b/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/constant.py index 40a96afc6f..9f7310647f 100644 --- a/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/constant.py +++ b/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/constant.py @@ -1 +1,83 @@ # -*- coding: utf-8 -*- +from bkflow.pipeline_converter.constants import ConstantTypes +from bkflow.pipeline_converter.converters.base import DataModelToPipelineTreeConverter +from bkflow.pipeline_converter.validators.constant import ( + ComponentInputValidator, + ConstantValidator, +) +from bkflow.pipeline_converter.validators.node import NodeTypeValidator + + +class SourceInfoConverter(DataModelToPipelineTreeConverter): + def convert(self): + self.target_data = {} + for info in self.source_data: + self.target_data[info.key] = [info.value] + return self.target_data + + +class CustomConstantConverter(DataModelToPipelineTreeConverter): + validators = [NodeTypeValidator(ConstantTypes.CUSTOM_CONSTANT.value), ConstantValidator()] + + def convert(self): + converter_data = self.source_data + self.target_data = { + "name": converter_data.name, + "key": f"${{{converter_data.key}}}", + "desc": converter_data.desc, + "value": converter_data.value, + "custom_type": converter_data.custom_type, + "show_type": converter_data.show_type, + "source_tag": converter_data.source_tag, + "source_type": ConstantTypes.CUSTOM_CONSTANT.value, + "source_info": SourceInfoConverter(converter_data.source_info).convert(), + "validation": converter_data.validation, + "version": converter_data.version, + "pre_render_mako": converter_data.pre_render_mako, + "is_meta": converter_data.is_meta, + } + + return self.target_data + + +class ComponentInputConverter(DataModelToPipelineTreeConverter): + validators = [NodeTypeValidator(ConstantTypes.COMPONENT_INPUTS_CONSTANT.value), ComponentInputValidator()] + + def convert(self): + converter_data = self.source_data + self.target_data = { + "name": converter_data.name, + "key": f"${{{converter_data.key}}}", + "desc": converter_data.desc, + "value": [converter_data.value], + "custom_type": converter_data.custom_type, + "show_type": converter_data.show_type, + "source_tag": converter_data.source_tag, + "source_type": ConstantTypes.COMPONENT_INPUTS_CONSTANT.value, + "source_info": SourceInfoConverter(converter_data.source_info).convert(), + "validation": converter_data.validation, + "version": converter_data.version, + "plugin_code": converter_data.plugin_code, + } + return self.target_data + + +class ComponentOutputConverter(DataModelToPipelineTreeConverter): + validators = [NodeTypeValidator(ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value)] + + def convert(self): + converter_data = self.source_data + self.target_data = { + "name": converter_data.name, + "key": f"${{{converter_data.key}}}", + "desc": converter_data.desc, + "value": converter_data.value, + "custom_type": converter_data.custom_type, + "show_type": converter_data.show_type, + "source_tag": converter_data.source_tag, + "source_type": ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value, + "source_info": SourceInfoConverter(converter_data.source_info).convert(), + "validation": converter_data.validation, + "plugin_code": converter_data.plugin_code, + } + return self.target_data diff --git a/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/pipeline.py b/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/pipeline.py index a593408ae6..5e759d285c 100644 --- a/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/pipeline.py +++ b/bkflow/pipeline_converter/converters/data_model_to_web_pipeline/pipeline.py @@ -5,8 +5,13 @@ from pipeline.parser.utils import replace_all_id from pipeline.utils.uniqid import line_uniqid -from bkflow.pipeline_converter.constants import NodeTypes +from bkflow.pipeline_converter.constants import ConstantTypes, NodeTypes from bkflow.pipeline_converter.converters.base import DataModelToPipelineTreeConverter +from bkflow.pipeline_converter.converters.data_model_to_web_pipeline.constant import ( + ComponentInputConverter, + ComponentOutputConverter, + CustomConstantConverter, +) from bkflow.pipeline_converter.converters.data_model_to_web_pipeline.gateway import ( ConditionalParallelGatewayConverter, ConvergeGatewayConverter, @@ -19,6 +24,7 @@ StartNodeConverter, ) from bkflow.pipeline_converter.data_models import Flow, Node, Pipeline +from bkflow.pipeline_converter.hub import ConverterHub class PipelineConverter(DataModelToPipelineTreeConverter): @@ -28,6 +34,7 @@ def convert(self) -> dict: """ pipeline: Pipeline = self.source_data nodes: List[Node] = pipeline.nodes + constants = pipeline.constants self.target_data = { PE.id: pipeline.id, PE.name: pipeline.name, @@ -57,6 +64,10 @@ def convert(self) -> dict: elif node.type in NodeTypes.GATEWAYS: gateway_data = gateway_mapping[node.type](node).convert() self.target_data[PE.gateways][node.id] = gateway_data + + # 常量数据转换 + self.validate_constant(self.target_data[PE.activities], constants) + self.target_data[PE.constants] = self.constant_converter(constants) # 连线数据转换 flows = self._generate_flows_by_nodes(nodes) self.target_data[PE.flows] = {flow.id: flow.dict() for flow in flows} @@ -73,6 +84,24 @@ def convert(self) -> dict: replace_all_id(self.target_data) return self.target_data + def constant_converter(self, constants): + constant_type_converter_cls_name_map = { + ConstantTypes.CUSTOM_CONSTANT.value: CustomConstantConverter.__name__, + ConstantTypes.COMPONENT_INPUTS_CONSTANT.value: ComponentInputConverter.__name__, + ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value: ComponentOutputConverter.__name__, + } + result = {} + for index, constant in enumerate(constants): + converter_cls = ConverterHub.get_converter_cls( + source=self.source, + target=self.target, + converter_name=constant_type_converter_cls_name_map[constant.type], + ) + constant_data = converter_cls(constant).convert() + constant_data["index"] = index + result[constant_data.get("key")] = constant_data + return result + @staticmethod def _generate_flows_by_nodes(nodes: List[Node]) -> List[Flow]: """生成流程中的连线信息""" @@ -130,3 +159,34 @@ def remap_condition_keys_to_outgoing(gateways_data, flows): flow_id = flow_id_map.get((gateway_id, condition_node)) new_conditions[flow_id] = {"name": condition["name"], "evaluate": condition["evaluate"]} gateway["conditions"] = new_conditions + + @staticmethod + def validate_constant(nodes, constants): + validate_types = [ConstantTypes.COMPONENT_INPUTS_CONSTANT.value, ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value] + + for constant in constants: + # 只检查组件输入/输出类型的常量 + if constant.type not in validate_types: + continue + + constant_name = constant.name + + for info in constant.source_info: + node_id = info.key + # 检查源节点是否存在 + if node_id not in nodes: + raise ValueError(f"常量{constant_name}的源节点{node_id}不存在") + + if constant.type == ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value: + continue + + constant_node = nodes[node_id]["component"] + code, component_file = constant.source_tag.split(".") + + # 检查组件code是否匹配 + if code != constant_node["code"]: + raise ValueError(f"常量{constant_name}的source_tag字段信息与源节点{node_id}的组件信息不匹配") + + # 检查目标字段是否存在 + if component_file not in constant_node["data"]: + raise ValueError(f"常量{constant_name}的源节点{node_id}的字段{component_file}不存在") diff --git a/bkflow/pipeline_converter/converters/json_to_data_model/constant.py b/bkflow/pipeline_converter/converters/json_to_data_model/constant.py index 40a96afc6f..7b3c8355b6 100644 --- a/bkflow/pipeline_converter/converters/json_to_data_model/constant.py +++ b/bkflow/pipeline_converter/converters/json_to_data_model/constant.py @@ -1 +1,93 @@ # -*- coding: utf-8 -*- +from bkflow.pipeline_converter.constants import ConstantTypes +from bkflow.pipeline_converter.converters.base import JsonToDataModelConverter +from bkflow.pipeline_converter.data_models import ( + ComponentInputConstant, + ComponentOutConstant, + CustomConstant, + SourceInfo, +) +from bkflow.pipeline_converter.validators.constant import ( + JsonComponentInputValidator, + JsonConstantValidator, +) +from bkflow.pipeline_converter.validators.node import JsonNodeTypeValidator + + +class SourceInfoConverter(JsonToDataModelConverter): + def convert(self): + self.target_data = [] + for info in self.source_data: + info_data = SourceInfo(key=info["key"], value=info["value"]) + self.target_data.append(info_data) + return self.target_data + + +class CustomConstantConverter(JsonToDataModelConverter): + validators = [JsonNodeTypeValidator(ConstantTypes.CUSTOM_CONSTANT.value), JsonConstantValidator()] + + def convert(self): + self.target_data = CustomConstant( + name=self.source_data["name"], + type=ConstantTypes.CUSTOM_CONSTANT.value, + key=self.source_data["key"], + value=self.source_data["value"], + custom_type=self.source_data["custom_type"], + source_tag=self.source_data["source_tag"], + ) + default_optional_field = [ + "desc", + "source_info", + "validation", + "is_meta", + "version", + "show_type", + "pre_render_mako", + ] + for field in default_optional_field: + if field not in self.source_data: + continue + setattr(self.target_data, field, self.source_data[field]) + + return self.target_data + + +class ComponentInputConverter(JsonToDataModelConverter): + validators = [JsonNodeTypeValidator(ConstantTypes.COMPONENT_INPUTS_CONSTANT.value), JsonComponentInputValidator()] + + def convert(self): + self.target_data = ComponentInputConstant( + name=self.source_data["name"], + key=self.source_data["key"], + value=self.source_data["value"], + source_tag=self.source_data["source_tag"], + type=ConstantTypes.COMPONENT_INPUTS_CONSTANT.value, + source_info=SourceInfoConverter(self.source_data["source_info"]).convert(), + ) + default_optional_field = ["desc", "validation", "version", "show_type", "plugin_code"] + for field in default_optional_field: + if field not in self.source_data: + continue + setattr(self.target_data, field, self.source_data[field]) + + return self.target_data + + +class ComponentOutputConverter(JsonToDataModelConverter): + validators = [JsonNodeTypeValidator(ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value)] + + def convert(self): + self.target_data = ComponentOutConstant( + name=self.source_data["name"], + key=self.source_data["key"], + value=self.source_data["value"], + type=ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value, + source_info=SourceInfoConverter(self.source_data["source_info"]).convert(), + ) + default_optional_field = ["desc", "validation", "show_type", "plugin_code"] + for field in default_optional_field: + if field not in self.source_data: + continue + setattr(self.target_data, field, self.source_data[field]) + + return self.target_data diff --git a/bkflow/pipeline_converter/converters/json_to_data_model/pipeline.py b/bkflow/pipeline_converter/converters/json_to_data_model/pipeline.py index 529a63e0bf..e4a3a00e9d 100644 --- a/bkflow/pipeline_converter/converters/json_to_data_model/pipeline.py +++ b/bkflow/pipeline_converter/converters/json_to_data_model/pipeline.py @@ -1,6 +1,11 @@ # -*- coding: utf-8 -*- -from bkflow.pipeline_converter.constants import NodeTypes +from bkflow.pipeline_converter.constants import ConstantTypes, NodeTypes from bkflow.pipeline_converter.converters.base import JsonToDataModelConverter +from bkflow.pipeline_converter.converters.json_to_data_model.constant import ( + ComponentInputConverter, + ComponentOutputConverter, + CustomConstantConverter, +) from bkflow.pipeline_converter.converters.json_to_data_model.gateway import ( ConditionalParallelGatewayConverter, ConvergeGatewayConverter, @@ -41,4 +46,21 @@ def convert(self) -> dict: target_nodes.append(converter_cls(node).convert()) self.target_data.nodes = target_nodes + constant_type_converter_cls_name_map = { + ConstantTypes.CUSTOM_CONSTANT.value: CustomConstantConverter.__name__, + ConstantTypes.COMPONENT_INPUTS_CONSTANT.value: ComponentInputConverter.__name__, + ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value: ComponentOutputConverter.__name__, + } + + target_constants = [] + for constant in self.source_data.get("constants", []): + converter_cls = ConverterHub.get_converter_cls( + source=self.source, + target=self.target, + converter_name=constant_type_converter_cls_name_map[constant["type"]], + ) + converted_data = converter_cls(constant).convert() + target_constants.append(converted_data) + self.target_data.constants = target_constants + return self.target_data diff --git a/bkflow/pipeline_converter/data_models.py b/bkflow/pipeline_converter/data_models.py index 41e54d5663..4ad4437856 100644 --- a/bkflow/pipeline_converter/data_models.py +++ b/bkflow/pipeline_converter/data_models.py @@ -3,7 +3,7 @@ from pydantic import BaseModel -from bkflow.pipeline_converter.constants import NodeTypes +from bkflow.pipeline_converter.constants import ConstantTypes, NodeTypes class Node(BaseModel): @@ -100,14 +100,43 @@ class ConditionalParallelGateway(ExclusiveGateway, ParallelGateway): type: str = NodeTypes.CONDITIONAL_PARALLEL_GATEWAY.value +class SourceInfo(BaseModel): + key: str + value: str + + class Constant(BaseModel): name: str - type: str + type: str # source_type key: str value: Any + custom_type: str = "" + source_info: List[SourceInfo] + source_tag: str + show_type: str = "show" + validation: str = "" version: str = "legacy" - source_type: str + desc: str = "" + + +class CustomConstant(Constant): + custom_type: str + source_info: str = "" + type: str = ConstantTypes.CUSTOM_CONSTANT.value pre_render_mako: bool = False + is_meta: bool = False + + +class ComponentInputConstant(Constant): + type: str = ConstantTypes.COMPONENT_INPUTS_CONSTANT.value + plugin_code: str = "" + + +class ComponentOutConstant(Constant): + source_tag: str = "" + type: str = ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value + show_type: str = "hide" + plugin_code: str = "" class Extensions(BaseModel): diff --git a/bkflow/pipeline_converter/validators/constant.py b/bkflow/pipeline_converter/validators/constant.py new file mode 100644 index 0000000000..9a6d870b31 --- /dev/null +++ b/bkflow/pipeline_converter/validators/constant.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from typing import Any, Dict + +from bkflow.pipeline_converter.constants import ConstantTypes +from bkflow.pipeline_converter.data_models import ComponentInputConstant, CustomConstant +from bkflow.pipeline_converter.validators.base import BaseValidator + + +class ConstantValidator(BaseValidator): + def validate(self, data: CustomConstant, *args, **kwargs): + if data.custom_type not in ConstantTypes.CUSTOM_CONSTANT_CUSTOMS.value: + raise ValueError("The parameter custom type does not meet the requirements") + if data.source_tag not in ConstantTypes.CUSTOM_CONSTANT_TAGS.value: + raise ValueError("The parameter source tag does not meet the requirements") + + +class ComponentInputValidator(BaseValidator): + def validate(self, data: ComponentInputConstant, *args, **kwargs): + constant_key = data.key + try: + code, tag_field = data.source_tag.split(".") + except ValueError: + raise ValueError("parameter source_tag does not conform to the required format") + + for info in data.source_info: + info_field = info.value + + if not (constant_key == info_field == tag_field): + raise ValueError("the information about the source node fields must be consistent in the parameters") + + +class JsonConstantValidator(BaseValidator): + def validate(self, data: Dict[str, Any], *args, **kwargs): + if data.get("custom_type") not in ConstantTypes.CUSTOM_CONSTANT_CUSTOMS.value: + raise ValueError("The parameter custom type does not meet the requirements") + if data.get("source_tag") not in ConstantTypes.CUSTOM_CONSTANT_TAGS.value: + raise ValueError("The parameter source tag does not meet the requirements") + + +class JsonComponentInputValidator(BaseValidator): + def validate(self, data: Dict[str, Any], *args, **kwargs): + constant_key = data.get("key") + try: + code, tag_field = data.get("source_tag").split(".") + except ValueError: + raise ValueError("parameter source_tag does not conform to the required format") + + for info in data.get("source_info"): + info_field = info.get("value") + + if not (constant_key == info_field == tag_field): + raise ValueError("the information about the source node fields must be consistent in the parameters") diff --git a/tests/pipeline_converter/converters/json_to_data_model/test_pipeline.py b/tests/pipeline_converter/converters/json_to_data_model/test_pipeline.py index 7177b5b0c5..0cd8337c80 100644 --- a/tests/pipeline_converter/converters/json_to_data_model/test_pipeline.py +++ b/tests/pipeline_converter/converters/json_to_data_model/test_pipeline.py @@ -2,7 +2,7 @@ from bamboo_engine import validator as engine_validator from pipeline.core.constants import PE -from bkflow.pipeline_converter.constants import DataTypes +from bkflow.pipeline_converter.constants import ConstantTypes, DataTypes from bkflow.pipeline_converter.data_models import Pipeline from bkflow.pipeline_converter.hub import CONVERTER_HUB from bkflow.pipeline_web.parser.format import format_web_data_to_pipeline @@ -213,3 +213,211 @@ def test_json_2_web_pipeline_convert_with_gateway_success(self): validate_web_pipeline_tree(web_pipeline_tree) engine_pipeline_tree = format_web_data_to_pipeline(web_pipeline_tree) engine_validator.validate_and_process_pipeline(engine_pipeline_tree, cycle_tolerate=False) + + def test_json_2_web_pipeline_convert_with_custom_constant(self): + json_data = { + "id": "pipeline_id", + "name": "pipeline_name", + "nodes": [ + { + "id": "start_node", + "type": "start_event", + "next": "custom_node", + }, + { + "id": "custom_node", + "type": "component", + "name": "component_node", + "component": { + "code": "bk_display", + "version": "v1.0", + "data": [{"key": "bk_display_message", "value": "${test_name}"}], + }, + "next": "end_node", + }, + { + "id": "end_node", + "type": "end_event", + }, + ], + "constants": [ + { + "name": "test_name", + "type": ConstantTypes.CUSTOM_CONSTANT.value, + "key": "test_name", + "value": "test_value", + "custom_type": "textarea", + "source_tag": "textarea.textarea", + } + ], + } + json_pipeline_cvt = CONVERTER_HUB.get_converter_cls( + DataTypes.JSON.value, DataTypes.DATA_MODEL.value, "PipelineConverter" + ) + dm_pipeline = json_pipeline_cvt(json_data).convert() + assert isinstance(dm_pipeline, Pipeline) + + data_model_pipeline_cvt = CONVERTER_HUB.get_converter_cls( + DataTypes.DATA_MODEL.value, DataTypes.WEB_PIPELINE.value, "PipelineConverter" + ) + web_pipeline_tree = data_model_pipeline_cvt(dm_pipeline).convert() + + assert len(web_pipeline_tree[PE.activities]) == 1 + assert len(web_pipeline_tree[PE.flows]) == 2 + + # 确保 web_pipeline_tree 格式正确 且能够正常转换成 engine_pipeline_tree + validate_web_pipeline_tree(web_pipeline_tree) + engine_pipeline_tree = format_web_data_to_pipeline(web_pipeline_tree) + engine_validator.validate_and_process_pipeline(engine_pipeline_tree, cycle_tolerate=False) + + def test_json_2_web_pipeline_convert_with_component_output_constant(self): + json_data = { + "id": "pipeline_id", + "name": "pipeline_name", + "nodes": [ + { + "id": "start_node", + "type": "start_event", + "next": "custom_node", + }, + { + "id": "custom_node", + "type": "component", + "name": "component_node", + "component": { + "code": "bk_display", + "version": "v1.0", + "data": [{"key": "bk_display_message", "value": "${test_name}"}], + }, + "next": "exclusive_gateway", + }, + { + "id": "exclusive_gateway", + "name": "exclusive_gateway", + "type": "exclusive_gateway", + "conditions": [ + {"name": "condition_1", "next": "condition_node_1", "expr": "${_result_1} == true"}, + {"name": "condition_2", "next": "condition_node_2", "expr": "1 == 2"}, + ], + "next": ["condition_node_1", "condition_node_2"], + }, + { + "id": "condition_node_1", + "name": "condition_node_1", + "type": "component", + "component": { + "code": "bk_display", + "version": "v1.0", + "data": [{"key": "bk_display_message", "value": 123}], + }, + "next": "end_node", + }, + { + "id": "condition_node_2", + "name": "condition_node_2", + "type": "component", + "component": { + "code": "bk_display", + "version": "v1.0", + "data": [{"key": "bk_display_message", "value": 123}], + }, + "next": "end_node", + }, + { + "id": "end_node", + "type": "end_event", + }, + ], + "constants": [ + { + "name": "test_name", + "type": ConstantTypes.CUSTOM_CONSTANT.value, + "key": "test_name", + "value": "test_value", + "custom_type": "textarea", + "source_tag": "textarea.textarea", + }, + { + "name": "执行结果", + "type": ConstantTypes.COMPONENT_OUTPUTS_CONSTANT.value, + "key": "_result_1", + "source_info": [{"key": "custom_node", "value": "_result"}], + "value": "", + }, + ], + } + json_pipeline_cvt = CONVERTER_HUB.get_converter_cls( + DataTypes.JSON.value, DataTypes.DATA_MODEL.value, "PipelineConverter" + ) + dm_pipeline = json_pipeline_cvt(json_data).convert() + assert isinstance(dm_pipeline, Pipeline) + + data_model_pipeline_cvt = CONVERTER_HUB.get_converter_cls( + DataTypes.DATA_MODEL.value, DataTypes.WEB_PIPELINE.value, "PipelineConverter" + ) + web_pipeline_tree = data_model_pipeline_cvt(dm_pipeline).convert() + + assert len(web_pipeline_tree[PE.activities]) == 3 + assert len(web_pipeline_tree[PE.flows]) == 6 + + # 确保 web_pipeline_tree 格式正确 且能够正常转换成 engine_pipeline_tree + validate_web_pipeline_tree(web_pipeline_tree) + engine_pipeline_tree = format_web_data_to_pipeline(web_pipeline_tree) + engine_validator.validate_and_process_pipeline(engine_pipeline_tree, cycle_tolerate=False) + + def test_json_2_web_pipeline_convert_with_component_input_constant(self): + json_data = { + "id": "pipeline_id", + "name": "pipeline_name", + "nodes": [ + { + "id": "start_node", + "type": "start_event", + "next": "custom_node", + }, + { + "id": "custom_node", + "type": "component", + "name": "component_node", + "component": { + "code": "bk_display", + "version": "v1.0", + "data": [{"key": "bk_display_message", "value": "${bk_display_message}"}], + }, + "next": "end_node", + }, + { + "id": "end_node", + "type": "end_event", + }, + ], + "constants": [ + { + "name": "bk_display_message", + "type": ConstantTypes.COMPONENT_INPUTS_CONSTANT.value, + "key": "bk_display_message", + "value": "test_value", + "source_tag": "bk_display.bk_display_message", + "version": "v1.0", + "source_info": [{"key": "custom_node", "value": "bk_display_message"}], + } + ], + } + json_pipeline_cvt = CONVERTER_HUB.get_converter_cls( + DataTypes.JSON.value, DataTypes.DATA_MODEL.value, "PipelineConverter" + ) + dm_pipeline = json_pipeline_cvt(json_data).convert() + assert isinstance(dm_pipeline, Pipeline) + + data_model_pipeline_cvt = CONVERTER_HUB.get_converter_cls( + DataTypes.DATA_MODEL.value, DataTypes.WEB_PIPELINE.value, "PipelineConverter" + ) + web_pipeline_tree = data_model_pipeline_cvt(dm_pipeline).convert() + + assert len(web_pipeline_tree[PE.activities]) == 1 + assert len(web_pipeline_tree[PE.flows]) == 2 + + # 确保 web_pipeline_tree 格式正确 且能够正常转换成 engine_pipeline_tree + validate_web_pipeline_tree(web_pipeline_tree) + engine_pipeline_tree = format_web_data_to_pipeline(web_pipeline_tree) + engine_validator.validate_and_process_pipeline(engine_pipeline_tree, cycle_tolerate=False)