Open-R1:Deepseek-R1复现项目(包含SFT阶段、GRPO阶段代码详解)

Source

1.简介

在人工智能领域,一款名为DeepSeek-R1的模型正以其卓越的性能和极具竞争力的成本迅速崭露头角,引发了全球范围内的广泛关注。这款模型不仅在多个基准测试中表现出色,还因其开源的特性,为全球开发者和研究人员提供了前所未有的便利。

与此同时,HuggingFace推出的Open-R1项目致力于完全复现DeepSeek-R1的开发流程,并将其开源,以便更多人能够参与到这一前沿技术的探索中。Open-R1是以Qwen2.5-1.5B为基础,以deepseek-R1的训练过程打造的开源模型。该项目通过提供详细的训练脚本、评估工具以及数据生成方法,极大地降低了进入门槛,使得无论是独立研究人员、初创公司,还是业余爱好者,都能轻松上手并进行创新。

github地址:GitHub - huggingface/open-r1: Fully open reproduction of DeepSeek-R1

往期内容:Deepseek-R1:纯强化学习实现接近OpenAI o1水平(论文解读)_deepseek r1 论文-CSDN博客

-

-

2.原理回顾

训练过程

DeepSeek-R1的训练过程采用了多阶段策略,结合了监督微调(SFT)和强化学习(RL),最主要的阶段包括以下两个:

第一阶段:冷启动数据微调(SFT)

  • 目标:让模型具备基本的推理能力,防止在后续强化学习阶段出现语言混乱或无意义的输出。

  • 方法

    • 收集高质量的推理数据,包括数学推理、代码生成、长链推理等任务。

    • 使用监督微调(SFT),让模型在有限的数据集上学习基础推理逻辑。

    • 通过人工筛选和数据优化,提高模型的可读性和表达能力。

第二阶段:推理任务强化学习(GRPO)

  • 目标:增强模型的推理能力,使其能够更好地处理复杂任务。

  • 方法

    • 使用群体相对策略优化(GRPO)算法进行强化学习,结合准确性和格式奖励,以增强推理能力。

    • 通过奖励机制对模型生成的回答进行评分,调整模型策略以提高未来回答的质量。

-

组相对策略优化(GRPO)

其中组相对策略优化(GRPO)是DeepSeek-R1模型中采用的一种创新的强化学习算法,旨在优化大型语言模型(LLMs)在复杂任务中的表现,如数学推理和代码生成。Open-R1中最值得学习的就是GRPO的复现代码。

GRPO的核心思想是通过组内相对奖励来优化模型策略,而不是依赖传统的批评模型(critic model),从而简化了训练过程。在GRPO中,对于每个输入问题,模型会从当前策略中采样一组输出(例如16个候选解),然后根据这些输出的相对表现来计算奖励。具体来说,GRPO会计算组内奖励的均值和标准差,以此来归一化每个输出的优势函数。这种方法避免了传统PPO算法中需要额外训练价值模型的高成本。通过这种方式,GRPO能够更高效地调整策略模型的参数,以提高未来输出的质量。

GRPO的这种设计不仅降低了训练成本,还提高了计算效率,使得算法能够在单卡环境下完成训练。此外,GRPO直接将KL散度项集成到损失函数中,用于正则化策略模型,避免了在奖励信号中使用复杂的KL惩罚。这种直接的KL散度优化方式使得GRPO在优化过程中能够实现更精细的控制。

总的来说,GRPO通过组内相对奖励的机制,提供了一种更高效且简化的强化学习方法,特别适合于大型语言模型的优化。

-

-

3.环境配置

首先,本项目依赖于Python3.11和CUDA12.1,请确保虚拟环境的正确。

安装vllm==0.7.0并配置环境路径

pip install vllm>=0.7.0 --extra-index-url https://download.pytorch.org/whl/cu121
export LD_LIBRARY_PATH=$(python -c "import site; print(site.getsitepackages()[0] + '/nvidia/nvjitlink/lib')"):$LD_LIBRARY_PATH

安装其他依赖库(注意,一定是pytorch2.5.1)

pip install -e ".[dev]"

注册huggingface和wandb

huggingface-cli login
wandb login

在注册huggingface的时候显示以下界面

(open-r1) good@good:/media/good/4TB/mn/model/llm/open-r1-main$ huggingface-cli login

    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 

你需要点击网页:https://huggingface.co/settings/tokens

然后点击Create New Token,全选其中所有可选项,保存后复制token,粘贴到命令行,注意命令行不显示token,然后回车。

出现下面的界面说明成功了

Token has not been saved to git credential helper.
Your token has been saved to /home/good/.cache/huggingface/token
Login successful.
The current active token is: `open-r1`

-

-

4.sft

数据集和权重

数据集地址:https://huggingface.co/datasets/bespokelabs/Bespoke-Stratos-17k

初始模型权重地址:https://huggingface.co/Qwen/Qwen2.5-1.5B-Instruct

  1. 下载完成后注意修改recipes/qwen/Qwen2.5-1.5B-Instruct/sft/config_full.yaml的model_name_or_path和dataset_name,以匹配模型和数据集的位置。

-

sft阶段代码位于src/open_r1/sft.py,使用以下代码运行

ACCELERATE_LOG_LEVEL=info accelerate launch --config_file recipes/accelerate_configs/zero3.yaml src/open_r1/sft.py --config recipes/qwen/Qwen2.5-1.5B-Instruct/sft/config_full.yaml

或者在pycharm中仅仅设置参数:

--config recipes/qwen/Qwen2.5-1.5B-Instruct/sft/config_full.yaml

 3090Ti 24G显存能以batch-size=1的条件下运行,训练一轮大约6小时

-

代码解析

由于sft是大模型训练的常规过程,这里就简单介绍一下。

首先加载数据集和tokenizer

################
# Load datasets
################
dataset = load_dataset(script_args.dataset_name, name=script_args.dataset_config)

################
# Load tokenizer
################
tokenizer = AutoTokenizer.from_pretrained(
    model_args.model_name_or_path, trust_remote_code=model_args.trust_remote_code, use_fast=True
)
tokenizer.pad_token = tokenizer.eos_token

配置模型参数

###################
# Model init kwargs
###################
logger.info("*** Initializing model kwargs ***")
torch_dtype = (     # 根据配置选择合适的 torch_dtype 类型
    model_args.torch_dtype if model_args.torch_dtype in ["auto", None] else getattr(torch, model_args.torch_dtype)
)
quantization_config = get_quantization_config(model_args)       # 获取量化配置 quantization_config。
model_kwargs = dict(        # 构建模型初始化参数字典 model_kwargs
    revision=model_args.model_revision, 
    trust_remote_code=model_args.trust_remote_code,
    attn_implementation=model_args.attn_implementation,
    torch_dtype=torch_dtype,
    use_cache=False if training_args.gradient_checkpointing else True,
    device_map=get_kbit_device_map() if quantization_config is not None else None,
    quantization_config=quantization_config,
)
training_args.model_init_kwargs = model_kwargs

设置训练器

############################
# Initialize the SFT Trainer
############################
trainer = SFTTrainer(
    model=model_args.model_name_or_path,    # 指定模型路径
    args=training_args,    # 指定训练参数
    train_dataset=dataset[script_args.dataset_train_split],     # 指定训练数据集
    eval_dataset=dataset[script_args.dataset_test_split] if training_args.eval_strategy != "no" else None,  # 指定测试数据集
    processing_class=tokenizer,     # 指定tokenizer
    peft_config=get_peft_config(model_args),
    callbacks=get_callbacks(training_args, model_args),
)

训练与保存

###############
# Training loop
###############
logger.info("*** Train ***")
checkpoint = None
if training_args.resume_from_checkpoint is not None:    # 确定从哪个检查点恢复训练,优先使用 training_args.resume_from_checkpoint,其次使用 last_checkpoint。
    checkpoint = training_args.resume_from_checkpoint
elif last_checkpoint is not None:
    checkpoint = last_checkpoint
train_result = trainer.train(resume_from_checkpoint=checkpoint) # 训练模型,并返回训练结果。
metrics = train_result.metrics
metrics["train_samples"] = len(dataset[script_args.dataset_train_split])    # 获取训练数据集的大小,并保存到 metrics 中。
trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)      
trainer.save_state()        # 保存训练状态

##################################
# Save model and create model card
##################################
logger.info("*** Save model ***")
trainer.save_model(training_args.output_dir)    # 保存模型到指定路径。
logger.info(f"Model saved to {training_args.output_dir}")

测试

##########
# Evaluate
##########
if training_args.do_eval:
    logger.info("*** Evaluate ***")
    metrics = trainer.evaluate()    # 调用 trainer.evaluate() 进行评估,并获取评估结果 metrics
    metrics["eval_samples"] = len(dataset[script_args.dataset_test_split])  # 调用 trainer.evaluate() 进行评估,并获取评估结果 metrics
    trainer.log_metrics("eval", metrics)
    trainer.save_metrics("eval", metrics)

-

-

5.grpo

数据集和权重

数据集地址:https://huggingface.co/datasets/AI-MO/NuminaMath-TIR

初始模型权重地址:https://huggingface.co/deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B/tree/main

  1. 下载完成后注意修改recipes/qwen/Qwen2.5-1.5B-Instruct/grpo/confg_full.yaml的model_name_or_path和dataset_name,以匹配模型和数据集的位置。
  2. 然后修改num_processes,如果你有8块GPU,需设置为7(因为vllm需要占用一块GPU),以此类推。(如果只有一块GPU,建议直接将use_vllm改为false,然后将num_processes改为1)

外部代码解析

在Open-r1中,整个GRPO的流程被封装在trl库中,用户可以直接调用,快速使用。我们先来看外部调用代码。

奖励函数

具体来说,奖励函数包括两个:准确率奖励(accuracy_reward)和格式奖励(format_reward)。准确率奖励意味着解题越准确,分数越高;格式奖励意味着输出的格式越标准,分数越高。

@dataclass
class GRPOScriptArguments(ScriptArguments):
    """
    Script arguments for the GRPO training script.

    Args:
        reward_funcs (`list[str]`):
            List of reward functions. Possible values: 'accuracy', 'format'.
    """

    reward_funcs: list[str] = field(    # 它包含一个列表类型的参数 reward_funcs,默认值为 ["accuracy", "format"],用于指定奖励函数的类型。
        default_factory=lambda: ["accuracy", "format"],
        metadata={"help": "List of reward functions. Possible values: 'accuracy', 'format'"},
    )


def accuracy_reward(completions, solution, **kwargs):
    """Reward function that checks if the completion is the same as the ground truth.用于计算生成内容与标准答案的匹配度。"""
    contents = [completion[0]["content"] for completion in completions]
    rewards = []
    for content, sol in zip(contents, solution):    # 遍历每个完成项和对应的标准答案,解析标准答案。
        gold_parsed = parse(sol, extraction_mode="first_match", extraction_config=[LatexExtractionConfig()])
        if len(gold_parsed) != 0:   # 如果标准答案可解析,则进一步解析生成内容并验证其正确性,返回匹配度分数(1或0)。
            # We require the answer to be provided in correct latex (no malformed operators)
            answer_parsed = parse(
                content,
                extraction_config=[
                    LatexExtractionConfig(
                        normalization_config=NormalizationConfig(
                            nits=False,
                            malformed_operators=False,
                            basic_latex=True,
                            equations=True,
                            boxed=True,
                            units=True,
                        ),
                        # Ensures that boxed is tried first
                        boxed_match_priority=0,
                        try_extract_without_anchor=False,
                    )
                ],
                extraction_mode="first_match",
            )
            # Reward 1 if the content is the same as the ground truth, 0 otherwise
            reward = float(verify(answer_parsed, gold_parsed))
        else:   # 如果标准答案不可解析,则直接返回1并跳过该条目
            # If the gold solution is not parseable, we reward 1 to skip this example
            reward = 1.0
            print("Failed to parse gold solution: ", sol)
        rewards.append(reward)

    return rewards


def format_reward(completions, **kwargs):
    """使用正则表达式匹配格式Reward function that checks if the completion has a specific format."""
    pattern = r"^<think>.*?</think><answer>.*?</answer>$"
    completion_contents = [completion[0]["content"] for completion in completions]
    matches = [re.match(pattern, content) for content in completion_contents]
    return [1.0 if match else 0.0 for match in matches]

加载数据集、设置奖励函数、设置模型

# Load the dataset
dataset = load_dataset(script_args.dataset_name, name=script_args.dataset_config)

# Get reward functions
reward_funcs = [reward_funcs_registry[func] for func in script_args.reward_funcs]

torch_dtype = (
    model_args.torch_dtype if model_args.torch_dtype in ["auto", None] else getattr(torch, model_args.torch_dtype)
)
model_kwargs = dict(
    revision=model_args.model_revision,
    trust_remote_code=model_args.trust_remote_code,
    attn_implementation=model_args.attn_implementation,
    torch_dtype=torch_dtype,
    use_cache=False if training_args.gradient_checkpointing else True,
)
training_args.model_init_kwargs = model_kwargs

设置trainer

#############################
# Initialize the GRPO trainer
#############################
trainer = GRPOTrainer(
    model=model_args.model_name_or_path,        # 指定模型路径
    reward_funcs=reward_funcs,      # 指定奖励函数
    args=training_args,
    train_dataset=dataset[script_args.dataset_train_split],     # 指定训练集
    eval_dataset=dataset[script_args.dataset_test_split] if training_args.eval_strategy != "no" else None,
    peft_config=get_peft_config(model_args),    # 指定PEFT配置
    callbacks=get_callbacks(training_args, model_args),     # 指定回调函数
)

训练和保存

###############
# Training loop
###############
checkpoint = None
if training_args.resume_from_checkpoint is not None:
    checkpoint = training_args.resume_from_checkpoint
elif last_checkpoint is not None:
    checkpoint = last_checkpoint
train_result = trainer.train(resume_from_checkpoint=checkpoint)     # 训练模型,并返回训练结果。
metrics = train_result.metrics     
metrics["train_samples"] = len(dataset[script_args.dataset_train_split])    # 获取训练数据集的大小,并保存到 metrics 中。
trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)
trainer.save_state()    # 保存训练状态

##################################
# Save model and create model card
##################################
logger.info("*** Save model ***")
trainer.save_model(training_args.output_dir)    # 保存模型到指定路径。
logger.info(f"Model saved to {training_args.output_dir}")

评估

##########
# Evaluate
##########
if training_args.do_eval:
    logger.info("*** Evaluate ***")
    metrics = trainer.evaluate()
    metrics["eval_samples"] = len(dataset[script_args.dataset_test_split])
    trainer.log_metrics("eval", metrics)
    trainer.save_metrics("eval", metrics)

GRPO代码解析

初始化,包括以下步骤

  1. 初始化主模型,也就是被训练的模型
  2. 初始化参考模型。参考模型不仅提供了一个稳定的基线,还用于计算奖励、防止过拟合、计算策略偏差以及生成对比样本。这些功能使得参考模型成为优化主模型性能的重要工具。
  3. 初始化处理类,即Tokenizer
  4. 处理奖励函数
    1. 奖励函数可以是一个列表,也可以是一个单独的对象。如果是一个单独的对象,则将其包装为一个列表。
    2. 如果奖励函数是一个字符串(表示模型的路径或名称),则加载一个预训练的序列分类模型。num_labels=1 表示这是一个回归任务(输出一个奖励值)。
  5. 设置奖励处理类。奖励处理类(Reward Processing Class) 是一个用于处理和准备奖励函数输入的组件。它的主要作用是确保输入数据的格式和内容符合奖励函数的要求,从而使得奖励函数能够正确地计算奖励值。

参考模型(Reference Model)的作用

在强化学习(Reinforcement Learning, RL)或类似的自然语言处理(NLP)任务中,参考模型(Reference Model) 是一个非常重要的概念。它通常用于以下几个方面:

1. 作为基线模型

参考模型通常是主模型的一个静态副本,用于提供一个固定的性能基线。在强化学习中,主模型(即策略网络)会不断更新以优化其行为,而参考模型则保持不变。通过比较主模型和参考模型的输出,可以评估主模型的改进是否有效。

2. 计算奖励(Rewards)

在强化学习中,奖励函数用于评估生成的文本或行为的质量。参考模型可以用来计算这些奖励。例如,通过比较主模型生成的文本和参考模型生成的文本,可以计算出奖励值,从而指导主模型的训练。

3. 防止过拟合

参考模型可以作为一个稳定的参考点,帮助防止主模型在训练过程中过拟合。通过定期与参考模型进行比较,可以确保主模型不会偏离初始的性能太远。

4. 计算策略偏差(Policy Deviation)

在某些强化学习任务中,参考模型可以用来计算策略偏差,即主模型的策略与初始策略之间的差异。这种偏差可以用来调整学习率或决定是否需要恢复到更稳定的策略。

5. 生成对比样本

参考模型可以生成对比样本,用于训练主模型。这些对比样本可以帮助主模型学习到更鲁棒的特征表示,从而提高其在实际任务中的表现。

总结

参考模型在强化学习和自然语言处理任务中扮演了重要的角色。它不仅提供了一个稳定的基线,还用于计算奖励、防止过拟合、计算策略偏差以及生成对比样本。这些功能使得参考模型成为优化主模型性能的重要工具。

代码如下: 

# Models
# 1.Trained model 初始化主模型,也就是被训练的模型
model_init_kwargs = args.model_init_kwargs or {}
if isinstance(model, str):
    model_id = model
    torch_dtype = model_init_kwargs.get("torch_dtype")    # 模型的权重数据类型

    ...

    model_init_kwargs["use_cache"] = (
        False if args.gradient_checkpointing else model_init_kwargs.get("use_cache")
    )
    model = AutoModelForCausalLM.from_pretrained(model, **model_init_kwargs)


 
# 2.Reference model 初始化参考模型 它不仅提供了一个稳定的基线,还用于计算奖励、防止过拟合、计算策略偏差以及生成对比样本。这些功能使得参考模型成为优化主模型性能的重要工具。
if is_deepspeed_zero3_enabled():    # 是否启用了 DeepSpeed 的 ZeRO-3 优化。如果启用,会加载一个参考模型。
    self.ref_model = AutoModelForCausalLM.from_pretrained(model_id, **model_init_kwargs)
elif peft_config is None:
    # If PEFT configuration is not provided, create a reference model based on the initial model.
    self.ref_model = create_reference_model(model)      # 创建模型的静态参考副本,并将其设置为评估模式。
else:
    # If PEFT is used, the reference model is not needed since the adapter can be disabled
    # to revert to the initial model.
    self.ref_model = None

# 3.Processing class 初始化处理类,即Tokenizer
if processing_class is None:
    processing_class = AutoTokenizer.from_pretrained(model.config._name_or_path, padding_side="left")

# 4.Reward functions 奖励函数
if not isinstance(reward_funcs, list):    # 奖励函数可以是一个列表,也可以是一个单独的对象。如果是一个单独的对象,则将其包装为一个列表。
    reward_funcs = [reward_funcs]
for i, reward_func in enumerate(reward_funcs):    # 如果奖励函数是一个字符串(表示模型的路径或名称),则加载一个预训练的序列分类模型。num_labels=1 表示这是一个回归任务(输出一个奖励值)。
    if isinstance(reward_func, str):
        reward_funcs[i] = AutoModelForSequenceClassification.from_pretrained(
            reward_func, num_labels=1, **model_init_kwargs
        )
self.reward_funcs = reward_funcs

# 5.Reward processing class奖励处理类
if reward_processing_classes is None:    # 如果未提供奖励处理类,则为每个奖励函数创建一个默认的分词器。
    reward_processing_classes = [None] * len(reward_funcs)
elif not isinstance(reward_processing_classes, list):
    reward_processing_classes = [reward_processing_classes]
else:
    if len(reward_processing_classes) != len(reward_funcs):
        raise ValueError("The number of reward processing classes must match the number of reward functions.")

for i, (reward_processing_class, reward_func) in enumerate(zip(reward_processing_classes, reward_funcs)):
    if isinstance(reward_func, PreTrainedModel):
        if reward_processing_class is None:
            reward_processing_class = AutoTokenizer.from_pretrained(reward_func.config._name_or_path)    # 加载与奖励函数模型匹配的分词器。
        if reward_processing_class.pad_token_id is None:
            reward_processing_class.pad_token = reward_processing_class.eos_token
        # The reward model computes the reward for the latest non-padded token in the input sequence.
        # So it's important to set the pad token ID to the padding token ID of the processing class.
        reward_func.config.pad_token_id = reward_processing_class.pad_token_id    # 将奖励函数模型的填充标记 ID 设置为分词器的填充标记 ID,确保奖励计算时能够正确处理填充标记
        reward_processing_classes[i] = reward_processing_class    
self.reward_processing_classes = reward_processing_classes        # 将奖励处理类列表存储为类的属性。

-

训练

中间的其他过程并不重要,我们直接跳过,来看训练代码。

下面的代码定义了 Trainer 类中的 training_step 方法,用于在一个批次的输入上执行训练步骤。主要过程如下:

  1. 启动模型的训练
  2. 将输入转换为token ID,并生成对应的掩码
  3. 调用compute_loss计算损失
  4. 清理缓存并计算平均损失
class Trainer:
    def training_step(
        self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]], num_items_in_batch=None
    ) -> torch.Tensor:
        
        model.train()
        if hasattr(self.optimizer, "train") and callable(self.optimizer.train):
            self.optimizer.train()

        inputs = self._prepare_inputs(inputs)   # tokenize并生成掩码

        ...

        with self.compute_loss_context_manager():
            loss = self.compute_loss(model, inputs, num_items_in_batch=num_items_in_batch)  # 计算损失

        del inputs
        if (    # 清理缓存
            self.args.torch_empty_cache_steps is not None
            and self.state.global_step % self.args.torch_empty_cache_steps == 0
        ):
            if is_torch_xpu_available():
                torch.xpu.empty_cache()
            elif is_torch_mlu_available():
                torch.mlu.empty_cache()
            elif is_torch_musa_available():
                torch.musa.empty_cache()
            elif is_torch_npu_available():
                torch.npu.empty_cache()
            elif is_torch_mps_available(min_version="2.0"):
                torch.mps.empty_cache()
            else:
                torch.cuda.empty_cache()

        kwargs = {}

        if self.args.n_gpu > 1:     # 计算损失
            loss = loss.mean()  # mean() to average on multi-gpu parallel training

        if self.use_apex:
            with amp.scale_loss(loss, self.optimizer) as scaled_loss:
                scaled_loss.backward()
        else:
            # Finally we need to normalize the loss for reporting
            if not self.model_accepts_loss_kwargs and self.compute_loss_func is None:
                loss = loss / self.args.gradient_accumulation_steps

其中_prepare_inputs()函数用于对文本并进行预处理,并生成优势值(advantages)

这段代码是GRPOTrainer类中的_prepare_inputs方法,主要功能如下:

  1. 准备输入:从输入字典中提取提示文本并进行预处理。
  2. 生成补全:根据配置选择使用vLLM或常规生成方式生成补全文本。
  3. 处理补全文本:对生成的补全文本进行填充、拼接和掩码处理。
  4. 计算奖励:通过多个奖励函数计算生成文本的奖励,并归一化为优势值(advantages)。
    1. 计算每个生成的完成结果的总奖励。

    2. 计算每个组的平均奖励和标准差。

    3. 将平均奖励和标准差扩展到与原始奖励值相同的维度。

    4. 计算每个生成的完成结果的奖励与其所在组的平均奖励的差值。

    5. 将差值除以标准差(加上一个小常数),得到最终的优势值。

  5. 记录指标:将奖励和优势值记录到训练指标中。

解释

  • 分组奖励:将生成的完成结果按组划分,每组包含多个生成结果。每个组的平均奖励和标准差用于衡量该组的整体表现。

  • 优势值:衡量每个生成的完成结果相对于其所在组的平均表现的优势。如果一个生成结果的奖励高于其所在组的平均奖励,则其优势值为正;否则为负。通过标准化(除以标准差),优势值可以更好地反映每个生成结果的相对表现。计算公式:Advantage=\frac{Rewards-Mean(Grouped\, Rewards)}{Std(Grouped\, Rewards)+\varepsilon }

    其中:
    • Rewards 是每个生成的完成结果的总奖励。

    • Mean Grouped Rewards 是每个组的平均奖励。

    • Std Grouped Rewards 是每个组的奖励标准差。

    • ϵ 是一个小常数(如 1e-4),用于防止除以零的错误。

class GRPOTrainer(Trainer):
    def _prepare_inputs(self, inputs: dict[str, Union[torch.Tensor, Any]]) -> dict[str, Union[torch.Tensor, Any]]:
        # 1.从输入数据中提取提示文本,并对其进行预处理
        device = self.accelerator.device
        prompts = [x["prompt"] for x in inputs]
        prompts_text = [maybe_apply_chat_template(example, self.processing_class)["prompt"] for example in inputs]
        prompt_inputs = self.processing_class(
            prompts_text, return_tensors="pt", padding=True, padding_side="left", add_special_tokens=False
        )
        prompt_inputs = super()._prepare_inputs(prompt_inputs)
        prompt_ids, prompt_mask = prompt_inputs["input_ids"], prompt_inputs["attention_mask"]

        if self.max_prompt_length is not None:
            prompt_ids = prompt_ids[:, -self.max_prompt_length :]
            prompt_mask = prompt_mask[:, -self.max_prompt_length :]

        # 2.生成文本并分离生成的提示和完成部分 Generate completions using either vLLM or regular generation
        if self.args.use_vllm:
            ...
        else:
            # Regular generation path
            with unwrap_model_for_generation(self.model, self.accelerator) as unwrapped_model:
                prompt_completion_ids = unwrapped_model.generate(    # 使用 unwrap_model_for_generation 解包模型进行文本生成,得到完整的提示加完成的 ID 序列。
                    prompt_ids, attention_mask=prompt_mask, generation_config=self.generation_config
                )

            # Compute prompt length and extract completion ids
            prompt_length = prompt_ids.size(1)
            prompt_ids = prompt_completion_ids[:, :prompt_length]
            completion_ids = prompt_completion_ids[:, prompt_length:]
            prompt_mask = prompt_mask.repeat_interleave(self.num_generations, dim=0)

        # 3.处理序列中的EOS标记,生成掩码  Mask everything after the first EOS token
        is_eos = completion_ids == self.processing_class.eos_token_id       # 找到每个序列中的EOS标记位置
        eos_idx = torch.full((is_eos.size(0),), is_eos.size(1), dtype=torch.long, device=device)
        eos_idx[is_eos.any(dim=1)] = is_eos.int().argmax(dim=1)[is_eos.any(dim=1)]  # 更新 eos_idx 中有EOS标记的行,记录第一个EOS标记的位置。
        sequence_indices = torch.arange(is_eos.size(1), device=device).expand(is_eos.size(0), -1)   # 生成序列索引矩阵 sequence_indices
        completion_mask = (sequence_indices <= eos_idx.unsqueeze(1)).int()      # 创建掩码 completion_mask,标记所有在第一个EOS标记之前的位置。

        # Concatenate prompt_mask with completion_mask for logit computation
        attention_mask = torch.cat([prompt_mask, completion_mask], dim=1)  # (B*G, P+C)

        logits_to_keep = completion_ids.size(1)  # we only need to compute the logits for the completion tokens

        # 4.计算参考模型的对数概率
        with torch.inference_mode():
            if self.ref_model is not None:
                ref_per_token_logps = self._get_per_token_logps(    # 计算参考模型 ref_model 在给定的输入 prompt_completion_ids 上每个 token 的对数概率。
                    self.ref_model, prompt_completion_ids, attention_mask, logits_to_keep
                )
            else:
                ...

        # 5.解码生成的完成结果,并根据输入是否为对话格式进行处理。  Decode the generated completions
        completions = self.processing_class.batch_decode(completion_ids, skip_special_tokens=True)      # 使用 batch_decode 方法将 completion_ids 解码为文本,跳过特殊标记
        if is_conversational(inputs[0]):    # 如果输入是对话格式,则将每个完成结果封装为包含角色和内容的字典列表。
            completions = [[{"role": "assistant", "content": completion}] for completion in completions]

        # 6.计算奖励  Compute the rewards
        prompts = [prompt for prompt in prompts for _ in range(self.num_generations)]  # repeat prompts

        rewards_per_func = torch.zeros(len(prompts), len(self.reward_funcs), device=device)     # 初始化一个二维张量 rewards_per_func,用于存储每个奖励函数的奖励值。
        for i, (reward_func, reward_processing_class) in enumerate(
            zip(self.reward_funcs, self.reward_processing_classes)
        ):
            if isinstance(reward_func, nn.Module):  # Module instead of PretrainedModel for compat with compiled models
                ...
            else:
                # Repeat all input columns (but "prompt" and "completion") to match the number of generations
                reward_kwargs = {key: [] for key in inputs[0].keys() if key not in ["prompt", "completion"]}
                for key in reward_kwargs:
                    for example in inputs:
                        # Repeat each value in the column for `num_generations` times
                        reward_kwargs[key].extend([example[key]] * self.num_generations)
                output_reward_func = reward_func(prompts=prompts, completions=completions, **reward_kwargs)     #  调用奖励函数计算奖励
                rewards_per_func[:, i] = torch.tensor(output_reward_func, dtype=torch.float32, device=device)

        # 7.计算所有奖励函数的总和 Sum the rewards from all reward functions
        rewards = rewards_per_func.sum(dim=1)       # 计算所有奖励函数的总和,得到一个形状为 (B,) 的张量 rewards。

        # 8.计算每组奖励的平均值和方差   Compute grouped-wise rewards
        mean_grouped_rewards = rewards.view(-1, self.num_generations).mean(dim=1)   # 计算每个分组的奖励均值 mean_grouped_rewards
        std_grouped_rewards = rewards.view(-1, self.num_generations).std(dim=1)     # 计算每个分组的奖励标准差 std_grouped_rewards

        # 9.计算advantages     Normalize the rewards to compute the advantages
        mean_grouped_rewards = mean_grouped_rewards.repeat_interleave(self.num_generations, dim=0)  # 将平均奖励和标准差重复扩展以匹配奖励的维度
        std_grouped_rewards = std_grouped_rewards.repeat_interleave(self.num_generations, dim=0)
        advantages = (rewards - mean_grouped_rewards) / (std_grouped_rewards + 1e-4)    # 计算每个奖励与扩展后的平均奖励的差值, 将差值除以扩展后的标准差加一个小常数,防止除零错误。

        # 10. Log the metrics  记录每个奖励函数的平均奖励值
        reward_per_func = self.accelerator.gather_for_metrics(rewards_per_func).mean(0)
        for i, reward_func in enumerate(self.reward_funcs):
            if isinstance(reward_func, nn.Module):  # Module instead of PretrainedModel for compat with compiled models
                reward_func_name = reward_func.config._name_or_path.split("/")[-1]
            else:
                reward_func_name = reward_func.__name__
            self._metrics[f"rewards/{reward_func_name}"].append(reward_per_func[i].item())

        self._metrics["reward"].append(self.accelerator.gather_for_metrics(rewards).mean().item())
        self._metrics["reward_std"].append(self.accelerator.gather_for_metrics(std_grouped_rewards).mean().item())

接下来我们来看compute_loss(),注意,这个是GRPOTrainer覆写的函数,不是Trainer的函数,具体步骤如下:

  1. 分别用于表示提示部分、完整部分的ID及其对应的掩码。
  2. 调用self._get_per_token_logps(),使用模型计算每个输入 token 的对数概率。
  3. 计算KL散度(通过参考模型和主模型的对数概率的差异计算KL散度)并应用
  4. 记录信息。
class GRPOTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        
        prompt_ids, prompt_mask = inputs["prompt_ids"], inputs["prompt_mask"]   # 分别用于表示提示部分的ID及其对应的掩码。
        completion_ids, completion_mask = inputs["completion_ids"], inputs["completion_mask"]   # 分别用于表示完成部分的ID及其对应的掩码。
        input_ids = torch.cat([prompt_ids, completion_ids], dim=1)
        attention_mask = torch.cat([prompt_mask, completion_mask], dim=1)
        logits_to_keep = completion_ids.size(1)  # we only need to compute the logits for the completion tokens

        per_token_logps = self._get_per_token_logps(model, input_ids, attention_mask, logits_to_keep)   # 计算每个输入 token 的对数概率。

        # 计算KL散度 Compute the KL divergence between the model and the reference model
        ref_per_token_logps = inputs["ref_per_token_logps"]     # 从输入中获取参考模型的每令牌对数概率
        per_token_kl = torch.exp(ref_per_token_logps - per_token_logps) - (ref_per_token_logps - per_token_logps) - 1   # 计算每令牌的KL散度

        # x - x.detach() allows for preserving gradients from x
        advantages = inputs["advantages"]
        per_token_loss = torch.exp(per_token_logps - per_token_logps.detach()) * advantages.unsqueeze(1)    # 计算优势值加权的每token损失。
        per_token_loss = -(per_token_loss - self.beta * per_token_kl)   # 应用KL散度惩罚项调整每token损失。
        loss = ((per_token_loss * completion_mask).sum(dim=1) / completion_mask.sum(dim=1)).mean()  # 使用完成掩码对损失进行加权平均,最后求均值。

        # Log the metrics   
        ## 计算并记录完成长度
        completion_length = self.accelerator.gather_for_metrics(completion_mask.sum(1)).float().mean().item()   # 使用 gather_for_metrics 汇总所有进程的 completion_mask 并按行求和
        self._metrics["completion_length"].append(completion_length)
        ## 计算每个样本的平均KL散度,并将结果收集到metrics中
        mean_kl = ((per_token_kl * completion_mask).sum(dim=1) / completion_mask.sum(dim=1)).mean()
        self._metrics["kl"].append(self.accelerator.gather_for_metrics(mean_kl).mean().item())

        return loss

    def _get_per_token_logps(self, model, input_ids, attention_mask, logits_to_keep):   # 计算每个输入 token 的对数概率。
        # We add 1 to `logits_to_keep` because the last logits of the sequence is later excluded
        logits = model(
            input_ids=input_ids, attention_mask=attention_mask, logits_to_keep=logits_to_keep + 1
        ).logits  # (B, L, V)   调用模型获取 logits,增加 logits_to_keep 以排除最后一个 logit。
        logits = logits[:, :-1, :]  # (B, L-1, V), 排除最后一个 logit,因为它对应于下一个 token 的预测。

        # Compute the log probabilities for the input tokens. Use a loop to reduce memory peak.
        per_token_logps = []
        for logits_row, input_ids_row in zip(logits, input_ids[:, -logits_to_keep:]):   # 使用循环逐行计算每个 token 的对数概率,并减少内存峰值。
            log_probs = logits_row.log_softmax(dim=-1)
            token_log_prob = torch.gather(log_probs, dim=1, index=input_ids_row.unsqueeze(1)).squeeze(1)
            per_token_logps.append(token_log_prob)
        return torch.stack(per_token_logps)     # 返回堆叠后的对数概率张量。

公式解读

代码中的KL散度公式可以展开为:KL=e^{ref\_logp-logp}-(ref\_logp-logp)-1

其中:

  • ref_logp 是参考模型的对数概率。

  • logp 是当前模型的对数概率。

-

这个公式实际上是基于以下数学推导:

  1. 原始的KL散度公式D_{KL}(P||Q)=\sum_{x}P(x)log(\frac{P(x)}{Q(x)})
    在这里,P(x) 是参考模型的概率,Q(x) 是当前模型的概率。

  2. 对数概率的表示\begin{matrix} logP(x)=ref\_logp\\ logQ(x)=logp \end{matrix}

  3. 代入对数概率D_{KL}(P||Q)=\sum_{x}e^{(ref\_logp)}\cdot (ref\_logp-logp)

  4. 简化公式: 在代码中,为了计算每个令牌的KL散度,使用了以下公式:KL=e^{ref\_logp-logp}-(ref\_logp-logp)-1

  5. 这个公式可以进一步理解为:KL=e^{\bigtriangleup logp}-(\bigtriangleup logp)-1,其中\bigtriangleup logp=ref\_logp-logp

解释

  • exp(Δlogp):这是参考模型和当前模型对数概率差异的指数。它表示参考模型的概率与当前模型的概率的比值。

  • Δlogp:这是参考模型和当前模型对数概率的直接差异。

  • 减去1:这个操作是为了调整公式,使其在 Δlogp=0 时(即两个模型的概率相同)结果为0。

这个公式在某些情况下用于近似KL散度,特别是在处理对数概率时。它避免了直接计算概率比值的对数,从而在数值上更加稳定。此外,它还提供了一个关于两个模型对数概率差异的直观度量。该公式是基于对数概率差异的一种近似计算方式。它在数值上更加稳定,并且能够有效地衡量两个模型之间的差异。

-

-

6.总结

DeepSeek-R1的意义在于其通过创新的训练方法和技术突破,为大型语言模型(LLMs)在推理能力、训练效率和成本控制等方面树立了新的标杆。其采用的多阶段训练策略,结合冷启动数据微调、强化学习(RL)和监督微调(SFT),不仅显著提升了模型在复杂推理任务中的表现,还解决了传统单阶段训练方法难以克服的挑战。DeepSeek-R1的训练过程证明了强化学习可以有效提升模型的推理能力,同时其创新的GRPO算法大幅降低了训练成本,使得高性能推理模型的开发更加高效和经济。此外,DeepSeek-R1还通过拒绝采样和数据飞轮机制,进一步优化了模型的通用性和适应性,使其在多种任务场景中都能表现出色。


如果你觉得这篇文章对你有帮助,不妨点个赞、关注一下,或者收藏起来,方便以后随时查阅。你的支持是我不断创作的动力,感谢你的陪伴!