pytorch LLM训练过程中的精度调试实践

pytorch LLM训练过程中的精度调试实践

  • 1.查看权值的最大,最小值
  • 2.检测训练过程中的异常值
    • A.通过hook module,检测异常值
    • B.拦截算子,检测异常值,打印调用栈,保存输入参数,方便复现
    • C.拦截算子,同时执行cpu计算,对比误差,找到第一个精度异常的算子
    • D.以上的代码
  • 3.根据上面dump的数据,准备最小复现环境

本文记录了,在某加速卡上进行LLM训练,精度问题的定位过程

1.查看权值的最大,最小值

tee dump_weight.py <<-'EOF'
import torch
import json
import numpy as np

weight_files=set()
with open('../llama-2-7b-hf/pytorch_model.bin.index.json', 'r') as f:
    index_data = json.load(f)["weight_map"]
for k,v in index_data.items():
    weight_files.add(v)
gmax=[]
gmin=[]
for i in weight_files:
    for k,w in torch.load(f"../llama-2-7b-hf/{i}",map_location="cpu").items():
        print(f"{k:<64s},max:{w.max().item():8.2f},{w.min().item():8.2f}")
        gmax.append(w.max().item())
        gmin.append(w.min().item())
print(f"\n\nglobal max:{np.max(gmax)} min:{np.min(gmin)}")        
EOF
python3 dump_weight.py

2.检测训练过程中的异常值

A.通过hook module,检测异常值

B.拦截算子,检测异常值,打印调用栈,保存输入参数,方便复现

C.拦截算子,同时执行cpu计算,对比误差,找到第一个精度异常的算子

D.以上的代码

import torch
from torch import nn
import math
import copy
import torch.nn.functional as F

device="xpu"    
if torch.cuda.is_available():
    device="cuda"

def check_tensor(tensor, module_name, hook_type):
    if isinstance(tensor, torch.Tensor):
        if not torch.isfinite(tensor).all():
            print(f"[ERROR] Detected NaN or Inf in {hook_type} pass of {module_name} rank:{torch.distributed.get_rank()}")
            #os._exit(0)
    elif isinstance(tensor, list) or isinstance(tensor, tuple):
        for t in tensor:
            check_tensor(t, module_name, hook_type)

# 定义钩子函数来监测 NaN 和 Inf
def forward_hook(module, inputs, output, module_name):
    check_tensor(inputs, module_name, "forward-inputs")
    check_tensor(output, module_name, "forward-output")

def backward_hook(module, grad_input, module_name):
    check_tensor(grad_input, module_name, "backward")

from torch.utils._python_dispatch import TorchDispatchMode
from dataclasses import dataclass
from typing import Any
import inspect

@dataclass
class _ProfilerState:
    cls: Any
    object: Any = None

def is_valid(val,name,stack):
    # 判断是否为tensor或Parameter
    if isinstance(val, (torch.Tensor, nn.Parameter)):
        if not torch.isfinite(val.cpu()).all():
            print("[ERROR]:",name,stack)
            return -1
    return 0

def check_tensor(name,stack, tensor):
    if isinstance(tensor,(torch.Tensor, nn.Parameter)):
      return is_valid(tensor,name,stack)
    elif isinstance(tensor, (tuple, list)):
        for idx, t in enumerate(tensor):
            if is_valid(t,name,stack)!=0:
                return -1
    return 0

def save_tensor_data(tensor,name):
    if isinstance(tensor,(torch.Tensor, nn.Parameter)):
	    torch.save(tensor,f"{name}.pth")
    elif isinstance(tensor, (tuple, list)):
        for idx, t in enumerate(tensor):
            save_tensor_data(t,f"{name}-{idx}")

def to_cpu_and_fp32(data):
    """
    将输入数据中的所有GPU Tensor转换为CPU Tensor,并将FP16类型的张量转换为FP32类型。

    参数:
    data: 可能是单个Tensor,列表,元组,字典或这些数据类型的嵌套结构。

    返回:
    与输入结构相似,但所有的GPU张量都已转换为CPU张量,所有的FP16张量都转换为FP32张量。
    """
    if isinstance(data, torch.Tensor):
        # 将GPU张量转换为CPU张量
        tensor = data.cpu()
        # 如果张量是FP16类型,则转换为FP32
        if tensor.dtype == torch.float16:
            tensor = tensor.to(torch.float32)
        return tensor
    elif isinstance(data, list):
        return [to_cpu_and_fp32(item) for item in data]
    elif isinstance(data, tuple):
        return tuple(to_cpu_and_fp32(item) for item in data)
    elif isinstance(data, dict):
        return {key: to_cpu_and_fp32(value) for key, value in data.items()}
    else:
        # 如果既不是Tensor也不是列表、元组或字典,则直接返回数据
        return data

@dataclass
class TensorDesc:
    cat: Any
    shape: Any
    dtype: Any
    value: Any

@dataclass
class DataDescriptor:
    class_name: Any
    shape: Any
    value: Any
    dtype: Any
    max_v: Any
    min_v: Any    
    def __repr__(self) -> str:
        output_str=[]
        if self.shape:
            output_str.append("shape:({})".format(",".join([str(x) for x in self.shape])))
        if self.max_v:
            output_str.append(f"max:{self.max_v:.6f} min:{self.min_v:.6f}")
        if self.value is not None:
            if self.class_name in ["list","tuple"]:
                for t in self.value:
                    output_str.append(str(t))
            else:
                output_str.append(str(self.value))
        if self.dtype and self.class_name in ["Tensor","ndarray","Parameter"]:
            output_str.append(str(self.dtype))
        return "{}({})".format(self.class_name,"-".join(output_str))

class InputDescriptor:
    def __init__(self) -> None:
        self.input_vars=[]
        self.input_kwargs={}
        
    def _save_var(self,v):
        class_name=v.__class__.__name__
        if class_name in ["Tensor","Parameter"]:
            return DataDescriptor(class_name,list(v.shape),None,v.dtype,v.max().item(),v.min().item())
        elif class_name in ["UntypedStorage"]:
            pass
            #return DataDescriptor(class_name,None,list(v),type(v))        
        elif class_name in ["int","float","str","dtype","layout","device","NoneType","bool","memory_format"]:
            return DataDescriptor(class_name,None,v,type(v),None,None)
        elif class_name in ["ndarray"]:
            return DataDescriptor(class_name,list(v.shape),None,v.dtype,None,None)
        elif class_name in ["list","tuple"]:
            output=[]
            for t in v:
                output.append(self._save_var(t))
            return DataDescriptor(class_name,None,output,None,None,None)
        
    def save_vars(self,*args,**kwargs):        
        for arg in args:
            self.input_vars.append(self._save_var(arg))
        for k,v in kwargs.items():
            self.input_kwargs[k]=self._save_var(v)

    def __repr__(self) -> str:
        return str(self.input_vars) + "#" + str(self.input_kwargs)

def compare_tensor(tensorA,tensorB,name,params):
    if isinstance(tensorA,(torch.Tensor, nn.Parameter)):
        mse_loss = torch.nn.MSELoss()
        loss = mse_loss(tensorA.cpu().float(), tensorB.cpu().float()).item()
        print(f"{name:<64s} {loss:f} {params}")
        return loss<1e-2
    elif isinstance(tensorA, (tuple, list)):
        for idx, t in enumerate(tensorA):
            if not compare_tensor(tensorA[idx],tensorB[idx],f"{name}-{idx}",params):
                return False
    return True
    
def is_in_blacklist(name):
    black_list=["empty","like","zero","detach","has","view",
                "copy","arange","fill","ones","lift_fresh","alias",
                "scalar_tensor","clone","stack","slice","source",
                "select","random","unsqueeze","expand","normal","bernoulli"]
    for i in black_list:
        if name.find(i)>=0:
            return False
    return True
    
class TorchOpDiffDispatchMode(TorchDispatchMode):
    def __init__(self,parent):
        super().__init__()
        self.parent=parent
        self.global_index=0

    def __torch_dispatch__(self, func, types, args=(), kwargs=None):
        func_packet = func._overloadpacket
        op_name=f"{func}"
        enable_dump= is_in_blacklist(op_name)
        self.global_index+=1
        if kwargs is None:
            kwargs = {}
        if enable_dump:
            args_cpu = to_cpu_and_fp32(args)
            kwargs_cpu = to_cpu_and_fp32(kwargs)
            cpu_out=func(*args_cpu, **kwargs_cpu)
        ret= func(*args, **kwargs)
        if enable_dump:
            desc=InputDescriptor()
            desc.save_vars(*args,**kwargs)        
            if not compare_tensor(cpu_out,ret,op_name,str(desc)):
                save_tensor_data(args,f"{self.global_index}_{torch.distributed.get_rank()}{op_name}-input")
                save_tensor_data(ret,f"{self.global_index}_{torch.distributed.get_rank()}{op_name}-output")
        return ret

class TorchNanDetDispatchMode(TorchDispatchMode):
    def __init__(self,parent):
        super().__init__()
        self.parent=parent
        self.global_index=0

    def __torch_dispatch__(self, func, types, args=(), kwargs=None):
        func_packet = func._overloadpacket   
        op_name=f"{func}"  
        self.global_index+=1
        enable_dump= is_in_blacklist(op_name)
        if kwargs is None:
            kwargs = {}
        stacks=[i for i in inspect.stack()]
        stacks_sz=len(stacks)
        msg=[]
        for idx in range(stacks_sz-1,1,-1):
            if "self" in stacks[idx].frame.f_locals:
                class_name = stacks[idx].frame.f_locals["self"].__class__.__name__
            else:
                class_name=""
            msg.append(f"{stacks[idx].filename}:[{class_name}]:{stacks[idx].function}")        
        valid=0
        if enable_dump:
            valid+=check_tensor(f"aten-{op_name}-input","\n".join(msg),args)    
        ret= func(*args, **kwargs)
        if enable_dump:
            valid+=check_tensor(f"{op_name}-output","\n".join(msg), ret)
        if valid!=0:
            save_tensor_data(args,f"{self.global_index}_{torch.distributed.get_rank()}{op_name}-input")
            save_tensor_data(ret,f"{self.global_index}_{torch.distributed.get_rank()}{op_name}-output")
        return ret

class TorchHook:
    _CURRENT_Dumper = None
    def __init__(self,state):
        self.p= _ProfilerState(state)
        
    def __enter__(self):
        assert TorchHook._CURRENT_Dumper is None
        TorchHook._CURRENT_Dumper = self
        if self.p.object is None:
            o = self.p.cls(self)
            o.__enter__()
            self.p.object = o
        else:
            self.p.object.step()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        TorchHook._CURRENT_Dumper = None
        if self.p.object is not None:
            self.p.object.__exit__(exc_type, exc_val, exc_tb)
            del self.p.object

def clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self,query, key, value, mask=None, dropout=None):
        d_k = query.size(-1)
        scores = query@key.transpose(-2,-1) / math.sqrt(d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e20)
        p_attn = F.softmax(scores, dim = -1)
        if dropout is not None:
            p_attn = dropout(p_attn)
        return p_attn@value, p_attn

class MultiHeadAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        assert d_model % h == 0
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)
        self.attention = ScaledDotProductAttention()

    def forward(self, query, key, value, mask=None):
        if mask is not None:
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)
        query=self.linears[0](query).view(nbatches, -1, self.h, self.d_k)
        query=query.transpose(1, 2)
        key=self.linears[1](key).view(nbatches, -1, self.h, self.d_k)
        key=key.transpose(1, 2)
        value=self.linears[2](value).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
        x, self.attn = self.attention(query, key, value, mask=mask,
                                 dropout=self.dropout)
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x)

torch.random.manual_seed(1)
model = MultiHeadAttention(h=8, d_model=64).half().to(device)
model.eval()

# 通过hook Module去检测是否有Nan值
for name, module in model.named_modules():
    if isinstance(module, nn.Module):  # 检测是否是 nn.Module 的子类
        module.register_forward_hook(lambda module, inputs, output, name=name: forward_hook(module, inputs, output, name))
        module.register_full_backward_pre_hook(lambda module, grad_input, name=name: backward_hook(module, grad_input, f"{name}-in"))
        module.register_full_backward_hook(lambda module, grad_input,name=name: backward_hook(module, grad_input, f"{name}-out"))

q1 = torch.ones((100, 50, 64),dtype=torch.float32).half().to(device)
k1 = q1.clone()
v1 = q1.clone()

# 通过拦截算子去检测是否有nan值,并打印调用栈,保存输入参数,方便后续复现
with TorchHook(TorchNanDetDispatchMode):
    out = model(q1,k1,v1).sum()
    print("out:",out.item())

# 通过与cpu比较算子的计算误差,保存输入参数
with TorchHook(TorchOpDiffDispatchMode):
    out = model(q1,k1,v1).sum()
    print("out:",out.item())

3.根据上面dump的数据,准备最小复现环境

tee demo.py <<-'EOF'
import torch
import numpy as np

device="xpu"

def largest_k_errors(tensor1, tensor2, k, eps=1e-12):
    """
    找到两个张量之间误差最大的 k 个元素及其原始数据。
    
    参数:
    tensor1: 第一个输入张量。
    tensor2: 第二个输入张量。
    k (int): 要找的误差最大的元素个数。
    eps (float): 防止除以零的一个很小的值,默认值为1e-12。
    
    返回:
    包含误差最大的 k 个元素及其原始数据的元组 (errors, indices, orig_values1, orig_values2)。
    """
    # 对输入张量进行广播操作,使它们的形状一致
    tensor1, tensor2 = torch.broadcast_tensors(tensor1, tensor2)

    # 计算绝对误差
    absolute_diff = torch.abs(tensor1 - tensor2)
    
    # 计算基准值,避免除以零的情况
    denom = torch.abs(tensor1) + torch.abs(tensor2) + eps
    
    # 计算相对误差
    relative_error = absolute_diff / denom
    
    # 把inf和nan替换为零
    relative_error = torch.where(
        torch.isfinite(relative_error),
        relative_error,
        torch.zeros_like(relative_error)
    )
    
    # 找到误差最大的 k 个元素的索引
    k = min(k, tensor1.numel())  # 确保 k 不超过张量的元素总数
    _, indices = torch.topk(relative_error.view(-1), k)
    
    # 提取原始数据和误差
    errors = relative_error.view(-1)[indices]
    orig_values1 = tensor1.view(-1)[indices]
    orig_values2 = tensor2.view(-1)[indices]

    return errors, indices, orig_values1, orig_values2

left=torch.load("8611_0aten.silu_backward.default-input-0.pth",map_location="cpu")
right=torch.load("8611_0aten.silu_backward.default-input-1.pth",map_location="cpu")
tensor1=torch.ops.aten.silu_backward.default(left,right).cpu().float().reshape(-1)
out_xpu=torch.ops.aten.silu_backward.default(left.to(device),right.to(device))
tensor2=out_xpu.cpu().float().reshape(-1)

k = 10  # 找出误差最大的K个元素

errors, indices, orig_values1, orig_values2 = largest_k_errors(tensor1, tensor2, k)

print("----------------- torch.ops.aten.silu_backward.default --------------------------")
print(f"left  dtype:{left.dtype} stride:{left.stride} shape:{left.shape} max:{left.max().item()} min:{left.min().item()}")
print(f"right dtype:{right.dtype} shape:{right.shape} max:{right.max().item()} min:{right.min().item()}")

print("Top", k, "errors:")
for i in range(k):
    print(f"Index {indices[i].item()}: Error {errors[i].item()}, "
        f"Original Values: CPU_FP16 = {orig_values1[i].item()}, "
        f"XPU_FP16 = {orig_values2[i].item()}")
EOF
python3 demo.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/777853.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【文献解析】Voxelmap——一种自适应体素地图

Efficient and Probabilistic Adaptive Voxel Mapping for Accurate Online LiDAR Odometry 论文地址&#xff1a;https://ieeexplore.ieee.org/stamp/stamp.jsp?tp&arnumber9813516 代码&#xff1a;GitHub - hku-mars/VoxelMap: [RA-L 2022] An efficient and probabili…

2024年7月6日 (周六) 叶子游戏新闻

自动电脑内部录音器AutoAudioRecorder: 是一款免费的自动音频录制软件&#xff0c;可直接将电脑内部所有的声音录制成 mp3/wav 文件&#xff0c;包括音乐、游戏直播、网络会议、聊天通话等音频源。 卸载工具 HiBitUninstaller: Windows上的软件卸载工具 《不羁联盟》制作人&…

Java中的日期时间类详解(Date、DateFormat、Calendar)

1. Date类 1.1 概述 java.util.Date类表示特定的瞬间&#xff0c;精确到毫秒。Date类的构造函数可以把毫秒值转成日期对象。 继续查阅Date类的描述&#xff0c;发现Date拥有多个构造函数&#xff0c;只是部分已经过时&#xff0c;我们重点看以下两个构造函数 1.2 Date类构造…

【踩坑】探究PyTorch中创建稀疏矩阵的内存占用过大的问题

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 目录 问题复现 原因分析 解决方案 碎碎念 问题复现 创建一个COO格式的稀疏矩阵&#xff0c;根据计算公式&#xff0c;他应该只占用约5120MB的内存&…

54、一维和二维自组织映射(matlab)

1、一维和二维自组织映射原理 一维和二维自组织映射&#xff08;Self-Organizing Maps, SOM&#xff09;是一种无监督的机器学习算法&#xff0c;通过学习输入数据的拓扑结构&#xff0c;将高维输入数据映射到低维的网格结构中&#xff0c;使得相似的输入数据点在映射空间中也…

win7系统快速安装python

下载安装包 建议选择python3.8左右的&#xff0c;我下载的是3.7.8&#xff0c;最新版本的pythonwin7可能不支持 python网址 下拉寻找 安装python 1.双击安装包 更换完地址选择安装(install) 安装完成后点击close即可 测试是否安装成功 1.winr快捷键打开黑窗口输入cmd …

七大排序-冒泡排序,插入排序,希尔排序(一)

目录 排序冒泡排序插入排序冒泡排序和插入排序的对比希尔排序 排序 先写单趟&#xff0c;再写多趟&#xff0c;这样比较好写 排序可以理解为对商品价格的排序&#xff0c;对数字大小的排序&#xff0c;排序再生活中随处可见 冒泡排序 冒泡排序就是两个相邻的数交换&#xff…

跨界客户服务:拓展服务边界,创造更多价值

在当今这个日新月异的商业时代&#xff0c;跨界合作已不再是新鲜词汇&#xff0c;它如同一股强劲的东风&#xff0c;吹散了行业间的壁垒&#xff0c;为企业服务创新开辟了前所未有的广阔天地。特别是在客户服务领域&#xff0c;跨界合作正以前所未有的深度和广度&#xff0c;拓…

mysql 9 新特新

mysql9新特性 新特性Audit Log NotesC API NotesCharacter Set SupportCompilation NotesComponent NotesConfiguration NotesData Dictionary NotesData Type NotesDeprecation and Removal NotesEvent Scheduler NotesJavaScript ProgramsOptimizer NotesPerformance Schema …

微机原理与单片机 知识体系梳理

单片机笔记分享 我个人感觉单片机要记的东西很多&#xff0c;也很琐碎&#xff0c;特别是一些位、寄存器以及相关作用等&#xff0c;非常难以记忆。因此复习时将知识点整理在了一起做成思维导图&#xff0c;希望对大家有所帮助。内容不是很多&#xff0c;可能有些没覆盖全&…

Python人形机踊跃跨栏举重投篮高维数动作算法模型

&#x1f3af;要点 &#x1f3af;运动功能&#xff1a; 1 m / s 1 m / s 1m/s上台阶、站立平衡、 1 m / s 1 m / s 1m/s行走、坐椅子、 5 m / s 5 m / s 5m/s跑步、 1 m / s 1 m / s 1m/s爬行、穿越森林、取物、穿越迷宫、 1 m / s 1 m / s 1m/s上滑梯、 5 m / s 5 m / s 5m/s…

iOS多target时怎么对InfoPlist进行国际化

由于不同target要显示不同的App名称、不同的权限提示语&#xff0c;国际化InfoPlist文件必须创建名称为InfoPlist.strings的文件&#xff0c;那么多个target时怎么进行国际化呢&#xff1f;步骤如下&#xff1a; 一、首先我们在项目根目录创建不同的文件夹对应多个不同的targe…

自然之美无需雕琢

《自然之美&#xff0c;无需雕琢 ”》在这个颜值至上的时代&#xff0c;但在温馨氛围中&#xff0c;单依纯以一种意想不到的方式&#xff0c;为我们诠释了自然之美的真谛。而医生的回答&#xff0c;如同一股清流耳目一新。“我说医生你看我这张脸&#xff0c;有没有哪里要动的。…

09 docker 安装tomcat 详解

目录 一、安装tomcat 1. tomcat镜像的获取 2. docker创建容器实列 3. 访问测试 404错误 4. 解决方案 5. 使用免修改版容器镜像 5.1. 运行实列的创建 5.2. 出现问题及解决&#xff1a; 6. 验证 OK 一、安装tomcat 1. tomcat镜像的获取 docker search tomcat #docker …

最灵活且易用的C++开源串口通信调试软件

这款C开源串口调试软件&#xff0c;集成了丰富的功能&#xff0c;为用户提供高效、便捷的串口通信调试体验。以下是其核心功能亮点&#xff1a; 基础功能&#xff1a; 数据交互自如&#xff1a;支持串口数据的轻松读取与发送&#xff0c;让数据交换变得简单直接。 灵活配置参…

【后端面试题】【中间件】【NoSQL】MongoDB查询优化3(拆分、嵌入文档,操作系统)

拆分大文档 很常见的一种优化手段&#xff0c;在一些特定的业务场景中&#xff0c;会有一些很大的文档&#xff0c;这些文档有很多字段&#xff0c;而且有一些特定的字段还特别的大。可以考虑拆分这些文档 大文档对MongoDB的性能影响还是很大的&#xff0c;就我个人经验而言&…

【TB作品】基于ATmega48的开机登录程序设计

使用Proteus仿真软件设计一个开机登录程序,单片机选用ATmegga48. 基础要求: 1.程序启动后在LCD1602液晶屏上提示用户通过独立按键输入密码(6位)。 2.密码输入错误则在屏幕上提示密码错误,密码输入正确则在屏幕上提示密 码正确后等待约3秒后进入主界面,在屏幕中央显示HelloWorld…

基于RK3588的8路摄像头实时全景拼接

基于RK3588的8路摄像头实时全景拼接 输入&#xff1a;2路csi转8路mpi的ahd摄像头&#xff0c;分辨率1920 * 1080 8路拼接结果&#xff1a; 6路拼接结果&#xff1a; UI界面&#xff1a; UI节目设计原理

数字时代如果你的企业还未上线B端系统助力则后果很严重

**数字时代如果你的企业还未上线B端系统助力则后果很严重** 数字化浪潮席卷全球&#xff0c;企业对于数字化转型的重视程度日益提高。B端系统&#xff0c;作为企业数字化转型的核心组成部分&#xff0c;其重要性不言而喻。如果你的企业还未上线B端系统助力&#xff0c;那么后果…

异步主从复制

主从复制的概念 主从复制是一种在数据库系统中常用的数据备份和读取扩展技术&#xff0c;通过将一个数据库服务器&#xff08;主服务器&#xff09;上的数据变更自动同步到一个或多个数据库服务器&#xff08;从服务器&#xff09;上&#xff0c;以此来实现数据的冗余备份、读…