Refactor: Refactor Scheduler to Support Dynamic Workflow Scheduling and Pipeline Pooling #301
weijinglin wants to merge 9 commits into apache:main
Conversation
Pull Request Overview
This PR refactors the Scheduler to support dynamic workflow scheduling through pipeline pooling, enabling more flexible and extensible workflow management. The changes separate workflow logic into preparation, execution, and post-processing phases for better modularity.
Key changes:
- Introduced a Scheduler singleton with pipeline pooling for build_vector_index and graph_extract workflows
- Created new flow classes (BuildVectorIndexFlow, GraphExtractFlow) and node-based operators for pipeline execution
- Refactored utility functions to use the new scheduler.schedule_flow() interface instead of direct builder invocations
Reviewed Changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| hugegraph-llm/src/hugegraph_llm/flows/scheduler.py | Implements the core Scheduler and SchedulerSingleton classes for workflow management |
| hugegraph-llm/src/hugegraph_llm/flows/build_vector_index.py | Defines BuildVectorIndexFlow with prepare/build/post-processing logic |
| hugegraph-llm/src/hugegraph_llm/flows/graph_extract.py | Defines GraphExtractFlow with schema import and extraction logic |
| hugegraph-llm/src/hugegraph_llm/flows/common.py | Provides BaseFlow abstract class defining flow interface |
| hugegraph-llm/src/hugegraph_llm/state/ai_state.py | Introduces WkFlowInput and WkFlowState parameter classes for workflow state management |
| hugegraph-llm/src/hugegraph_llm/operators/util.py | Adds init_context helper for workflow context initialization |
| hugegraph-llm/src/hugegraph_llm/operators/document_op/chunk_split.py | Adds ChunkSplitNode implementing GNode-based chunk splitting |
| hugegraph-llm/src/hugegraph_llm/operators/index_op/build_vector_index.py | Adds BuildVectorIndexNode for vector index construction in pipeline |
| hugegraph-llm/src/hugegraph_llm/operators/llm_op/property_graph_extract.py | Adds PropertyGraphExtractNode for pipeline-based property graph extraction |
| hugegraph-llm/src/hugegraph_llm/operators/llm_op/info_extract.py | Adds InfoExtractNode for pipeline-based triple extraction |
| hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/schema_manager.py | Adds SchemaManagerNode for pipeline-based schema management |
| hugegraph-llm/src/hugegraph_llm/operators/common_op/check_schema.py | Adds CheckSchemaNode for pipeline-based schema validation |
| hugegraph-llm/src/hugegraph_llm/models/llms/init_llm.py | Adds standalone get_chat_llm, get_extract_llm, get_text2gql_llm functions |
| hugegraph-llm/src/hugegraph_llm/models/embeddings/init_embedding.py | Adds model_map and get_embedding function for embedding initialization |
| hugegraph-llm/src/hugegraph_llm/utils/vector_index_utils.py | Updates to use scheduler.schedule_flow() and model_map |
| hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py | Updates extract_graph to use scheduler and renames original to extract_graph_origin |
| hugegraph-llm/pyproject.toml | Adds pycgraph dependency with git source specification |
| language: str = None # language configuration used by ChunkSplit Node |
| split_type: str = None # split type used by ChunkSplit Node |
| example_prompt: str = None # need by graph information extract |
| schema: str = None # Schema information requeired by SchemaNode |
Corrected spelling of 'requeired' to 'required'.
| schema: str = None # Schema information requeired by SchemaNode |
| schema: str = None # Schema information required by SchemaNode |
| flow: BaseFlow = self.pipeline_pool[flow]["flow"] |
| pipeline = manager.fetch() |
| if pipeline is None: |
| # call coresponding flow_func to create new workflow |
Corrected spelling of 'coresponding' to 'corresponding'.
| # call coresponding flow_func to create new workflow |
| # call corresponding flow_func to create new workflow |
| [tool.uv.sources] |
| hugegraph-python-client = { workspace = true } |
| pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", rev = "main", marker = "sys_platform == 'linux'" } |
The pycgraph dependency is marked with marker = "sys_platform == 'linux'", meaning it will only install on Linux systems. However, the code imports and uses PyCGraph unconditionally without any platform checks or error handling.
Impact: The application will crash immediately on non-Linux systems (macOS, Windows) when trying to import modules from hugegraph_llm.flows.
Recommendation:
- Add platform compatibility checks and graceful degradation
- Provide clear error messages for unsupported platforms
- Consider making CGraph support optional with a feature flag
- Document platform requirements in README/docs
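A minimal sketch of the graceful-degradation approach described above, assuming the guarded import lives wherever the flows package first touches PyCGraph (the exact module path and the `require_pycgraph` helper name are illustrative, not from the PR):

```python
# Sketch: guard the platform-specific PyCGraph import instead of crashing
# at import time on macOS/Windows. HAS_PYCGRAPH and require_pycgraph are
# hypothetical names for illustration.
import platform

try:
    from PyCGraph import GPipeline  # only installable on Linux per the marker
    HAS_PYCGRAPH = True
except ImportError:
    GPipeline = None
    HAS_PYCGRAPH = False


def require_pycgraph() -> None:
    """Raise a clear, actionable error instead of a bare ImportError."""
    if not HAS_PYCGRAPH:
        raise RuntimeError(
            "Pipeline-based flows require pycgraph, which is only packaged "
            f"for Linux (current platform: {platform.system()}). "
            "Install it manually or use the legacy non-pipeline code path."
        )
```

Call `require_pycgraph()` at the top of `schedule_flow` (or flow construction) so unsupported platforms fail with a descriptive message rather than an import traceback.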
| def agentic_flow(self): |
| pass |
| def schedule_flow(self, flow: str, *args, **kwargs): |
The schedule_flow method accesses and modifies the pipeline pool without proper synchronization. While SchedulerSingleton uses a lock for instance creation, the schedule_flow method itself doesn't protect concurrent access to the shared pipeline_pool dictionary.
Race condition scenarios:
- Multiple threads calling schedule_flow for the same flow type
- Concurrent fetch() and release() operations on the same manager
- Pipeline state corruption during concurrent runs
Recommendation:
Add proper locking around pipeline operations. Note that `with threading.Lock():` inside the method would create a fresh lock on every call and protect nothing; the lock must be created once (e.g. in `__init__`) and shared:
def schedule_flow(self, flow: str, *args, **kwargs):
    if flow not in self.pipeline_pool:
        raise ValueError(f"Unsupported workflow {flow}")
    with self._pool_lock:  # single lock created once in __init__
        manager = self.pipeline_pool[flow]["manager"]
        # ... rest of the logic

| def __init__(self): |
| pass |
| def prepare(self, prepared_input: WkFlowInput, texts): |
The prepare method hardcodes language = "zh" (Chinese) and split_type = "paragraph". This contradicts the PR's goal of "flexible and extensible workflow scheduling."
Issues:
- Non-Chinese users cannot use this flow without code modification
- No way to customize split strategy for different document types
- Breaks the principle of configuration over convention
Recommendation:
Make these configurable parameters:
def prepare(self, prepared_input: WkFlowInput, texts, language="zh", split_type="paragraph"):
    prepared_input.texts = texts
    prepared_input.language = language
    prepared_input.split_type = split_type

| pipeline = manager.fetch() |
| if pipeline is None: |
| # call coresponding flow_func to create new workflow |
| pipeline = flow.build_flow(*args, **kwargs) |
When pipeline.init() or pipeline.run() fails, the code raises an exception but never calls manager.add(pipeline) to return the pipeline to the pool. This creates a resource leak where failed pipelines are never recycled.
Impact: After max_pipeline failures, the system runs out of pipelines and cannot process new requests.
Recommendation:
Use try-finally to ensure pipelines are always returned:
pipeline = flow.build_flow(*args, **kwargs)
try:
    status = pipeline.init()
    if status.isErr():
        raise RuntimeError(f"Error in flow init: {status.getInfo()}")
    status = pipeline.run()
    if status.isErr():
        raise RuntimeError(f"Error in flow execution: {status.getInfo()}")
    res = flow.post_deal(pipeline)
    return res
finally:
    manager.add(pipeline)

| @@ -17,10 +17,40 @@ |
The get_embedding function is defined twice in this file - once as a standalone function (lines 18-51) and once as a method in the Embeddings class (lines 60-78). This creates maintenance issues and confusion.
Recommendation:
- Remove the duplicate standalone function if it's not being used
- Or deprecate the Embeddings class if migrating to the new pattern
- Update all callers to use the consistent API
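One possible shape for the deprecation option, assuming the standalone `get_embedding` is the API going forward (the function body and model name below are stand-ins, not the file's real implementation):

```python
# Sketch: keep the Embeddings class as a thin deprecated shim that delegates
# to the standalone factory, so callers migrate without breaking.
import warnings


def get_embedding(model_name: str):
    """Stand-in for the real standalone factory in init_embedding.py."""
    return f"embedding-for-{model_name}"


class Embeddings:
    def get_embedding(self, model_name: str):
        warnings.warn(
            "Embeddings.get_embedding is deprecated; use the standalone "
            "get_embedding() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return get_embedding(model_name)  # single source of truth
```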
| edge_label["target_label"], |
| str, |
| "'target_label' in edge_label is not of correct type.", |
| ) |
The _process_keys method silently filters out keys that don't exist in the properties list (new_keys = [key for key in keys if key in label['properties']]). This could hide configuration errors where users specify invalid keys.
Issues:
- Typos in key names go undetected
- No feedback when configured keys are ignored
- Could lead to unexpected schema behavior
Recommendation:
Add validation to warn or error on invalid keys:
def _process_keys(self, label: Dict[str, Any], key_type: str, default_keys: list) -> list:
    keys = label.get(key_type, default_keys)
    check_type(keys, list, f"'{key_type}' in {label['name']} is not of correct type.")
    invalid_keys = [key for key in keys if key not in label['properties']]
    if invalid_keys:
        log.warning(f"Keys {invalid_keys} in {key_type} are not present in properties for {label['name']}")
    new_keys = [key for key in keys if key in label['properties']]
    return new_keys
| class Scheduler: |
| pipeline_pool: Dict[str, Any] = None |
| max_pipeline: int |
The max_pipeline parameter is stored in __init__ but never used anywhere in the class. The GPipelineManager is created without any size limits, so this parameter has no effect.
Recommendation:
- Either implement pipeline pool size limits using this parameter
- Or remove it if not needed yet
- Document the intended behavior for future implementation
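If the first option is chosen, one way to make `max_pipeline` actually bound the pool is a semaphore around fetch/release. This is a sketch under the assumption that GPipelineManager enforces no limit itself; the `BoundedPipelinePool` class and its method names are illustrative, not from the PR:

```python
# Sketch: cap the number of live pipelines at max_pipeline using a semaphore,
# while recycling released pipelines through a queue.
import queue
import threading


class BoundedPipelinePool:
    def __init__(self, factory, max_pipeline: int):
        self._factory = factory                           # builds a new pipeline
        self._slots = threading.Semaphore(max_pipeline)   # caps live pipelines
        self._idle = queue.SimpleQueue()                  # recycled pipelines

    def fetch(self):
        self._slots.acquire()             # blocks once max_pipeline are in use
        try:
            return self._idle.get_nowait()  # prefer a recycled pipeline
        except queue.Empty:
            return self._factory()          # under the cap: build a fresh one

    def release(self, pipeline) -> None:
        self._idle.put(pipeline)
        self._slots.release()
```

`fetch()` blocking at the cap gives natural backpressure; a timeout on `acquire` could be added if callers should fail fast instead.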
| error_msg = f"Error in flow execution: {status.getInfo()}" |
| log.error(error_msg) |
| raise RuntimeError(error_msg) |
| res = flow.post_deal(pipeline) |
The code uses both raise ValueError and raise RuntimeError inconsistently. Also, the post_deal method can fail (e.g., JSON serialization errors) but is not wrapped in error handling.
Recommendation:
- Define a clear exception hierarchy for the flows module
- Wrap post_deal in try-catch to handle serialization errors
- Document which exceptions callers should expect
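A sketch of what such an exception hierarchy plus a guarded `post_deal` could look like; all class names here are illustrative suggestions, not code from the PR:

```python
# Sketch: one flows-module exception hierarchy and a post_deal wrapper that
# converts serialization failures into a typed, documented error.
class FlowError(Exception):
    """Base class for all flow-related failures."""


class FlowConfigError(FlowError):
    """Bad or unsupported flow configuration (replaces bare ValueError)."""


class FlowExecutionError(FlowError):
    """Pipeline init/run/post-processing failure (replaces bare RuntimeError)."""


def safe_post_deal(flow, pipeline):
    try:
        return flow.post_deal(pipeline)
    except Exception as exc:  # e.g. JSON serialization errors
        raise FlowExecutionError(f"post_deal failed: {exc}") from exc
```

Callers then only need to catch `FlowError` (or a subclass) instead of guessing between `ValueError` and `RuntimeError`.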
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
🧹 Minor: Empty __init__.py with only license header
This file contains only the Apache license header with no actual code. While functional, consider adding a brief module docstring to describe the flows package purpose.
Suggestion:
# ... (license header) ...
"""
Workflow orchestration module for HugeGraph AI.
This package provides flexible workflow scheduling and pipeline management
for various AI tasks including vector indexing and graph extraction.
"""| """ | ||
|
|
||
| @abstractmethod | ||
| def prepare(self, prepared_input: WkFlowInput, *args, **kwargs): |
🧹 Minor: Missing type hints
The BaseFlow abstract class and its implementations lack type hints for parameters and return values. This reduces code maintainability and IDE support.
Suggestion:
from typing import Any
from PyCGraph import GPipeline

@abstractmethod
def prepare(self, prepared_input: WkFlowInput, *args: Any, **kwargs: Any) -> None:
    """Pre-processing interface."""
    pass

@abstractmethod
def build_flow(self, *args: Any, **kwargs: Any) -> GPipeline:
    """Interface for building the flow."""

@abstractmethod
def post_deal(self, pipeline: GPipeline | None = None) -> str:
    """Post-processing interface."""
This PR refactors the Scheduler class to introduce a more flexible and extensible workflow scheduling mechanism.
These changes lay the foundation for supporting more complex and agentic workflows in the future, while also improving the efficiency and scalability of the current pipeline execution framework.