[For merge][part 0] Support Gedit Evaulate#160
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request lays the groundwork for advanced distributed training and evaluation capabilities for the Qwen-Image model, with a strong emphasis on NPU compatibility. It refactors core components like attention and parallel group management to support efficient, scalable processing of multimodal data across multiple devices. The changes enable distributed classifier-free guidance and introduce performance optimizations specific to NPU hardware, ensuring the model can leverage specialized accelerators effectively. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces support for Gedit Evaluate, adding new Python files for distributed training and model components tailored for Ascend NPUs, including custom attention layers, distributed communication utilities, process group management, a new pipeline, and a custom transformer model. However, a security audit identified two significant vulnerabilities: an insecure deserialization flaw using the pickle module in the distributed communication logic, which could lead to remote code execution, and a prompt injection vulnerability in the image editing pipeline, allowing manipulation of the LLM's internal state. Furthermore, the code contains critical issues such as uninitialized attributes that will cause runtime errors, and requires improvements in cleanliness and maintainability, including removing debug prints, personal notes, and translating Chinese comments to English.
| # Bypass the function if we are using only 1 GPU. | ||
| if self.world_size == 1: | ||
| return obj | ||
| if self.shm_broadcaster is not None: |
| else: | ||
| if if_cond: | ||
| hidden_states, encoder_hidden_states = self.cache_cond.apply( | ||
| block, | ||
| hidden_states=hidden_states, | ||
| encoder_hidden_states=encoder_hidden_states, | ||
| encoder_hidden_states_mask=encoder_hidden_states_mask, | ||
| temb=temb, | ||
| image_rotary_emb=image_rotary_emb, | ||
| joint_attention_kwargs=attention_kwargs, | ||
| ) | ||
| else: | ||
| hidden_states, encoder_hidden_states = self.cache_uncond.apply( | ||
| block, | ||
| hidden_states=hidden_states, | ||
| encoder_hidden_states=encoder_hidden_states, | ||
| encoder_hidden_states_mask=encoder_hidden_states_mask, | ||
| temb=temb, | ||
| image_rotary_emb=image_rotary_emb, | ||
| joint_attention_kwargs=attention_kwargs, | ||
| ) | ||
| #----------------------------------- |
There was a problem hiding this comment.
The attributes self.cache_cond and self.cache_uncond are used here when use_cache=True, but they are initialized to None on lines 655-656. This will raise an AttributeError at runtime if caching is enabled. These attributes should be initialized with proper cache objects, or a check should be added to ensure they are not None before being used.
| tensor, | ||
| self.ranks[dst], | ||
| group=( | ||
| self.device_groups[self.rank_in_group % 2] |
| object_tensor, src=self.ranks[src], group=self.cpu_group | ||
| ) | ||
|
|
||
| obj = pickle.loads(object_tensor.numpy().tobytes()) |
There was a problem hiding this comment.
The recv_object method uses pickle.loads() to deserialize data received from other ranks in a distributed environment via torch.distributed.recv(). The pickle module is inherently insecure as it can execute arbitrary code during deserialization. An attacker who can control the data sent between nodes in the distributed cluster (e.g., via a compromised node or an insecure network) could achieve arbitrary code execution (RCE) on the receiving node.
Remediation: Avoid using pickle for deserializing data from the network. Use safer alternatives like json for metadata or safetensors for tensor data. If pickle must be used, ensure the network environment is strictly isolated and consider using a cryptographic signature to verify the integrity and authenticity of the serialized data before processing it.
| deterministic=False, | ||
| return_attn_probs=False, | ||
| joint_strategy="none", | ||
| txt_pad_len = 0 |
| q_descale=None, | ||
| k_descale=None, | ||
| v_descale=None, |
| # def init_distributed_environment( | ||
| # world_size: int = -1, | ||
| # rank: int = -1, | ||
| # distributed_init_method: str = "env://", | ||
| # local_rank: int = -1, | ||
| # backend: str = "hccl", | ||
| # ): | ||
| # logging.debug( | ||
| # "world_size=%d rank=%d local_rank=%d " "distributed_init_method=%s backend=%s", | ||
| # world_size, | ||
| # rank, | ||
| # local_rank, | ||
| # distributed_init_method, | ||
| # backend, | ||
| # ) | ||
| # if not torch.distributed.is_initialized(): | ||
| # assert distributed_init_method is not None, ( | ||
| # "distributed_init_method must be provided when initializing " | ||
| # "distributed environment" | ||
| # ) | ||
| # # this backend is used for WORLD | ||
| # torch.distributed.init_process_group( | ||
| # backend=backend, | ||
| # init_method=distributed_init_method, | ||
| # world_size=world_size, | ||
| # rank=rank, | ||
| # ) | ||
| # set_device(torch.distributed.get_rank() % device_count()) | ||
| # # set the local rank | ||
| # # local_rank is not available in torch ProcessGroup, | ||
| # # see https://github.com/pytorch/pytorch/issues/122816 | ||
| # if local_rank == -1: | ||
| # # local rank not set, this usually happens in single-node | ||
| # # setting, where we can use rank as local rank | ||
| # if distributed_init_method == "env://": | ||
| # # local_rank = int(os.getenv('LOCAL_RANK', 0)) | ||
| # local_rank = dist.get_rank() | ||
| # print(f"init_distributed_environment 里面 local_rank {local_rank}") | ||
| # else: | ||
| # local_rank = rank | ||
| # global _WORLD | ||
| # if _WORLD is None: | ||
| # ranks = list(range(torch.distributed.get_world_size())) | ||
| # _WORLD = init_world_group(ranks, local_rank, backend) | ||
| # print(f"_WORLD 初始化") | ||
| # else: | ||
| # assert ( | ||
| # _WORLD.world_size == torch.distributed.get_world_size() | ||
| # ), "world group already initialized with a different world size" | ||
| # print(f"_WORLD 没有 初始化") |
| if get_classifier_free_guidance_world_size() == 2: | ||
| if get_classifier_free_guidance_rank() == 0: | ||
| with self.transformer.cache_context("uncond"): | ||
| noise_pred = self.transformer( | ||
| hidden_states=latent_model_input, | ||
| timestep=timestep / 1000, | ||
| guidance=guidance, | ||
| encoder_hidden_states_mask=negative_prompt_embeds_mask, | ||
| encoder_hidden_states=negative_prompt_embeds, | ||
| img_shapes=img_shapes, | ||
| txt_seq_lens=negative_txt_seq_lens, | ||
| attention_kwargs=self.attention_kwargs, | ||
| return_dict=False, | ||
| use_cache=UNCOND_CACHE, #-------------ljf------------- | ||
| if_cond=False, #----------------ljf------------- | ||
| )[0] | ||
| noise_pred = noise_pred[:, : latents.size(1)] | ||
|
|
||
| else: | ||
| with self.transformer.cache_context("cond"): | ||
| noise_pred = self.transformer( | ||
| hidden_states=latent_model_input, | ||
| timestep=timestep / 1000, | ||
| guidance=guidance, | ||
| encoder_hidden_states_mask=prompt_embeds_mask, | ||
| encoder_hidden_states=prompt_embeds, | ||
| img_shapes=img_shapes, | ||
| txt_seq_lens=txt_seq_lens, | ||
| attention_kwargs=self.attention_kwargs, | ||
| return_dict=False, | ||
| use_cache=COND_CACHE, #-------ljf-------- | ||
| if_cond=True, #------------ljf----------- | ||
| )[0] | ||
| noise_pred = noise_pred[:, : latents.size(1)] | ||
|
|
||
|
|
||
| noise_pred_uncond, noise_pred_text = get_cfg_group().all_gather(noise_pred, separate_tensors=True) | ||
|
|
||
| comb_pred = noise_pred_uncond + true_cfg_scale * (noise_pred_text - noise_pred_uncond) | ||
|
|
||
| cond_norm = torch.norm(noise_pred_text, dim=-1, keepdim=True) # 修正代码 | ||
| noise_norm = torch.norm(comb_pred, dim=-1, keepdim=True) | ||
| noise_pred = comb_pred * (cond_norm / noise_norm) | ||
| else: | ||
| #------------ljf 原始代码--------------- | ||
| with self.transformer.cache_context("cond"): | ||
| noise_pred = self.transformer( | ||
| hidden_states=latent_model_input, | ||
| timestep=timestep / 1000, | ||
| guidance=guidance, | ||
| encoder_hidden_states_mask=prompt_embeds_mask, | ||
| encoder_hidden_states=prompt_embeds, | ||
| img_shapes=img_shapes, | ||
| txt_seq_lens=txt_seq_lens, | ||
| attention_kwargs=self.attention_kwargs, | ||
| return_dict=False, | ||
| use_cache=COND_CACHE, #-------ljf-------- | ||
| if_cond=True, #------------ljf----------- | ||
| )[0] | ||
| noise_pred = noise_pred[:, : latents.size(1)] | ||
|
|
||
| if do_true_cfg: | ||
| with self.transformer.cache_context("uncond"): | ||
| neg_noise_pred = self.transformer( | ||
| hidden_states=latent_model_input, | ||
| timestep=timestep / 1000, | ||
| guidance=guidance, | ||
| encoder_hidden_states_mask=negative_prompt_embeds_mask, | ||
| encoder_hidden_states=negative_prompt_embeds, | ||
| img_shapes=img_shapes, | ||
| txt_seq_lens=negative_txt_seq_lens, | ||
| attention_kwargs=self.attention_kwargs, | ||
| return_dict=False, | ||
| use_cache=UNCOND_CACHE, #-------------ljf------------- | ||
| if_cond=False, #----------------ljf------------- | ||
| )[0] | ||
| neg_noise_pred = neg_noise_pred[:, : latents.size(1)] | ||
| comb_pred = neg_noise_pred + true_cfg_scale * (noise_pred - neg_noise_pred) | ||
|
|
||
| cond_norm = torch.norm(noise_pred, dim=-1, keepdim=True) | ||
| noise_norm = torch.norm(comb_pred, dim=-1, keepdim=True) | ||
| noise_pred = comb_pred * (cond_norm / noise_norm) | ||
| #---------------------------------------------------- |
There was a problem hiding this comment.
The logic for applying classifier-free guidance is duplicated for the distributed (get_classifier_free_guidance_world_size() == 2) and non-distributed cases. The core logic for combining conditional and unconditional predictions is identical. To improve readability and maintainability, consider refactoring this shared logic into a helper method.
| print("ljf 进入采样器,涉及随机") | ||
| x0 = sample - current_sigma * model_output | ||
| noise = torch.randn_like(sample) | ||
| prev_sample = (1.0 - next_sigma) * x0 + next_sigma * noise | ||
| else: | ||
| print("ljf 进入采样器,无随机") |
There was a problem hiding this comment.
This block contains debug print statements with personal notes. These should be removed from production code to keep the output clean and the code professional.
| print("ljf 进入采样器,涉及随机") | |
| x0 = sample - current_sigma * model_output | |
| noise = torch.randn_like(sample) | |
| prev_sample = (1.0 - next_sigma) * x0 + next_sigma * noise | |
| else: | |
| print("ljf 进入采样器,无随机") | |
| if self.config.stochastic_sampling: | |
| x0 = sample - current_sigma * model_output | |
| noise = torch.randn_like(sample) | |
| prev_sample = (1.0 - next_sigma) * x0 + next_sigma * noise | |
| else: | |
| prev_sample = sample + dt * model_output |
| template = self.prompt_template_encode | ||
|
|
||
| drop_idx = self.prompt_template_encode_start_idx | ||
| txt = [template.format(base_img_prompt + e) for e in prompt] |
There was a problem hiding this comment.
The pipeline constructs the final LLM prompt by directly inserting user-provided text into a template using Python's .format() method. The template uses special control tokens (e.g., <|im_start|>, <|im_end|>) to delineate system and user roles. Because the user input is not sanitized, an attacker can include these control tokens in their prompt to terminate the user block and inject new system-level instructions, potentially bypassing safety filters or manipulating the model's behavior (Prompt Injection).
Remediation: Sanitize user input by stripping or escaping the model's special control tokens (e.g., <|im_start|>, <|im_end|>, <|image_pad|>) before inserting it into the prompt template.
Thanks for your contribution; we appreciate it a lot. The following instructions will make your pull request healthier and help you get feedback more easily. If you do not understand some items, don't worry, just make the pull request and seek help from maintainers.
感谢您的贡献,我们非常重视。以下说明将使您的拉取请求更健康,更易于获得反馈。如果您不理解某些项目,请不要担心,只需提交拉取请求并从维护人员那里寻求帮助即可。
PR Type / PR类型
Related Issue | 关联 Issue
Fixes #(issue ID / issue 编号) / Relates to #(issue ID / issue 编号)
🔍 Motivation / 变更动机
Please describe the motivation of this PR and the goal you want to achieve through this PR.
请描述您的拉取请求的动机和您希望通过此拉取请求实现的目标。
📝 Modification / 修改内容
Please briefly describe what modification is made in this PR.
请简要描述此拉取请求中进行的修改。
📐 Associated Test Results / 关联测试结果
Please provide links to the related test results, such as CI pipelines, test reports, etc.
请提供相关测试结果的链接,例如 CI 管道、测试报告等。
Does the modification introduce changes that break the backward compatibility of the downstream repositories? If so, please describe how it breaks the compatibility and how the downstream projects should modify their code to keep compatibility with this PR.
是否引入了会破坏下游存储库向后兼容性的更改?如果是,请描述它如何破坏兼容性,以及下游项目应该如何修改其代码以保持与此 PR 的兼容性。
If the modification introduces performance degradation, please describe the impact of the performance degradation and the expected performance improvement.
如果引入了性能下降,请描述性能下降的影响和预期的性能改进。
🌟 Use cases (Optional) / 使用案例(可选)
If this PR introduces a new feature, it is better to list some use cases here and update the documentation.
如果此拉取请求引入了新功能,最好在此处列出一些用例并更新文档。
✅ Checklist / 检查列表
Before PR:
After PR:
👥 Collaboration Info / 协作信息
🌟 Useful CI Command / 实用的CI命令
/gemini review/gemini summary/gemini help/readthedocs build