A walkthrough of the transformers core source code
The Trainer class:
The __init__() constructor:
def __init__(xxx):
    if args is None:
        output_dir = "tmp_trainer"
        args = TrainingArguments(output_dir=output_dir)
    self.args = args
    self.compute_loss_func = compute_loss_func
    # Seed must be set before instantiating the model when using model_init.
    enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed)
    self.hp_name = None
    self.deepspeed = None
    self.is_in_train = False
    self.model = model
    self.create_accelerator_and_postprocess()  # initialize acceleration-related state, e.g. self.accelerator (an Accelerator)
    self._memory_tracker = TrainerMemoryTracker(self.args.skip_memory_metrics)  # CPU/GPU memory tracker; appears to require the psutil module to be installed
    self._memory_tracker.start()
    # set the correct log level depending on the node
    log_level = args.get_process_log_level()
    logging.set_verbosity(log_level)
    # force device and distributed setup init explicitly
    args._setup_devices
    if model is None:
        if model_init is not None:  # model-init function that is re-run on every call to train()
            self.model_init = model_init
            model = self.call_model_init()
        xxxxx
    if model.__class__.__name__ in MODEL_MAPPING_NAMES:  # the model class must not be one of the headless base classes in the mapping, e.g. albert
        raise ValueError(xxx)
    if getattr(model, "is_parallelizable", False) and getattr(model, "model_parallel", False):
        self.is_model_parallel = True
    if getattr(model, "hf_device_map", None) is not None:
        devices = [device for device in set(model.hf_device_map.values()) if device not in ["cpu", "disk"]]
        if len(devices) > 1:
            self.is_model_parallel = True
        elif len(devices) == 1:
            self.is_model_parallel = self.args.device != torch.device(devices[0])
        else:
            self.is_model_parallel = False
        # warn users
        if self.is_model_parallel:
            logger.info(
                "You have loaded a model on multiple GPUs. `is_model_parallel` attribute will be force-set"
                " to `True` to avoid any unexpected behavior such as device placement mismatching."
            )
    if self.args.use_liger_kernel:  # Patch the model with liger kernels.
        xxx
    # FSDP shards the model and stores each shard on a different GPU instead of keeping a full copy of the model
    # on every GPU; it mainly addresses the excessive memory footprint of classic data parallelism (e.g. DDP)
    # when training large models.
    self.is_fsdp_xla_enabled = args.fsdp_config["xla"]
    # one place to sort out whether to place the model on device or not
    # postpone switching model to cuda when:
    # 1. MP - since we are trying to fit a much bigger than 1 gpu model
    # 2. fp16-enabled DeepSpeed loads the model in half the size and it doesn't need .to() anyway,
    #    and we only use deepspeed for training at the moment
    # 3. full bf16 or fp16 eval - since the model needs to be cast to the right dtype first
    # 4. FSDP - same as MP
    if (
        self.is_model_parallel
        or self.is_deepspeed_enabled
        or ((args.fp16_full_eval or args.bf16_full_eval) and not args.do_train)
        or self.is_fsdp_xla_enabled
        or self.is_fsdp_enabled
    ):
        self.place_model_on_device = False
    default_collator = (
        DataCollatorWithPadding(processing_class)
        if processing_class is not None
        and isinstance(processing_class, (PreTrainedTokenizerBase, SequenceFeatureExtractor))
        else default_data_collator
    )
    self.data_collator = data_collator if data_collator is not None else default_collator
    self.train_dataset = train_dataset
    self.eval_dataset = eval_dataset
    self.processing_class = processing_class
    xxxx
    # later use `self.model is self.model_wrapped` to check if it's wrapped or not
    self.model_wrapped = model
    self.model = model
    self.neftune_noise_alpha = args.neftune_noise_alpha
    self.compute_metrics = compute_metrics
    self.preprocess_logits_for_metrics = preprocess_logits_for_metrics
    self.optimizer, self.lr_scheduler = optimizers
    self.optimizer_cls_and_kwargs = optimizer_cls_and_kwargs
    default_callbacks = DEFAULT_CALLBACKS + get_reporting_integration_callbacks(self.args.report_to)
    callbacks = default_callbacks if callbacks is None else default_callbacks + callbacks
    self.callback_handler = CallbackHandler(
        callbacks, self.model, self.processing_class, self.optimizer, self.lr_scheduler
    )
    self.add_callback(PrinterCallback if self.args.disable_tqdm else DEFAULT_PROGRESS_CALLBACK)
    # Will be set to True by `self._setup_loggers()` on first call to `self.log()`.
    self._loggers_initialized = False
    # Label smoothing
    if self.args.label_smoothing_factor != 0:
        self.label_smoother = LabelSmoother(epsilon=self.args.label_smoothing_factor)
    self.control = TrainerControl()  # training-flow control object; holds control flags and works together with TrainerCallback
    self.state = TrainerState(  # training-state record, refreshed on every gradient update (with gradient_accumulation_steps=1 that is once per batch)
        is_local_process_zero=self.is_local_process_zero(),
        is_world_process_zero=self.is_world_process_zero(),
        stateful_callbacks=[
            cb for cb in self.callback_handler.callbacks + [self.control] if isinstance(cb, ExportableState)
        ],
    )
    # Internal variable to count flos in each process, will be accumulated in `self.state.total_flos` then
    # returned to 0 every time flos need to be logged
    self.current_flos = 0
    self.hp_search_backend = None
    default_label_names = find_labels(self.model.__class__)  # infer the default label argument names from the model's forward() signature
    self.label_names = default_label_names if self.args.label_names is None else self.args.label_names
    self.can_return_loss = can_return_loss(self.model.__class__)  # based on whether return_loss appears in the model's forward()
    self.control = self.callback_handler.on_init_end(self.args, self.state, self.control)
    # Internal variables to help with automatic batch size reduction
    self._train_batch_size = args.train_batch_size
    self._created_lr_scheduler = False
    # very last
    self._memory_tracker.stop_and_update_metrics()
    self.is_fsdp_xla_v2_enabled = args.fsdp_config.get("xla_fsdp_v2", False)
    self.is_fsdp_xla_v1_enabled = self.is_fsdp_xla_enabled and not self.is_fsdp_xla_v2_enabled
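To make the pieces above concrete, here is a minimal usage sketch (not part of the Trainer source): it builds a Trainer and inspects a few of the attributes that __init__ sets up. The checkpoint name and argument values are illustrative only, and it assumes a transformers version recent enough for Trainer to accept processing_class (in older versions the tokenizer argument plays the same role).

from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
)

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased")

# If args were omitted, __init__ would fall back to TrainingArguments(output_dir="tmp_trainer").
args = TrainingArguments(output_dir="out", per_device_train_batch_size=8, report_to="none")

trainer = Trainer(model=model, args=args, processing_class=tokenizer)

# A few of the attributes set up by __init__ and discussed above:
print(trainer.data_collator)                   # DataCollatorWithPadding built from the tokenizer
print(trainer.model is trainer.model_wrapped)  # True until the model gets wrapped (DDP/DeepSpeed/FSDP)
print(trainer.label_names)                     # inferred via find_labels(), typically ['labels']
print(trainer.place_model_on_device)           # False under MP/DeepSpeed/FSDP, as explained above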
The train() method:
def train(
    self,
    resume_from_checkpoint: Optional[Union[str, bool]] = None,
    trial: Union["optuna.Trial", dict[str, Any], None] = None,
    ignore_keys_for_eval: Optional[list[str]] = None,
    **kwargs,
):
    # core of the implementation
    self._memory_tracker.start()
    args = self.args
    self.is_in_train = True
    # This might change the seed so needs to run first.
    self._hp_search_setup(trial)  # hyperparameter-search setup
    # Model re-init: model_init is re-run on every call to train()
    model_reloaded = False
    if self.model_init is not None:
        # Seed must be set before instantiating the model when using model_init.
        enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed)
        self.model = self.call_model_init(trial)
        model_reloaded = True
        # Reinitializes optimizer and scheduler
        self.optimizer, self.lr_scheduler = None, None
    # Load potential model checkpoint: takes precedence over model_init
    if isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint:
        resume_from_checkpoint = get_last_checkpoint(args.output_dir)
    if resume_from_checkpoint is not None:
        if not is_sagemaker_mp_enabled() and not self.is_deepspeed_enabled and not self.is_fsdp_enabled:
            self._load_from_checkpoint(resume_from_checkpoint)
    # If model was re-initialized, put it on the right device and update self.model_wrapped
    if model_reloaded:
        if self.place_model_on_device:
            self._move_model_to_device(self.model, args.device)
        self.model_wrapped = self.model
    inner_training_loop = find_executable_batch_size(
        self._inner_training_loop, self._train_batch_size, args.auto_find_batch_size
    )
    return inner_training_loop(  # finds a batch size the GPU memory can handle and calls _inner_training_loop() to train
        args=args,
        resume_from_checkpoint=resume_from_checkpoint,
        trial=trial,
        ignore_keys_for_eval=ignore_keys_for_eval,
    )
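The wrapper in the last lines is the auto-batch-size mechanism: when args.auto_find_batch_size is enabled, the find_executable_batch_size helper hands _inner_training_loop to accelerate's decorator of the same name, which retries the wrapped callable with a halved batch size every time it catches a CUDA out-of-memory error; otherwise it simply binds the fixed batch size. A standalone sketch of the accelerate decorator (the OOM here is simulated by hand, and the threshold of 16 is arbitrary):

from accelerate.utils import find_executable_batch_size

@find_executable_batch_size(starting_batch_size=64)
def training_function(batch_size):
    # stand-in for _inner_training_loop: pretend anything above 16 samples does not fit in memory
    print(f"trying batch_size={batch_size}")
    if batch_size > 16:
        raise RuntimeError("CUDA out of memory.")
    return batch_size

print(training_function())  # tries 64, then 32, then succeeds with 16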
The _inner_training_loop() method:
def _inner_training_loop(
    self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None
):
    # core code, expanded
    self.accelerator.free_memory()
    self._train_batch_size = batch_size
    train_dataloader = self.get_train_dataloader()  # wraps the training dataset in a DataLoader
    total_train_batch_size = self._train_batch_size * args.gradient_accumulation_steps * args.world_size
    (num_train_epochs, num_update_steps_per_epoch, xxx) = self.set_initial_training_values(
        args, train_dataloader, total_train_batch_size
    )  # work out the concrete training schedule (epochs, update steps per epoch, ...)
    num_train_tokens = None  # tracks the total number of training tokens
    if self.is_deepspeed_enabled:
        self.optimizer, self.lr_scheduler = deepspeed_init(self, num_training_steps=max_steps)
    self.state = TrainerState(xxx)  # records the current training state
    if args.gradient_checkpointing:
        self.model.gradient_checkpointing_enable(gradient_checkpointing_kwargs=args.gradient_checkpointing_kwargs)
    model = self._wrap_model(self.model_wrapped)
    use_accelerator_prepare = True if model is self.model else False
    if delay_optimizer_creation:
        xxx
    self.create_optimizer_and_scheduler(num_training_steps=max_steps)  # set up the optimizer and lr scheduler
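The total_train_batch_size line is plain arithmetic, but it is the number of samples consumed per optimizer update, and set_initial_training_values derives num_update_steps_per_epoch and num_train_epochs from it together with len(train_dataloader). A worked example with illustrative values (in a DDP run, self._train_batch_size is the per-device batch size):

per_device_train_batch_size = 4    # self._train_batch_size on each process
gradient_accumulation_steps = 16
world_size = 8                     # number of processes / GPUs

total_train_batch_size = per_device_train_batch_size * gradient_accumulation_steps * world_size
print(total_train_batch_size)      # 512 samples per optimizer update

# With, say, 1_000_000 training samples, one epoch is roughly
# 1_000_000 // 512 = 1953 optimizer updates.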