diff --git a/chatlearn/algorithm/grpo_utils/megatron_utils/train_helper.py b/chatlearn/algorithm/grpo_utils/megatron_utils/train_helper.py index f230b7ed..3f7e7b2e 100644 --- a/chatlearn/algorithm/grpo_utils/megatron_utils/train_helper.py +++ b/chatlearn/algorithm/grpo_utils/megatron_utils/train_helper.py @@ -393,7 +393,6 @@ def get_batch( { "pixel_values": data_b['pixel_values'], # [token_length, token_num] "image_grid_thw": data_b['image_grid_thw'], # [batch_size, 3] - "rope_deltas": data_b['rope_deltas'], # [batch_size, 1] "image_input_mask": tokens==get_tokenizer().vocab['<|image_pad|>'] # [batch_size, token_length] } ) diff --git a/chatlearn/algorithm/grpo_utils/policy_trainer.py b/chatlearn/algorithm/grpo_utils/policy_trainer.py index 8a69b45f..b5d17fd4 100644 --- a/chatlearn/algorithm/grpo_utils/policy_trainer.py +++ b/chatlearn/algorithm/grpo_utils/policy_trainer.py @@ -103,7 +103,6 @@ def preprocess_data_list(self, data_list: List[Dict[str, Any]], training: bool): # for vl position_ids = data_b.get("position_ids", None) - rope_deltas = data_b.get("rope_deltas", None) pixel_values = data_b.get("pixel_values", None) image_grid_thw = data_b.get("image_grid_thw", None) @@ -151,7 +150,6 @@ def preprocess_data_list(self, data_list: List[Dict[str, Any]], training: bool): { "pixel_values": pixel_values, # [token_length, token_num] "image_grid_thw": image_grid_thw, # [batch_size, 3] - "rope_deltas": rope_deltas # [batch_size, 1] } ) @@ -196,8 +194,7 @@ def train_step(self, data_list: List[Dict[str, Any]], **kwargs): # pylint: disab image_grid_thw=inputs['image_grid_thw'], attention_mask=None, position_ids=inputs['position_ids'], - use_cache=False, - rope_deltas=inputs['rope_deltas'] + use_cache=False ) else: output = self.model( @@ -315,8 +312,7 @@ def forward_step(self, data: List[Dict[str, Any]], **kwargs) -> List[Dict[str, A image_grid_thw=inputs['image_grid_thw'], attention_mask=None, position_ids=inputs['position_ids'], - use_cache=False, - rope_deltas=inputs['rope_deltas'] + use_cache=False ) else: output = self.model( diff --git a/chatlearn/algorithm/grpo_utils/trainer_utils.py b/chatlearn/algorithm/grpo_utils/trainer_utils.py index 8923f307..08324f7a 100644 --- a/chatlearn/algorithm/grpo_utils/trainer_utils.py +++ b/chatlearn/algorithm/grpo_utils/trainer_utils.py @@ -139,7 +139,7 @@ def batching(data_list: List[Dict[str, Any]]) -> Dict[str, Any]: batched_data = defaultdict(list) for key in data_list[0]: batched_data[key] = [data[key] for data in data_list] - if key in ['pixel_values', 'image_grid_thw', 'rope_deltas']: + if key in ['pixel_values', 'image_grid_thw']: batched_data[key] = torch.cat(batched_data[key], dim=0) elif isinstance(batched_data[key][0], torch.Tensor): batched_data[key] = padding_tensor(batched_data[key]) diff --git a/chatlearn/data/data_preprocess/geo3k.py b/chatlearn/data/data_preprocess/geo3k.py index dd92fe69..5e03404c 100644 --- a/chatlearn/data/data_preprocess/geo3k.py +++ b/chatlearn/data/data_preprocess/geo3k.py @@ -3,9 +3,34 @@ """ import argparse +import base64 import os +from typing import List, Dict +from io import BytesIO import datasets +from PIL import Image + +def image_to_base64(img: Image.Image) -> str: + + img = img.convert("RGB") + buffered = BytesIO() + img.save(buffered, format="JPEG", quality=100) + img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') + return img_str + +def prepare_image_content(img_list: List[Image.Image]) -> List[Dict]: + img_content = [] + + for img in img_list: + img_content.append( + { + "type": "image_url", + "image_url": f"data:image;base64,{image_to_base64(img)}" + } + ) + assert img_content[-1]["image_url"] is not None + return img_content if __name__ == "__main__": parser = argparse.ArgumentParser() @@ -32,18 +57,24 @@ def make_map_fn(split): def process_fn(example, idx): problem = example.pop("problem") prompt = problem + " " + instruction_following + prompt = prompt.replace("", "") answer = example.pop("answer") images = example.pop("images") + # format openai style messages + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt} + ] + } + ] + messages[0]["content"] = prepare_image_content(images)+ \ + messages[0]["content"] data = { "data_source": data_source, - "prompt": [ - { - "role": "user", - "content": prompt, - } - ], - "images": images, + "messages": messages, "ability": "math", "reward_model": {"style": "rule", "ground_truth": answer}, "extra_info": { @@ -63,5 +94,6 @@ def process_fn(example, idx): local_dir = args.local_dir hdfs_dir = args.hdfs_dir + # to_parquet may produce key: None in dict train_dataset.to_parquet(os.path.join(local_dir, "train.parquet")) test_dataset.to_parquet(os.path.join(local_dir, "test.parquet")) diff --git a/chatlearn/data/data_preprocess/geo3k_agent.py b/chatlearn/data/data_preprocess/geo3k_agent.py new file mode 100644 index 00000000..9adc0cbf --- /dev/null +++ b/chatlearn/data/data_preprocess/geo3k_agent.py @@ -0,0 +1,109 @@ +""" +Preprocess the Geometry3k dataset to parquet format +""" + +import argparse +import base64 +import os +from typing import List, Dict +from io import BytesIO + +import datasets +from PIL import Image + +def image_to_base64(img: Image.Image) -> str: + + img = img.convert("RGB") + buffered = BytesIO() + img.save(buffered, format="JPEG", quality=100) + img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') + return img_str + +def prepare_image_content(img_list: List[Image.Image]) -> List[Dict]: + img_content = [] + + for img in img_list: + img_content.append( + { + "type": "image_url", + "image_url": f"data:image;base64,{image_to_base64(img)}" + } + ) + assert img_content[-1]["image_url"] is not None + return img_content + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--local_dir", default="dataset/geo3k") + parser.add_argument("--hdfs_dir", default=None) + + args = parser.parse_args() + + data_source = "hiyouga/geometry3k" + + dataset = datasets.load_dataset(data_source) + + train_dataset = dataset["train"] + test_dataset = dataset["test"] + + instruction_following = ( + r"You FIRST think about the reasoning process as an internal monologue and then provide the final answer. " + r"The reasoning process MUST BE enclosed within tags. " + r"You must use the `calc_geo3k_reward` tool after step by step solving the question" + r"The final answer MUST BE put in \boxed{}." + ) + + system_prompt = ( + "You are a math expert. You are given a question and you need to solve it step by step. " + "Reasoning step by step before any tool call. " + "You should use the `calc_geo3k_reward` tool after step by step solving the question, " + "before generate final answer at least once and refine your answer if necessary. " + "Put your final answer within \\boxed{}." + ) + # add a row to each data item that represents a unique id + def make_map_fn(split): + def process_fn(example, idx): + problem = example.pop("problem") + prompt = problem + " " + instruction_following + prompt = prompt.replace("", "") + answer = example.pop("answer") + images = example.pop("images") + # format openai style messages + messages = [ + {"role": "system", "content": [{"type": "text", "text": system_prompt}]}, + { + "role": "user", + "content": [ + {"type": "text", "text": prompt} + ] + } + ] + messages[-1]["content"] = prepare_image_content(images)+ \ + messages[-1]["content"] + data = { + "agent_name": "geo3k_agent", + "agent_cfg_path": "template/agent/geo3k_eval.yaml", + "data_source": data_source, + "messages": messages, + "ability": "math", + "reward_model": {"style": "rule", "ground_truth": answer}, + "extra_info": { + "split": split, + "index": idx, + "answer": answer, + "question": problem, + }, + } + return data + + return process_fn + + train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True, num_proc=8) + test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True, num_proc=8) + + local_dir = args.local_dir + hdfs_dir = args.hdfs_dir + + # to_parquet may produce key: None in dict + train_dataset.to_parquet(os.path.join(local_dir, "train_agent.parquet")) + test_dataset.to_parquet(os.path.join(local_dir, "test_agent.parquet")) diff --git a/chatlearn/data/vl_prompt_dataset.py b/chatlearn/data/vl_prompt_dataset.py index df73b933..0d4e69b9 100644 --- a/chatlearn/data/vl_prompt_dataset.py +++ b/chatlearn/data/vl_prompt_dataset.py @@ -1,10 +1,11 @@ """prompt dataset""" from typing import List, Dict -import re + from torch.utils.data import Dataset from transformers import AutoTokenizer, AutoProcessor -from chatlearn.data.vision_utils import process_image, process_video +from qwen_vl_utils import process_vision_info + from chatlearn.models.patches.transformers.qwen2_5_vl_patch import get_rope_index @@ -13,11 +14,7 @@ class PromptPipeline(Dataset): Input data_list: List[Dict]) { "data_source": data_source, - "images": [PIL.Image] - "prompt": [{ - "role": "user", - "content": question, - }], + "messages": openai-style messages List, "ability": "math", "reward_model": { "style": "rule", @@ -38,12 +35,11 @@ class PromptPipeline(Dataset): "prompt_token_length": int, # len(input_ids) "prompt": String, "position_ids": List[List], # [3, token_length] - "rope_deltas": Tensor, # [1,1] "data_source": String, "ground_truth": String, "multi_modal_data": {'image':[PIL.Image]}, # for vllm inference "mm_processor_kwargs": {'fps':[]}, # used for video useless now - "pixel_values": Tensor, # [token_num, token_length] + "pixel_values": Tensor, # [grid_num, pixel_num] "image_grid_thw": Tensor, # [1,3] 3 means t,h,w } """ @@ -53,115 +49,92 @@ def __init__( max_prompt_tokens_length: int, tokenizer: AutoTokenizer = None, processor: AutoProcessor = None, - enable_thinking=False + enable_thinking=False, + raw_chat=False ): # pylint: disable=super-init-not-called super().__init__() self.tokenizer = tokenizer self.processor = processor - # TODO default key for input_data - self.prompt_key = "prompt" - self.image_key = "images" - self.video_key = "videos" self.data = [] self.max_prompt = 0 for data_item in data_list: - messages = self._build_messages(data_item) - - model_inputs = {} - - assert self.processor is not None - - raw_prompt = self.processor.apply_chat_template(messages, add_generation_prompt=True, tokenize=False, enable_thinking=enable_thinking) - # multi_modal_data = {} - - images = None - if self.image_key in data_item and data_item.get(self.image_key, None) is not None: - images = [process_image(image) for image in data_item.pop(self.image_key)] - - # due to the image key is "image" instead of "images" in vllm, we need to use "image" here - # link: https://github.com/vllm-project/vllm/blob/3c545c0c3b98ee642373a308197d750d0e449403/vllm/multimodal/parse.py#L205 - # multi_modal_data["image"] = images - - videos = None - if self.video_key in data_item and data_item.get(self.video_key, None) is not None: - videos = [process_video(video) for video in data_item.pop(self.video_key)] - - # due to the video key is "video" instead of "videos" in vllm, we need to use "video" here - # link: https://github.com/vllm-project/vllm/blob/3c545c0c3b98ee642373a308197d750d0e449403/vllm/multimodal/parse.py#L205 - # multi_modal_data["video"] = [video.numpy() for video in videos] - - # TODO support video. Only images are supported now. - multi_modal_data = {'image':images} - mm_processor_kwargs = {'fps': []} - - model_inputs = self.processor(text=[raw_prompt], images=images, videos=videos, return_tensors="pt") - input_ids = model_inputs.pop("input_ids") - attention_mask = model_inputs.pop("attention_mask") - image_grid_thw = model_inputs.get("image_grid_thw") - pixel_values = model_inputs.get("pixel_values") - - # text only input_ids for vllm - raw_input_ids = self.tokenizer.encode(raw_prompt, add_special_tokens=False) - - position_ids, rope_deltas = get_rope_index( - self.processor, - input_ids=input_ids, - image_grid_thw=model_inputs.get("image_grid_thw"), - video_grid_thw=model_inputs.get("video_grid_thw"), - second_per_grid_ts=model_inputs.get("second_per_grid_ts"), - attention_mask=attention_mask, - ) - + messages = data_item.get("messages") data_source = data_item.get("data_source", "") ground_truth = data_item["reward_model"]["ground_truth"] - - # for vl model, raw_input_ids is only text input_ids for vllm inference - # input_ids is used for model forward_step and sglang inference (with image pad) - # sglang support both input_ids and raw_input_ids but to merge in all_tokens, input_ids is used + agent_name = data_item.get("agent_name", None) + agent_cfg_path = data_item.get("agent_cfg_path", None) processed_data = { - "raw_input_ids": raw_input_ids, - "input_ids": input_ids[0].tolist(), - "prompt_token_length": len(input_ids[0].tolist()), - "prompt": raw_prompt, - "position_ids": position_ids.squeeze().tolist(), - "rope_deltas": rope_deltas, "data_source": data_source, "ground_truth": ground_truth, - "multi_modal_data": multi_modal_data, - "mm_processor_kwargs": mm_processor_kwargs, - "pixel_values": pixel_values, - "image_grid_thw": image_grid_thw + "agent_name": agent_name, + "agent_cfg_path": agent_cfg_path } - - if len(input_ids[0]) > self.max_prompt: - self.max_prompt = len(input_ids[0]) - - if max_prompt_tokens_length > len(input_ids[0]): + for message in messages: + message['content'] = [ + {k: v for k, v in item.items() if v is not None} + for item in message['content'] + ] + if not raw_chat: + + model_inputs = {} + + assert self.processor is not None + + raw_prompt = self.processor.apply_chat_template(messages, add_generation_prompt=True, tokenize=False, enable_thinking=enable_thinking) + images, videos = process_vision_info(messages) + + # TODO support video. Only images are supported now. + multi_modal_data = {'image':images} + mm_processor_kwargs = {'fps': []} + + model_inputs = self.processor(text=[raw_prompt], images=images, videos=videos, return_tensors="pt") + input_ids = model_inputs.pop("input_ids") + attention_mask = model_inputs.pop("attention_mask") + image_grid_thw = model_inputs.get("image_grid_thw") + pixel_values = model_inputs.get("pixel_values") + + # text only input_ids for vllm + raw_input_ids = self.tokenizer.encode(raw_prompt, add_special_tokens=False) + # get position_ids used for sequence packing + position_ids, _ = get_rope_index( + self.processor, + input_ids=input_ids, + image_grid_thw=model_inputs.get("image_grid_thw"), + video_grid_thw=model_inputs.get("video_grid_thw"), + second_per_grid_ts=model_inputs.get("second_per_grid_ts"), + attention_mask=attention_mask, + ) + + # for vl model, raw_input_ids is only text input_ids for vllm inference + # input_ids is used for model forward_step and sglang inference (with image pad) + # sglang support both input_ids and raw_input_ids but to merge in all_tokens, input_ids is used + processed_data.update({ + "raw_input_ids": raw_input_ids, + "input_ids": input_ids[0].tolist(), + "prompt_token_length": len(input_ids[0].tolist()), + "prompt": raw_prompt, + "position_ids": position_ids.squeeze().tolist(), + "multi_modal_data": multi_modal_data, + "mm_processor_kwargs": mm_processor_kwargs, + "pixel_values": pixel_values, + "image_grid_thw": image_grid_thw + }) + if len(input_ids[0]) > self.max_prompt: + self.max_prompt = len(input_ids[0]) + + if max_prompt_tokens_length > len(input_ids[0]): + self.data.append(processed_data) + else: + # used in agent module + processed_data.update( + {"messages": messages} + ) self.data.append(processed_data) - self.valid_ratio = len(self.data) / len(data_list) - def _build_messages(self, example: dict): - messages: list = example.pop(self.prompt_key) - if self.image_key in example or self.video_key in example: - for message in messages: - content = message["content"] - content_list = [] - segments = re.split("(|