2 changes: 2 additions & 0 deletions hugegraph-llm/pyproject.toml
@@ -58,6 +58,7 @@ dependencies = [
"apscheduler",
"litellm",
"hugegraph-python-client",
"pycgraph",
]
[project.urls]
homepage = "https://hugegraph.apache.org/"
@@ -85,3 +86,4 @@ allow-direct-references = true

[tool.uv.sources]
hugegraph-python-client = { workspace = true }
pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", rev = "main", marker = "sys_platform == 'linux'" }
‼️ Critical: Platform-specific dependency with insufficient error handling

The pycgraph dependency is marked with marker = "sys_platform == 'linux'", meaning it will only install on Linux systems. However, the code imports and uses PyCGraph unconditionally without any platform checks or error handling.

Impact: The application will crash immediately on non-Linux systems (macOS, Windows) when trying to import modules from hugegraph_llm.flows.

Recommendation:

  1. Add platform compatibility checks and graceful degradation
  2. Provide clear error messages for unsupported platforms
  3. Consider making CGraph support optional with a feature flag
  4. Document platform requirements in README/docs
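A minimal sketch of what recommendations 1 and 2 could look like — a guarded import plus a fail-fast helper. The `CGRAPH_AVAILABLE` flag and `require_cgraph` name are illustrative, not part of the PR:

```python
import sys

try:
    # Installed only on Linux per the sys_platform marker in pyproject.toml
    from PyCGraph import GPipeline
    CGRAPH_AVAILABLE = True
except ImportError:
    GPipeline = None
    CGRAPH_AVAILABLE = False


def require_cgraph() -> None:
    """Fail fast with a clear message on platforms without PyCGraph."""
    if not CGRAPH_AVAILABLE:
        raise RuntimeError(
            "CGraph-based workflows require Linux; PyCGraph is not "
            f"installed on platform {sys.platform!r}."
        )
```

Call sites in `hugegraph_llm.flows` could then invoke `require_cgraph()` before building pipelines instead of crashing at import time.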

16 changes: 16 additions & 0 deletions hugegraph-llm/src/hugegraph_llm/flows/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
🧹 Minor: Empty __init__.py with only license header

This file contains only the Apache license header with no actual code. While functional, consider adding a brief module docstring to describe the flows package purpose.

Suggestion:

# ... (license header) ...

"""
Workflow orchestration module for HugeGraph AI.

This package provides flexible workflow scheduling and pipeline management
for various AI tasks including vector indexing and graph extraction.
"""

55 changes: 55 additions & 0 deletions hugegraph-llm/src/hugegraph_llm/flows/build_vector_index.py
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.state.ai_state import WkFlowInput

import json
from PyCGraph import GPipeline

from hugegraph_llm.operators.document_op.chunk_split import ChunkSplitNode
from hugegraph_llm.operators.index_op.build_vector_index import BuildVectorIndexNode
from hugegraph_llm.state.ai_state import WkFlowState


class BuildVectorIndexFlow(BaseFlow):
def __init__(self):
pass

def prepare(self, prepared_input: WkFlowInput, texts):
‼️ Critical: Hardcoded configuration breaks flexibility

The prepare method hardcodes language = "zh" (Chinese) and split_type = "paragraph". This contradicts the PR's goal of "flexible and extensible workflow scheduling."

Issues:

  1. Non-Chinese users cannot use this flow without code modification
  2. No way to customize split strategy for different document types
  3. Breaks the principle of configuration over convention

Recommendation:
Make these configurable parameters:

def prepare(self, prepared_input: WkFlowInput, texts, language="zh", split_type="paragraph"):
    prepared_input.texts = texts
    prepared_input.language = language
    prepared_input.split_type = split_type

prepared_input.texts = texts
prepared_input.language = "zh"
prepared_input.split_type = "paragraph"
return

def build_flow(self, texts):
pipeline = GPipeline()
# prepare for workflow input
prepared_input = WkFlowInput()
self.prepare(prepared_input, texts)

pipeline.createGParam(prepared_input, "wkflow_input")
pipeline.createGParam(WkFlowState(), "wkflow_state")

chunk_split_node = ChunkSplitNode()
build_vector_node = BuildVectorIndexNode()
pipeline.registerGElement(chunk_split_node, set(), "chunk_split")
pipeline.registerGElement(build_vector_node, {chunk_split_node}, "build_vector")

return pipeline

def post_deal(self, pipeline=None):
res = pipeline.getGParamWithNoEmpty("wkflow_state").to_json()
return json.dumps(res, ensure_ascii=False, indent=2)
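The two-node DAG registered above (`build_vector` depends on `chunk_split`) follows the usual dependency-set model: each element is registered with the set of elements it must run after. As an illustration only — not how PyCGraph schedules internally — the same idea with Python's stdlib topological sorter:

```python
from graphlib import TopologicalSorter


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid run order for nodes given their dependency sets."""
    return list(TopologicalSorter(deps).static_order())
```

With `{"chunk_split": set(), "build_vector": {"chunk_split"}}`, `chunk_split` is always ordered before `build_vector`, mirroring the `registerGElement` calls.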
45 changes: 45 additions & 0 deletions hugegraph-llm/src/hugegraph_llm/flows/common.py
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod

from hugegraph_llm.state.ai_state import WkFlowInput


class BaseFlow(ABC):
"""
Base class for flows, defines three interface methods: prepare, build_flow, and post_deal.
"""

@abstractmethod
def prepare(self, prepared_input: WkFlowInput, *args, **kwargs):
🧹 Minor: Missing type hints

The BaseFlow abstract class and its implementations lack type hints for parameters and return values. This reduces code maintainability and IDE support.

Suggestion:

from typing import Any
from PyCGraph import GPipeline

@abstractmethod
def prepare(self, prepared_input: WkFlowInput, *args: Any, **kwargs: Any) -> None:
    """Pre-processing interface."""
    pass

@abstractmethod
def build_flow(self, *args: Any, **kwargs: Any) -> GPipeline:
    """Interface for building the flow."""
    pass

@abstractmethod
def post_deal(self, pipeline: GPipeline | None = None) -> str:
    """Post-processing interface."""
    pass

"""
Pre-processing interface.
"""
pass

@abstractmethod
def build_flow(self, *args, **kwargs):
"""
Interface for building the flow.
"""
pass

@abstractmethod
def post_deal(self, *args, **kwargs):
"""
Post-processing interface.
"""
pass
127 changes: 127 additions & 0 deletions hugegraph-llm/src/hugegraph_llm/flows/graph_extract.py
@@ -0,0 +1,127 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from PyCGraph import GPipeline
from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.state.ai_state import WkFlowInput, WkFlowState
from hugegraph_llm.operators.common_op.check_schema import CheckSchemaNode
from hugegraph_llm.operators.document_op.chunk_split import ChunkSplitNode
from hugegraph_llm.operators.hugegraph_op.schema_manager import SchemaManagerNode
from hugegraph_llm.operators.llm_op.info_extract import InfoExtractNode
from hugegraph_llm.operators.llm_op.property_graph_extract import (
PropertyGraphExtractNode,
)
from hugegraph_llm.utils.log import log


class GraphExtractFlow(BaseFlow):
def __init__(self):
pass

def _import_schema(
self,
from_hugegraph=None,
from_extraction=None,
from_user_defined=None,
):
if from_hugegraph:
return SchemaManagerNode()
elif from_user_defined:
return CheckSchemaNode()
elif from_extraction:
raise NotImplementedError("Not implemented yet")
else:
raise ValueError("No input data / invalid schema type")

def prepare(
self, prepared_input: WkFlowInput, schema, texts, example_prompt, extract_type
):
# prepare input data
prepared_input.texts = texts
prepared_input.language = "zh"
prepared_input.split_type = "document"
prepared_input.example_prompt = example_prompt
prepared_input.schema = schema
schema = schema.strip()
if schema.startswith("{"):
try:
schema = json.loads(schema)
prepared_input.schema = schema
except json.JSONDecodeError as exc:
log.error("Invalid JSON format in schema. Please check it again.")
raise ValueError("Invalid JSON format in schema.") from exc
else:
log.info("Get schema '%s' from graphdb.", schema)
prepared_input.graph_name = schema
return

def build_flow(self, schema, texts, example_prompt, extract_type):
pipeline = GPipeline()
prepared_input = WkFlowInput()
# prepare input data
self.prepare(prepared_input, schema, texts, example_prompt, extract_type)

pipeline.createGParam(prepared_input, "wkflow_input")
pipeline.createGParam(WkFlowState(), "wkflow_state")
schema = schema.strip()
schema_node = None
if schema.startswith("{"):
try:
schema = json.loads(schema)
schema_node = self._import_schema(from_user_defined=schema)
except json.JSONDecodeError as exc:
log.error("Invalid JSON format in schema. Please check it again.")
raise ValueError("Invalid JSON format in schema.") from exc
else:
log.info("Get schema '%s' from graphdb.", schema)
schema_node = self._import_schema(from_hugegraph=schema)

chunk_split_node = ChunkSplitNode()
graph_extract_node = None
if extract_type == "triples":
graph_extract_node = InfoExtractNode()
elif extract_type == "property_graph":
graph_extract_node = PropertyGraphExtractNode()
else:
raise ValueError(f"Unsupported extract_type: {extract_type}")
pipeline.registerGElement(schema_node, set(), "schema_node")
pipeline.registerGElement(chunk_split_node, set(), "chunk_split")
pipeline.registerGElement(
graph_extract_node, {schema_node, chunk_split_node}, "graph_extract"
)

return pipeline

def post_deal(self, pipeline=None):
res = pipeline.getGParamWithNoEmpty("wkflow_state").to_json()
vertices = res.get("vertices", [])
edges = res.get("edges", [])
if not vertices and not edges:
log.info("Please check the schema.(The schema may not match the Doc)")
return json.dumps(
{
"vertices": vertices,
"edges": edges,
"warning": "The schema may not match the Doc",
},
ensure_ascii=False,
indent=2,
)
return json.dumps(
{"vertices": vertices, "edges": edges},
ensure_ascii=False,
indent=2,
)
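The string-vs-JSON schema dispatch is duplicated between `prepare` and `build_flow` above; it could be factored into one helper. A hypothetical sketch (`classify_schema` is not in the PR):

```python
import json


def classify_schema(schema: str):
    """Classify a schema string: JSON text is a user-defined schema,
    anything else is treated as a graph name to fetch from HugeGraph."""
    schema = schema.strip()
    if schema.startswith("{"):
        try:
            return "user_defined", json.loads(schema)
        except json.JSONDecodeError as exc:
            raise ValueError("Invalid JSON format in schema.") from exc
    return "graph_name", schema
```

Both call sites would then parse the schema exactly once and with identical error behavior.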
90 changes: 90 additions & 0 deletions hugegraph-llm/src/hugegraph_llm/flows/scheduler.py
@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from typing import Dict, Any
from PyCGraph import GPipelineManager
from hugegraph_llm.flows.build_vector_index import BuildVectorIndexFlow
from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.flows.graph_extract import GraphExtractFlow
from hugegraph_llm.utils.log import log


class Scheduler:
pipeline_pool: Dict[str, Any] = None
max_pipeline: int
⚠️ Important: Unused parameter

The max_pipeline parameter is stored in __init__ but never used anywhere in the class. The GPipelineManager is created without any size limits, so this parameter has no effect.

Recommendation:

  1. Either implement pipeline pool size limits using this parameter
  2. Or remove it if not needed yet
  3. Document the intended behavior for future implementation
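If option 1 is chosen, the cap would have to be enforced on the caller's side, since GPipelineManager exposes no built-in size limit. A hypothetical sketch with illustrative names:

```python
import threading


class BoundedPool:
    """Count pipelines created per flow type and refuse past a cap."""

    def __init__(self, max_pipeline: int = 10):
        self._lock = threading.Lock()
        self._created = 0
        self.max_pipeline = max_pipeline

    def try_acquire_slot(self) -> bool:
        """Reserve a slot for a new pipeline; False once the cap is hit."""
        with self._lock:
            if self._created >= self.max_pipeline:
                return False
            self._created += 1
            return True
```

`schedule_flow` could call `try_acquire_slot()` before `flow.build_flow(...)` and either block or raise when it returns False.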


def __init__(self, max_pipeline: int = 10):
self.pipeline_pool = {}
# pipeline_pool maps each flow type to its GPipelineManager, which is used for pipeline management
self.pipeline_pool["build_vector_index"] = {
"manager": GPipelineManager(),
"flow": BuildVectorIndexFlow(),
}
self.pipeline_pool["graph_extract"] = {
"manager": GPipelineManager(),
"flow": GraphExtractFlow(),
}
self.max_pipeline = max_pipeline

# TODO: Implement Agentic Workflow
def agentic_flow(self):
pass

def schedule_flow(self, flow: str, *args, **kwargs):
‼️ Critical: Missing thread safety in pipeline access

The schedule_flow method accesses and modifies the pipeline pool without proper synchronization. While SchedulerSingleton uses a lock for instance creation, the schedule_flow method itself doesn't protect concurrent access to the shared pipeline_pool dictionary.

Race condition scenarios:

  1. Multiple threads calling schedule_flow for the same flow type
  2. Concurrent fetch() and release() operations on the same manager
  3. Pipeline state corruption during concurrent runs

Recommendation:
Add proper locking around pipeline operations:

def __init__(self, max_pipeline: int = 10):
    self._pool_lock = threading.Lock()  # one lock shared by all calls
    ...

def schedule_flow(self, flow: str, *args, **kwargs):
    if flow not in self.pipeline_pool:
        raise ValueError(f"Unsupported workflow {flow}")

    with self._pool_lock:  # an instance-level lock; `with threading.Lock():` would create a fresh lock per call and protect nothing
        manager = self.pipeline_pool[flow]["manager"]
        # ... rest of the logic

if flow not in self.pipeline_pool:
raise ValueError(f"Unsupported workflow {flow}")
manager = self.pipeline_pool[flow]["manager"]
flow: BaseFlow = self.pipeline_pool[flow]["flow"]
pipeline = manager.fetch()
if pipeline is None:
# call coresponding flow_func to create new workflow
Copilot AI Oct 21, 2025

Corrected spelling of 'coresponding' to 'corresponding'.

Suggested change
# call coresponding flow_func to create new workflow
# call corresponding flow_func to create new workflow

pipeline = flow.build_flow(*args, **kwargs)
‼️ Critical: Incomplete error recovery logic

When pipeline.init() or pipeline.run() fails, the code raises an exception but never calls manager.add(pipeline) to return the pipeline to the pool. This creates a resource leak where failed pipelines are never recycled.

Impact: After max_pipeline failures, the system runs out of pipelines and cannot process new requests.

Recommendation:
Use try-finally to ensure pipelines are always returned:

pipeline = flow.build_flow(*args, **kwargs)
try:
    status = pipeline.init()
    if status.isErr():
        raise RuntimeError(f"Error in flow init: {status.getInfo()}")
    status = pipeline.run()
    if status.isErr():
        raise RuntimeError(f"Error in flow execution: {status.getInfo()}")
    res = flow.post_deal(pipeline)
    return res
finally:
    manager.add(pipeline)

status = pipeline.init()
if status.isErr():
error_msg = f"Error in flow init: {status.getInfo()}"
log.error(error_msg)
raise RuntimeError(error_msg)
status = pipeline.run()
if status.isErr():
error_msg = f"Error in flow execution: {status.getInfo()}"
log.error(error_msg)
raise RuntimeError(error_msg)
res = flow.post_deal(pipeline)
⚠️ Important: Inconsistent error handling

The code uses both raise ValueError and raise RuntimeError inconsistently. Also, the post_deal method can fail (e.g., JSON serialization errors) but is not wrapped in error handling.

Recommendation:

  1. Define a clear exception hierarchy for the flows module
  2. Wrap post_deal in try/except to handle serialization errors
  3. Document which exceptions callers should expect
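One possible shape for such a hierarchy — class names are suggestions, not part of the PR:

```python
class FlowError(Exception):
    """Base class for all errors raised by the flows module."""


class FlowInitError(FlowError):
    """Raised when pipeline.init() reports an error status."""


class FlowExecutionError(FlowError):
    """Raised when pipeline.run() reports an error status."""


class FlowResultError(FlowError):
    """Raised when post_deal fails, e.g. on JSON serialization."""
```

Callers could then catch `FlowError` broadly or a specific subclass, and the docstrings double as the documentation asked for in item 3.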

manager.add(pipeline)
return res
else:
# fetch pipeline & prepare input for flow
prepared_input = pipeline.getGParamWithNoEmpty("wkflow_input")
flow.prepare(prepared_input, *args, **kwargs)
status = pipeline.run()
if status.isErr():
raise RuntimeError(f"Error in flow execution {status.getInfo()}")
res = flow.post_deal(pipeline)
manager.release(pipeline)
return res


class SchedulerSingleton:
_instance = None
_instance_lock = threading.Lock()

@classmethod
def get_instance(cls):
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None:
cls._instance = Scheduler()
return cls._instance
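For reference, the double-checked-locking pattern used by SchedulerSingleton, reduced to a self-contained sketch: the unlocked fast path avoids contention on every call, and the re-check under the lock prevents two racing threads from each constructing an instance.

```python
import threading


class Singleton:
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:          # fast path, no lock taken
            with cls._lock:                # slow path, serialized
                if cls._instance is None:  # re-check under the lock
                    cls._instance = cls()
        return cls._instance
```

Every caller receives the same object, so the shared pipeline_pool lives in exactly one Scheduler.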