Fine-tune Google Flan-UL2 with Multiple GPUs

In this tutorial, we will walk through fine-tuning Flan-UL2 with PEFT, LoRA, and DeepSpeed on multiple GPUs (single machine). With A100 80GB cards, we can expect the training to finish in about 24 hours for 3 epochs.

Flan-UL2 is an encoder-decoder model based on the T5 architecture. It is a 20B parameter model fine-tuned using the “Flan” prompt tuning and dataset collection.

The model was initialized using UL2 checkpoints. For more information on UL2, please take a look at the original paper.

While the focus of this article is the use of multiple GPUs on the TIR platform, you can follow the same steps with a single GPU at the cost of training speed.

We could also try to fit the model across GPUs using DeepSpeed's pipeline parallelism, but that requires playing with the device map, mainly because the T5 blocks in UL2 have residual connections that cannot be split across GPUs. There are workarounds, but for the scope of this tutorial we will use DeepSpeed's ZeRO offloading feature, which shards optimizer states, gradients, and parameters across the GPUs (with offload to CPU memory) so that every GPU can take part in training without holding a full model copy.

Steps

[1] Start a GPU Notebook (with a 4xA100-80GB plan and at least 100GB disk) from the TIR Dashboard.

[2] Install requirements. .. code:

!pip install accelerate transformers peft deepspeed datasets

[3] Set up the Accelerate config. .. code:

accelerate config --config_file launcher_config.yaml

[4] Choose the following parameters: .. code:

In which environment are you running? This machine
Which type of machine are you using? Multi-GPU
How many different machines will you use? 1
Do you wish to optimize your script with torch dynamo? No
Do you want to use DeepSpeed? Yes
Do you want to specify a json file for the DeepSpeed config? No
What should be your DeepSpeed's ZeRO optimization stage? 3
Where to offload optimizer states? cpu
Where to offload parameters? cpu
How many gradient accumulation steps are you passing in the script? 8
Do you want to use gradient clipping? no
Do you want to save 16-bit model.. ? no
Do you want to enable 'deepspeed.zero.init' ... ? no
How many gpus should be used for training? 4

Do you wish to use FP16 or BF16? no
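
For reference, the answers above produce a launcher_config.yaml roughly like the sketch below. Exact keys vary across Accelerate versions, so treat this as illustrative rather than something to copy verbatim. .. code:

compute_environment: LOCAL_MACHINE
distributed_type: DEEPSPEED
deepspeed_config:
  gradient_accumulation_steps: 8
  offload_optimizer_device: cpu
  offload_param_device: cpu
  zero3_init_flag: false
  zero_stage: 3
machine_rank: 0
mixed_precision: 'no'
num_machines: 1
num_processes: 4
use_cpu: false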

[5] Prepare dataset.

The fine-tuning script can work with any CSV file. Here, we will use the Alpaca dataset. Create a new file in JupyterLab named prepare_alpaca_csv.py and copy the following contents into it. .. code:

import json

import pandas as pd

with open('alpaca_data.json') as f:
    data = json.load(f)


new_format = []
for i, point in enumerate(data):
    # no input
    if len(point['input']) == 0:
        inputt = "Below is an instruction that describes a task.\n "
        inputt += "Write a response that appropriately completes the request.\n\n"
        inputt += f"### Instruction:\n{point['instruction']}\n\n### Response:"
    else:
        inputt = "Below is an instruction that describes a task.\n "
        inputt += "Write a response that appropriately completes the request.\n\n"
        inputt += f"### Instruction:\n{point['instruction']}\n\n### Input:\n{point['input']}\n\n### Response:"

    item = {'input': inputt, 'output': str(point['output'])}
    new_format.append(item)

df = pd.DataFrame(new_format)
df = df.dropna()
df.to_csv('alpaca_data.csv', index=False)  # skip the index so only input/output columns land in the CSV

[6] Open a terminal in JupyterLab. Run the following commands to download the Alpaca dataset and prepare a CSV from it. .. code:

# download the json alpaca dataset
wget https://raw.githubusercontent.com/tatsu-lab/stanford_alpaca/main/alpaca_data.json

# run in the terminal shell
python prepare_alpaca_csv.py
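
Optionally, sanity-check the generated CSV from a notebook cell before training. This is a minimal check, not part of the pipeline. .. code:

import pandas as pd

df = pd.read_csv('alpaca_data.csv')
print(df.columns.tolist())   # expect ['input', 'output']
print(df.iloc[0]['input'])   # first prompt, ending with '### Response:'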

[7] Prepare fine-tuning script.

Create a file named train.py in JupyterLab (from the file browser) and copy the following contents into it. .. code:

# Modified from https://github.com/huggingface/peft/blob/main/examples/conditional_generation/peft_lora_seq2seq_accelerate_ds_zero3_offload.py
import argparse
import gc
import logging
import os
import threading

import psutil
import torch
from accelerate import Accelerator
from datasets import load_dataset
from deepspeed.accelerator import get_accelerator
from peft import LoraConfig, TaskType, get_peft_model
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import (AutoModelForSeq2SeqLM, AutoTokenizer,
                        get_linear_schedule_with_warmup, set_seed)


def b2mb(x):
    '''
    Converting Bytes to Megabytes
    '''
    return int(x / 2**20)


class TorchTracemalloc:
    '''
    # Context manager is used to track the peak memory usage of the process
    '''

    def __enter__(self):
        gc.collect()
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()  # reset the peak gauge to zero
        self.begin = torch.cuda.memory_allocated()
        self.process = psutil.Process()

        self.cpu_begin = self.cpu_mem_used()
        self.peak_monitoring = True
        peak_monitor_thread = threading.Thread(target=self.peak_monitor_func)
        peak_monitor_thread.daemon = True
        peak_monitor_thread.start()
        return self

    def cpu_mem_used(self):
        """get resident set size memory for the current process"""
        return self.process.memory_info().rss

    def peak_monitor_func(self):
        self.cpu_peak = -1
        while True:
            self.cpu_peak = max(self.cpu_mem_used(), self.cpu_peak)
            if not self.peak_monitoring:
                break

    def __exit__(self, *exc):
        self.peak_monitoring = False
        gc.collect()
        torch.cuda.empty_cache()
        self.end = torch.cuda.memory_allocated()
        self.peak = torch.cuda.max_memory_allocated()
        self.used = b2mb(self.end - self.begin)
        self.peaked = b2mb(self.peak - self.begin)
        self.cpu_end = self.cpu_mem_used()
        self.cpu_used = b2mb(self.cpu_end - self.cpu_begin)
        self.cpu_peaked = b2mb(self.cpu_peak - self.cpu_begin)


# Handle argument parsing
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_path", type=str,
                        help="Model path. Supports T5/UL2 models")
    parser.add_argument("--datafile_path", type=str, default="sample.csv",
                        help="Path to the already processed dataset.")
    parser.add_argument("--num_epochs", type=int, default=1,
                        help="Number of epochs to train for.")
    parser.add_argument("--per_device_batch_size", type=int,
                        default=2, help="Batch size to use for training.")
    parser.add_argument("--input_max_length", type=int, default=128,
                        help="Maximum input length to use for generation")
    parser.add_argument("--target_max_length", type=int, default=128,
                        help="Maximum target length to use for generation")
    parser.add_argument("--lr", type=float, default=3e-4,
                        help="Learning rate to use for training.")
    parser.add_argument("--seed", type=int, default=42,
                        help="Seed to use for training.")
    parser.add_argument("--input_column", type=str,
                        default='input', help='csv input text column')
    parser.add_argument("--target_column", type=str,
                        default='output', help='csv target text column')
    parser.add_argument("--save_path", type=str,
                        default='peft_ckpt', help="Save path")

    args = parser.parse_known_args()
    return args


# Main function
def main():
    args, _ = parse_args()
    text_column = args.input_column
    label_column = args.target_column
    lr = args.lr
    num_epochs = args.num_epochs
    batch_size = args.per_device_batch_size
    seed = args.seed
    model_name_or_path = args.model_path
    data_file = args.datafile_path
    save_path = args.save_path
    target_max_length = args.target_max_length
    source_max_length = args.input_max_length

    # Create the save dir if it doesn't exist
    os.makedirs(save_path, exist_ok=True)

    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler(os.path.join(save_path, "training_log.log")),
            logging.StreamHandler()
        ]
    )

    logging.info(f'Args:\n {args}')

    # launch configs
    accelerator = Accelerator()
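    # LoRA config: r is the adapter rank, lora_alpha scales the adapter
    # update (effective scale is lora_alpha / r), and lora_dropout
    # regularizes the adapter layers during training.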
    peft_config = LoraConfig(
        task_type=TaskType.SEQ_2_SEQ_LM, inference_mode=False, r=8,
        lora_alpha=32, lora_dropout=0.1
    )
    set_seed(seed)

    # Save logs only on the main process
    @accelerator.on_main_process
    def log_info(logging, s):
        logging.info(s)

    # load dataset
    dataset = load_dataset('csv', data_files={'train': data_file})
    log_info(logging, f"Dataset length :{len(dataset['train'])}")
    # load model
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name_or_path)
    # load peft model
    model = get_peft_model(model, peft_config)
    model.print_trainable_parameters()
    # load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)

    def preprocess_function(sample, padding="max_length"):
        # created prompted input
        inputs = sample[text_column]
        # tokenize inputs
        model_inputs = tokenizer(
            inputs, max_length=source_max_length,
            padding=padding, truncation=True)
        # Tokenize targets with the `text_target` keyword argument
        labels = tokenizer(text_target=sample[label_column],
                        max_length=target_max_length,
                        padding=padding, truncation=True)
        # If we are padding here, replace all tokenizer.pad_token_id
        # in the labels by -100 when we want to ignore padding in the loss.
        if padding == "max_length":
            labels["input_ids"] = [
                [(l if l != tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"]
            ]
        model_inputs["labels"] = labels["input_ids"]
        return model_inputs

    # Prepare and preprocess the dataset
    with accelerator.main_process_first():
        # preventing string conversion errors

        def str_convert(example):
            example[label_column] = str(example[label_column])
            return example

        dataset['train'] = dataset['train'].map(str_convert)
        processed_datasets = dataset.map(
            preprocess_function,
            batched=True,
            num_proc=1,
            remove_columns=dataset["train"].column_names,
            load_from_cache_file=True,
            desc="Running tokenizer on dataset",
        )
    accelerator.wait_for_everyone()
    train_dataset = processed_datasets["train"]
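    # note: preprocessing already padded everything to max_length, so the
    # "longest" padding in collate_fn below is effectively a no-op here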

    def collate_fn(examples):
        return tokenizer.pad(examples, padding="longest", return_tensors="pt")

    train_dataloader = DataLoader(
        train_dataset, shuffle=True, collate_fn=collate_fn,
        batch_size=batch_size, pin_memory=True
    )

    # optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    # lr scheduler
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=(len(train_dataloader) * num_epochs),
    )

    # accelerator prepare
    model, train_dataloader, optimizer, lr_scheduler = accelerator.prepare(
        model, train_dataloader, optimizer, lr_scheduler
    )

    # Train the model
    for epoch in range(num_epochs):
        with TorchTracemalloc() as tracemalloc:
            model.train()
            total_loss = 0
            for step, batch in enumerate(tqdm(train_dataloader)):
                # using accelerator accumulate to perform gradient accumulation
                with accelerator.accumulate(model):
                    outputs = model(**batch)
                    loss = outputs.loss
                    total_loss += loss.detach().float()
                    accelerator.backward(loss)
                    optimizer.step()
                    lr_scheduler.step()
                    optimizer.zero_grad()

                    get_accelerator().empty_cache()

        # Printing the GPU memory usage details
        log_info(logging,
                "GPU Peak Memory consumed during train: {}".format(tracemalloc.peaked))
        log_info(logging,
                "GPU Total Peak Memory consumed during the train: {}".format(
                    tracemalloc.peaked + b2mb(tracemalloc.begin)
                )
                )
        log_info(
            logging, "CPU Peak Memory consumed during the train (max-begin): {}".format(tracemalloc.cpu_peaked))
        log_info(logging,
                "CPU Total Peak Memory consumed during the train (max): {}".format(
                    tracemalloc.cpu_peaked + b2mb(tracemalloc.cpu_begin)
                )
                )

        train_epoch_loss = total_loss / len(train_dataloader)
        train_ppl = torch.exp(train_epoch_loss)
        log_info(
            logging, "........................ : TRAINING DETAILS : .......................")
        log_info(logging, f"{epoch=}: {train_ppl=} {train_epoch_loss=}")

        # save intermediate checkpoint
        log_info(logging, "Saving intermediate ckpt")
        accelerator.wait_for_everyone()
        success = model.save_checkpoint(f'{save_path}', f'{epoch}')
        # save peft config
        peft_config.save_pretrained(os.path.join(f'{save_path}', f'{epoch}'))

        status_msg = f"checkpointing: checkpoint_folder={save_path}"
        if success:
            log_info(logging, f"Success {status_msg}")
        else:
            log_info(logging, f"Failure {status_msg}")

    log_info(logging, "Training complete ......")


if __name__ == "__main__":
    main()

[8] Run the training with accelerate. .. code:

accelerate launch --config_file launcher_config.yaml train.py \
--model_path google/flan-ul2 \
--datafile_path alpaca_data.csv \
--save_path checkpoint \
--num_epochs 1 \
--lr 1e-4 \
--per_device_batch_size 2 \
--input_max_length 256 \
--target_max_length 256

[9] Prepare checkpoint for inference

DeepSpeed will create sharded model files and a zero_to_fp32.py script in the checkpoint folder. Run the following command to convert the checkpoint to a PyTorch bin file. .. code:

# replace 0 with the latest checkpoint number
python ./checkpoint/zero_to_fp32.py ./checkpoint/ ./checkpoint/0/adapter_model.bin

[10] Load fine-tuned model for inference. .. code:

# Start a new notebook to run this code from a notebook cell.
from transformers import AutoModelForSeq2SeqLM
from peft import PeftModel, PeftConfig

peft_model_id = 'checkpoint/0'
base_model_id = 'google/flan-ul2'

model = AutoModelForSeq2SeqLM.from_pretrained(base_model_id) # The original model path
model = PeftModel.from_pretrained(model, peft_model_id) # The fine-tuned model path
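
As a quick smoke test, you can generate from the loaded model using the same prompt format as the training data. The instruction below is only an illustrative example. .. code:

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(base_model_id)

# build a prompt in the training format (no-input variant)
prompt = ("Below is an instruction that describes a task.\n "
          "Write a response that appropriately completes the request.\n\n"
          "### Instruction:\nName three primary colors.\n\n### Response:")

inputs = tokenizer(prompt, return_tensors='pt')
# optionally move the model and inputs to a GPU first, e.g. .to('cuda')
outputs = model.generate(**inputs, max_new_tokens=64)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))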

You may also merge the LoRA weights into the base model to produce a standalone checkpoint, as sketched below.
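
A minimal sketch using PEFT's merge_and_unload; the output directory name is a placeholder. .. code:

# merge the LoRA weights into the base model for standalone inference
merged_model = model.merge_and_unload()
merged_model.save_pretrained('flan-ul2-alpaca-merged')  # hypothetical output dir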

Conclusion

In this tutorial, we have fine-tuned a large language model on multiple GPUs using DeepSpeed and LoRA. A similar approach can be followed for fine-tuning other models.