diff --git a/README_YC.md b/README_YC.md
new file mode 100644
index 000000000..92d30930f
--- /dev/null
+++ b/README_YC.md
@@ -0,0 +1,54 @@
+
+## Step 1: Format the data
+python /opt/NeMo-Aligner/examples/nlp/data/steerlm/preprocess_openassistant_data.py --output_directory=data/oasst
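+
+To sanity-check the preprocessing output, a quick look at the first record is usually enough. The snippet below is only a sketch: it assumes the script above wrote `data/oasst/train.jsonl` and makes no assumption about the exact record schema.
+
+```python
+# Hypothetical spot check: load the first formatted record and inspect it.
+import json
+
+with open("data/oasst/train.jsonl", encoding="utf-8") as f:
+    first_record = json.loads(f.readline())
+
+print(list(first_record.keys()))  # top-level fields produced by the preprocessing script
+print(str(first_record)[:300])    # truncated preview of the record itself
+```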
+
+## Step 2: Run SFT training
+
+export WANDB_DISABLED=true
+export NCCL_IB_DISABLE=1 # disable InfiniBand; worth trying if communication errors occur
+export NCCL_P2P_DISABLE=1 # disable P2P transfers; useful when troubleshooting
+
+export NCCL_DEBUG=INFO
+export TMPDIR=/mnt/workspace/yangchao.zhou/opt/models/tmp
+MODEL="/mnt/workspace/yangchao.zhou/opt/models/Mistral-NeMo-12B-Instruct/Mistral-NeMo-12B-Instruct.nemo"
+TRAIN_DS="/mnt/workspace/yangchao.zhou/opt/data/oasst/train.jsonl"
+VALID_DS="/mnt/workspace/yangchao.zhou/opt/data/oasst/val.jsonl"
+RESULTS="/mnt/workspace/yangchao.zhou/opt/RESULTS/7B"
+
+
+python examples/nlp/gpt/train_gpt_sft4linky.py \
+    trainer.precision=bf16 \
+    trainer.num_nodes=1 \
+    trainer.devices=8 \
+    trainer.sft.max_steps=-1 \
+    trainer.sft.limit_val_batches=40 \
+    trainer.sft.val_check_interval=1000 \
+    model.tensor_model_parallel_size=1 \
+    model.pipeline_model_parallel_size=8 \
+    model.megatron_amp_O2=True \
+    model.activations_checkpoint_granularity=selective \
+    model.restore_from_path=${MODEL} \
+    model.optim.lr=5e-6 \
+    model.data.chat=True \
+    model.data.num_workers=0 \
+    model.data.train_ds.micro_batch_size=1 \
+    model.data.train_ds.global_batch_size=8 \
+    model.data.train_ds.max_seq_length=1024 \
+    model.data.train_ds.file_path=${TRAIN_DS} \
+    model.data.validation_ds.micro_batch_size=1 \
+    model.data.validation_ds.global_batch_size=8 \
+    model.data.validation_ds.file_path=${VALID_DS} \
+    model.data.validation_ds.max_seq_length=1024 \
+    exp_manager.create_wandb_logger=False \
+    exp_manager.explicit_log_dir=${RESULTS} \
+    exp_manager.wandb_logger_kwargs.project=sft_run \
+    exp_manager.wandb_logger_kwargs.name=chat_sft_run \
+    exp_manager.checkpoint_callback_params.save_nemo_on_train_end=True \
+    exp_manager.resume_if_exists=True \
+    exp_manager.resume_ignore_no_checkpoint=True \
+    exp_manager.create_checkpoint_callback=True \
+    exp_manager.checkpoint_callback_params.monitor=validation_loss
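+
+With `trainer.devices=8`, `tensor_model_parallel_size=1`, and `pipeline_model_parallel_size=8`, all eight GPUs form a single pipeline replica, so the global batch of 8 is reached purely through gradient accumulation. A back-of-the-envelope sketch of that arithmetic (plain Python, illustrative only, not NeMo code):
+
+```python
+# Parallelism math for the command above.
+devices, num_nodes = 8, 1
+tp, pp = 1, 8                  # tensor / pipeline model-parallel sizes
+micro_bs, global_bs = 1, 8
+
+data_parallel = (devices * num_nodes) // (tp * pp)   # -> 1 model replica
+grad_acc = global_bs // (micro_bs * data_parallel)   # -> 8 micro-batches per optimizer step
+print(data_parallel, grad_acc)
+```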
+
+### Kill the training processes
+ps -ef | grep train_gpt_sft4linky
+pkill -f train_gpt_sft4linky.py
\ No newline at end of file
diff --git a/examples/nlp/gpt/conf/gpt_sft4linky.yaml b/examples/nlp/gpt/conf/gpt_sft4linky.yaml
new file mode 100644
index 000000000..bdd757f31
--- /dev/null
+++ b/examples/nlp/gpt/conf/gpt_sft4linky.yaml
@@ -0,0 +1,204 @@
+name: megatron_gpt_sft
+
+trainer:
+  num_nodes: 1
+  devices: 1
+  accelerator: gpu
+  precision: bf16
+
+  sft:
+    max_epochs: 1
+    max_steps: -1
+
+    val_check_interval: 100
+    save_interval: ${.val_check_interval}
+    limit_train_batches: 1.0
+
+    limit_val_batches: 1.0
+    gradient_clip_val: 1.0
+
+    # can be used to register any custom metrics that require token-by-token generation
+    # inference_metrics:
+    #   my_metric_name1:
+    #     _target_:
+    #   my_metric_name2:
+    #     _target_:
+    #
+
+  # do not change these
+  logger: False # logger provided by exp_manager
+  enable_checkpointing: False
+  use_distributed_sampler: False
+  max_time: null
+  max_epochs: ${.sft.max_epochs}
+  max_steps: ${.sft.max_steps}
+
+exp_manager:
+  explicit_log_dir: null
+  exp_dir: null
+  name: ${name}
+  create_wandb_logger: False
+  wandb_logger_kwargs:
+    project: null
+    name: null
+  resume_if_exists: True
+  resume_ignore_no_checkpoint: True
+  create_checkpoint_callback: True
+  checkpoint_callback_params:
+    monitor: val_loss
+    save_top_k: 5
+    mode: min
+    save_nemo_on_train_end: False
+    filename: 'megatron_gpt_sft--{${.monitor}:.3f}-{step}-{consumed_samples}-{epoch}'
+    model_parallel_size: ${model.tensor_model_parallel_size}
+    save_best_model: False # need to keep this False, otherwise it will create multiple last.ckpt files because restore resets the previous best model
+
+model:
+  seed: 1234
+  tensor_model_parallel_size: 1 # intra-layer model parallelism
+  pipeline_model_parallel_size: 1 # inter-layer model parallelism
+  restore_from_path: ??? # Path to an existing p-tuned/prompt-tuned .nemo model you wish to add new tasks to or run inference with
+  resume_from_checkpoint: null # The path to a checkpoint file to continue the training; restores the whole state including the epoch, step, LR schedulers, apex, etc.
+  save_nemo_on_validation_end: True # Saves an inference-ready .nemo file every time a checkpoint is saved during training.
+  sync_batch_comm: False
+  megatron_amp_O2: False
+  encoder_seq_length: 4096 # the sequence length of the encoder model; it will be overwritten by the loaded GPT model
+
+  ## Sequence Parallelism
+  # Makes tensor parallelism more memory efficient for LLMs (20B+) by parallelizing layer norms and dropout sequentially
+  # See Reducing Activation Recomputation in Large Transformer Models: https://arxiv.org/abs/2205.05198 for more details.
+  sequence_parallel: False
+
+  ## Activation Checkpoint
+  activations_checkpoint_granularity: null # 'selective' or 'full'
+  activations_checkpoint_method: null # 'uniform', 'block', not used with 'selective'
+  # 'uniform' divides the total number of transformer layers and checkpoints the input activation
+  # of each chunk at the specified granularity
+  # 'block' checkpoints the specified number of layers per pipeline stage at the specified granularity
+  activations_checkpoint_num_layers: null # not used with 'selective'
+  activations_checkpoint_layers_per_pipeline: null
+  # This feature is valid only when used with pipeline-model-parallelism. More details in megatron_gpt_config.yaml.
+  answer_only_loss: False # not used right now
+  gradient_as_bucket_view: False
+  seq_len_interpolation_factor: null # if not None, seq_len_interpolation_factor will match the base model's value
+  use_flash_attention: null # if not None, will match the base model's value
+
+  hidden_dropout: 0.0
+  attention_dropout: 0.0
+  ffn_dropout: 0.0
+
+  steerlm2:
+    forward_micro_batch_size: 1 # the micro batch size for the forward pass, used to compute the weights
+    micro_batch_size: 1 # the steerlm2 training micro batch size
+
+  # can be used to customize behavior of model.generate for inference metrics
+  # note that you have to specify all parameters explicitly even if they match defaults
+  # as long as you change at least one parameter
+  #
+  # inference:
+  #   sampling_params:
+  #     use_greedy: False
+  #     temperature: 0.7
+  #     top_k: 0
+  #     top_p: 0.95
+  #     repetition_penalty: 1.0
+  #     add_BOS: True
+  #     all_probs: False
+  #     compute_logprob: False
+  #     end_strings: ["<|endoftext|>", ""]
+  #   length_params:
+  #     min_length: 0
+  #     max_length: 512
+  #   strategy:
+  #     _target_:
+  #
+
+
+  peft:
+    peft_scheme: "none" # ["lora", "none"]
+    restore_from_path: null
+
+    lora_tuning:
+      target_modules: ['attention_qkv'] # this can either be 'attention_qkv', 'attention_dense', 'mlp_fc1', 'mlp_fc2', 'attention' (qkv & dense), 'mlp' (fc1 & fc2), or 'all'
+      adapter_dim: 32
+      adapter_dropout: 0.0
+      column_init_method: 'xavier' # IGNORED if linear_adapter is used; options: xavier, zero or normal
+      row_init_method: 'zero' # IGNORED if linear_adapter is used; options: xavier, zero or normal
+      layer_selection: null # selects in which layers to add lora adapters. e.g. [1,12] will add lora to layer 1 (lowest) and 12. null will apply adapters to all layers
+      weight_tying: False
+      position_embedding_strategy: null # used only when weight_tying is True
+
+
+  data:
+    chat: False # whether to use chatbot data or not
+    chat_prompt_tokens: # special tokens for the chat prompts, a dictionary of {token_type: token}. Note that some tokenizers may combine the characters at the junction between {end_of_turn}{turn_start}; the '><' is sometimes merged into a single token. This is not supported; try to avoid it.
+      system_turn_start: "\x00"
+      turn_start: "\x11"
+      label_start: "\x12"
+      end_of_turn: "\x0A" # \x0A is '\n'
+      end_of_name: "\x0A" # \x0A is '\n'
+    sample: False # create the index mapping files for the sample data, so max_steps * global_batch_size can be larger than the dataset size
+    num_workers: 0
+    train_ds:
+      # Example of how to specify paths to multiple datasets
+      # file_names:
+      #   - /path/to/squad.jsonl
+      #   - /path/to/mnli.jsonl
+      #   - /path/to/boolq.jsonl
+      # Example of how each dataset is formatted
+      # {'input': 'John von Neumann\nVon Neumann made fundamental contributions .... Q: What did the math of artificial viscosity do?', 'output': 'smoothed the shock transition without sacrificing basic physics'}
+      file_path: ??? # Path to a JSONL file corresponding to the source data. Data format is identical to validation_ds.
+      global_batch_size: 128
+      micro_batch_size: 1
+      shuffle: True
+      memmap_workers: null
+      max_seq_length: ${model.encoder_seq_length}
+      min_seq_length: 1
+      drop_last: True # note that `False` is not currently supported
+      # Example of how to specify concat_sampling_probabilities
+      # concat_sampling_probabilities:
+      #   - 0.5
+      #   - 0.25
+      #   - 0.25
+      label_key: 'output'
+      add_eos: True
+      add_sep: False
+      add_bos: False
+      truncation_field: "input" # Can be multiple keys separated with ','. Options: keys in prompt_template
+      index_mapping_dir: null # Path to a directory to write index mapping files.
+      prompt_template: "{input} {output}" # f-string to use for the assistant prompt. Example: "Q: {input}\nA: {output}"
+      hf_dataset: False # Whether to load the json file with the HuggingFace dataset. Otherwise, will load the jsonl file with the JSONLMemMapDataset.
+      truncation_method: 'right' # Truncation from which position. Options: ['left', 'right']
+
+    validation_ds:
+      file_path: ??? # Path to a JSONL file corresponding to the source data. Data format is identical to validation_ds.
+      global_batch_size: ${model.data.train_ds.global_batch_size}
+      micro_batch_size: ${model.data.train_ds.micro_batch_size}
+      shuffle: False
+      memmap_workers: ${model.data.train_ds.memmap_workers}
+      max_seq_length: ${model.data.train_ds.max_seq_length}
+      min_seq_length: 1
+      drop_last: True # note that `False` is not currently supported
+      label_key: ${model.data.train_ds.label_key}
+      add_eos: ${model.data.train_ds.add_eos}
+      add_sep: ${model.data.train_ds.add_sep}
+      add_bos: ${model.data.train_ds.add_bos}
+      truncation_field: ${model.data.train_ds.truncation_field} # Options: keys in prompt_template
+      index_mapping_dir: null # Path to a directory to write index mapping files.
+      prompt_template: ${model.data.train_ds.prompt_template} # f-string to use for the assistant prompt. Example: "Q: {input}\nA: {output}"
+      hf_dataset: False # Whether to load the json file with the HuggingFace dataset. Otherwise, will load the jsonl file with the JSONLMemMapDataset.
+      truncation_method: 'right' # Truncation from which position. Options: ['left', 'right']
+      output_original_text: True # needed for the proper metrics support
+
+  optim:
+    name: distributed_fused_adam # Supports distributed optimizer for memory savings. To enable, set to 'distributed_fused_adam'. Needs Apex to be built with specific args to work.
+    lr: 3e-5
+    weight_decay: 0.01
+    betas:
+      - 0.9
+      - 0.98
+    sched:
+      name: CosineAnnealing
+      warmup_steps: 10
+      constant_steps: 1000
+      min_lr: 9e-7
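
The `chat_prompt_tokens` above are the control characters that delimit system, user, and assistant turns when `model.data.chat=True`. To preview the exact prompt template they produce, the same helper that `train_gpt_sft4linky.py` imports can be called directly. This is only a sketch: it assumes NeMo is installed and that `get_prompt_template_example` accepts the token config the way the training script below calls it.

```python
# Preview the chat prompt template built from the special tokens in gpt_sft4linky.yaml.
from omegaconf import OmegaConf
from nemo.collections.nlp.data.language_modeling.megatron.gpt_sft_chat_dataset import (
    get_prompt_template_example,
)

chat_prompt_tokens = OmegaConf.create(
    {
        "system_turn_start": "\x00",
        "turn_start": "\x11",
        "label_start": "\x12",
        "end_of_turn": "\x0A",
        "end_of_name": "\x0A",
    }
)
# repr() makes the non-printable control characters visible in the output.
print(repr(get_prompt_template_example(chat_prompt_tokens)))
```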
diff --git a/examples/nlp/gpt/train_gpt_sft4linky.py b/examples/nlp/gpt/train_gpt_sft4linky.py
new file mode 100644
index 000000000..6fd699ab4
--- /dev/null
+++ b/examples/nlp/gpt/train_gpt_sft4linky.py
@@ -0,0 +1,243 @@
+# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import torch.multiprocessing as mp
+from omegaconf.omegaconf import OmegaConf, open_dict
+
+from nemo.collections.nlp.data.language_modeling.megatron.gpt_sft_chat_dataset import get_prompt_template_example
+from nemo.collections.nlp.data.language_modeling.megatron.megatron_batch_samplers import (
+    MegatronPretrainingBatchSampler,
+)
+from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel
+from nemo.collections.nlp.parts.megatron_trainer_builder import MegatronTrainerBuilder
+from nemo.core.config import hydra_runner
+from nemo.utils import logging
+from nemo.utils.exp_manager import exp_manager
+from nemo_aligner.algorithms.supervised import SupervisedTrainer
+from nemo_aligner.data.nlp.builders import build_dataloader, build_sft_dataset
+from nemo_aligner.models.nlp.gpt.gpt_sft_model import GPTSFTModel
+from nemo_aligner.utils.distributed import Timer
+from nemo_aligner.utils.train_script_utils import (
+    CustomLoggerWrapper,
+    add_custom_checkpoint_callback,
+    extract_optimizer_scheduler_from_ptl_model,
+    init_distributed,
+    init_peft,
+    init_using_ptl,
+    resolve_and_create_trainer,
+    retrieve_custom_trainer_state_dict,
+)
+from nemo_aligner.utils.utils import load_from_nemo
+
+"""Script to start SFT training"""
+
+OmegaConf.register_new_resolver("multiply", lambda x, y: x * y, replace=True)
+OmegaConf.register_new_resolver("int_div", lambda x, y: x // y, replace=True)
+
+mp.set_start_method("spawn", force=True)
+
+
+def _modify_config(gpt_cfg, cfg, add_cfg_to_tree=False):
+    """
+    This function modifies the original gpt pre-training config (gpt_cfg) with attributes from the finetuning config (cfg).
+    The `add_cfg_to_tree` arg adds `cfg` to the top of the yaml tree which is needed for all `hparams.yaml` files when passed as an arg to `load_from_checkpoint()`.
+    """
+    OmegaConf.set_struct(gpt_cfg, True)
+    OmegaConf.resolve(cfg)
+    with open_dict(gpt_cfg):
+        gpt_cfg.megatron_amp_O2 = cfg.model.get("megatron_amp_O2", False)
+        gpt_cfg.micro_batch_size = cfg.model.data.train_ds.micro_batch_size
+        gpt_cfg.global_batch_size = cfg.model.data.train_ds.global_batch_size
+        gpt_cfg.sequence_parallel = cfg.model.get("sequence_parallel", False)
+        gpt_cfg.activations_checkpoint_granularity = cfg.model.get("activations_checkpoint_granularity", None)
+        gpt_cfg.activations_checkpoint_num_layers = cfg.model.get("activations_checkpoint_num_layers", None)
+        gpt_cfg.activations_checkpoint_method = cfg.model.get("activations_checkpoint_method", None)
+        gpt_cfg.activations_checkpoint_layers_per_pipeline = cfg.model.get(
+            "activations_checkpoint_layers_per_pipeline", None
+        )
+        gpt_cfg.peft = cfg.model.peft
+        gpt_cfg.data = cfg.model.data
+        gpt_cfg.optim = cfg.model.optim
+        gpt_cfg.precision = cfg.trainer.precision
+        gpt_cfg.answer_only_loss = cfg.model.answer_only_loss
+        gpt_cfg.restore_from_path = cfg.model.restore_from_path
+        gpt_cfg.resume_from_checkpoint = cfg.model.resume_from_checkpoint
+        gpt_cfg.save_nemo_on_validation_end = cfg.model.save_nemo_on_validation_end
+        gpt_cfg.gradient_as_bucket_view = cfg.model.gradient_as_bucket_view
+        gpt_cfg.hidden_dropout = cfg.model.get("hidden_dropout", 0.0)
+        gpt_cfg.attention_dropout = cfg.model.get("attention_dropout", 0.0)
+        gpt_cfg.ffn_dropout = cfg.model.ffn_dropout
+        gpt_cfg.use_flash_attention = cfg.model.get("use_flash_attention", False)
+        # if TP/PP size is -1, use default TP/PP size as original model
+        if cfg.model.get("tensor_model_parallel_size", 1) > 0:
+            gpt_cfg.tensor_model_parallel_size = cfg.model.get("tensor_model_parallel_size", 1)
+        if cfg.model.get("pipeline_model_parallel_size", 1) > 0:
+            gpt_cfg.pipeline_model_parallel_size = cfg.model.get("pipeline_model_parallel_size", 1)
+        gpt_cfg.pipeline_model_parallel_split_rank = cfg.model.get("pipeline_model_parallel_split_rank", 0)
+
+        if cfg.model.data.get("chat", False):
+            # chat model, overwrite the prompt template
+            prompt_template = get_prompt_template_example(cfg.model.data.chat_prompt_tokens)
+            gpt_cfg.data.train_ds.prompt_template = prompt_template
+            gpt_cfg.data.validation_ds.prompt_template = prompt_template
+
+        sft_cls = GPTSFTModel
+        gpt_cfg.target = f"{sft_cls.__module__}.{sft_cls.__name__}"
+
+        if cfg.model.get("use_flash_attention", None) is not None:
+            gpt_cfg.use_flash_attention = cfg.model.use_flash_attention
+
+        if cfg.model.get("seq_len_interpolation_factor", None) is not None:
+            gpt_cfg.seq_len_interpolation_factor = cfg.model.seq_len_interpolation_factor
+
+        gpt_cfg.inference = cfg.model.get("inference", {})
+
+        # This is needed when modifying a hparam file directly to load `.ckpt` files.
+        # This is not needed to modify the cfg in `.nemo` files.
+        if add_cfg_to_tree:
+            OmegaConf.resolve(gpt_cfg)
+            gpt_cfg.cfg = gpt_cfg
+
+    return gpt_cfg
+
+
+@hydra_runner(config_path="conf", config_name="gpt_sft4linky")
+def main(cfg) -> None:
+    logging.info("\n\n************** Experiment configuration ***********")
+    logging.info(f"\n{OmegaConf.to_yaml(cfg)}")
+
+    trainer = resolve_and_create_trainer(cfg, "sft")
+    exp_manager(trainer, cfg.exp_manager)
+    logger = CustomLoggerWrapper(trainer.loggers)
+
+    # hydra interpolation does not work here as the interpolation key is lost when PTL saves hparams
+    with open_dict(cfg):
+        cfg.model.precision = cfg.trainer.precision
+
+    ptl_model, updated_cfg = load_from_nemo(
+        GPTSFTModel,
+        cfg,
+        trainer,
+        strict=True,
+        modify_config_fn=_modify_config,
+        restore_path=cfg.model.restore_from_path,
+        return_updated_cfg=True,
+    )
+
+    init_peft(ptl_model, updated_cfg)
+
+    with open_dict(cfg):
+        # overwrite the model config with the config from the checkpoint
+        cfg.model.encoder_seq_length = ptl_model.cfg.encoder_seq_length
+
+    # pull values from checkpoint
+    trainer_restore_path = trainer.ckpt_path
+
+    # TODO: log this restore path
+    if trainer_restore_path is not None:
+        custom_trainer_state_dict = retrieve_custom_trainer_state_dict(trainer)
+        consumed_samples = custom_trainer_state_dict["consumed_samples"]
+    else:
+        custom_trainer_state_dict = None
+        consumed_samples = 0
+
+    init_distributed(trainer, ptl_model, cfg.model.get("transformer_engine", False))
+
+    train_data_cfg = cfg.model.data.train_ds
+    val_data_cfg = cfg.model.data.validation_ds
+
+    if cfg.model.data.get("sample", False):
+        # if it is negative, num_samples is None
+        if cfg.trainer.sft.max_steps < 0:
+            num_samples = None
+        else:
+            num_samples = cfg.trainer.sft.max_steps * train_data_cfg.global_batch_size
+    else:
+        num_samples = None
+    train_ds = build_sft_dataset(
+        train_data_cfg,
+        ptl_model.tokenizer,
+        num_samples,
+        answer_only_loss=True,
+        is_chat=cfg.model.data.chat,
+        special_tokens=cfg.model.data.chat_prompt_tokens,
+    )
+    if cfg.model.data.get("sample", False):
+        num_samples = cfg.trainer.sft.limit_val_batches * val_data_cfg.global_batch_size
+    else:
+        num_samples = None
+    validation_ds = build_sft_dataset(
+        val_data_cfg,
+        ptl_model.tokenizer,
+        num_samples,
+        answer_only_loss=True,
+        is_chat=cfg.model.data.chat,
+        special_tokens=cfg.model.data.chat_prompt_tokens,
+    )
+
+    train_dataloader = build_dataloader(
+        cfg=cfg,
+        dataset=train_ds,
+        consumed_samples=consumed_samples,
+        mbs=train_data_cfg.micro_batch_size,
+        gbs=train_data_cfg.global_batch_size,
+        collate_fn=train_ds.collate_fn,
+        drop_last=train_data_cfg.drop_last,
+        pad_samples_to_global_batch_size=not train_data_cfg.drop_last,
+        load_gbs=True,
+    )
+
+    val_dataloader = build_dataloader(
+        cfg=cfg,
+        dataset=validation_ds,
+        consumed_samples=0,
+        mbs=val_data_cfg.micro_batch_size,
+        gbs=val_data_cfg.global_batch_size,
+        collate_fn=validation_ds.collate_fn,
+        drop_last=val_data_cfg.drop_last,
+        pad_samples_to_global_batch_size=not val_data_cfg.drop_last,
+        load_gbs=True,
+        use_random_sampler=False,
+    )
+
+    init_using_ptl(trainer, ptl_model, train_dataloader, train_ds)
+    optimizer, scheduler = extract_optimizer_scheduler_from_ptl_model(ptl_model)
+
+    ckpt_callback = add_custom_checkpoint_callback(trainer, ptl_model)
+
+    logger.log_hyperparams(OmegaConf.to_container(cfg))
+    timer = Timer(cfg.exp_manager.get("max_time_per_run"))
+
+    sft_trainer = SupervisedTrainer(
+        cfg=cfg.trainer.sft,
+        model=ptl_model,
+        optimizer=optimizer,
+        scheduler=scheduler,
+        train_dataloader=train_dataloader,
+        val_dataloader=val_dataloader,
+        test_dataloader=None,
+        logger=logger,
+        ckpt_callback=ckpt_callback,
+        run_timer=timer,
+    )
+
+    if custom_trainer_state_dict is not None:
+        sft_trainer.load_state_dict(custom_trainer_state_dict)
+
+    sft_trainer.fit()
+
+
+if __name__ == "__main__":
+    main()
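
`train_gpt_sft4linky.py` registers two OmegaConf resolvers, `multiply` and `int_div`, before Hydra parses the config. They are not referenced in `gpt_sft4linky.yaml` as written, but they let derived values be expressed directly in YAML. A minimal standalone illustration (not part of the training script):

```python
# How the resolvers registered at the top of train_gpt_sft4linky.py can be used in a config.
from omegaconf import OmegaConf

OmegaConf.register_new_resolver("multiply", lambda x, y: x * y, replace=True)
OmegaConf.register_new_resolver("int_div", lambda x, y: x // y, replace=True)

cfg = OmegaConf.create(
    {
        "micro_batch_size": 1,
        "grad_acc_steps": 8,
        "global_batch_size": "${multiply:${micro_batch_size},${grad_acc_steps}}",
    }
)
print(cfg.global_batch_size)  # -> 8, resolved lazily on access
```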