2023-09-21

エラーでハマったので、hugging faceのdatasetからbatche_sizeごとのinput_idsやlabelsにするあたりの実装、特にDataLoaderとDataCollatorあたりをちゃんと確認しておく

train loopは以下から始まる

def train(
    self,
    resume_from_checkpoint: Optional[Union[str, bool]] = None,
    trial: Union["optuna.Trial", Dict[str, Any]] = None,
    ignore_keys_for_eval: Optional[List[str]] = None,
    **kwargs,
):

train loopの中で主な部分は_inner_training_loop関数となる

def _inner_training_loop(
    self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None
):
    self.accelerator.free_memory()
    self._train_batch_size = batch_size
    logger.debug(f"Currently training with a batch size of: {self._train_batch_size}")
    # Data loader and number of training steps
    train_dataloader = self.get_train_dataloader()

train_dataloaderがdatasetをbatch sizeに揃えたり、paddingを行う
https://github.com/huggingface/transformers/blob/v4.30.2/src/transformers/trainer.py#L1659

DataLoader

dataloaderは、以下のようにインスタンス化される

return DataLoader(
            train_dataset,
            batch_size=self._train_batch_size,
            sampler=train_sampler,
            collate_fn=data_collator,
            drop_last=self.args.dataloader_drop_last,
            num_workers=self.args.dataloader_num_workers,
            pin_memory=self.args.dataloader_pin_memory,
            worker_init_fn=seed_worker,
        )

例として、以下のような配列をtransfomers.Trainerの引数として渡すとfrom torch.utils.data import DataLoaderがreturnされる

[
  {
    'input_ids': [0, 0, 0, 0, 0...],
    'labels': [0, 0, 0, 0, 0...],
    'attention_mask': [0, 0, 0, 0, 0...]
  }, 
  {
    'input_ids': [0, 0, 0, 0, 0...],
    'labels': [0, 0, 0, 0, 0...],
    'attention_mask': [0, 0, 0, 0, 0...]
  }
]

※ 配列のまま使うのではなく、DataSet Classとして扱う方が都合良いです。
train_data = Dataset.from_list(配列データ)のような感じ

DataLoaderはイテレーターとして、train.pyでは以下のように使われる。
epoch_iteratorはdataloaderから作られるDataLoader型

このときのループ変数であるinputsは、input_idsとlabelsとattention_maskでbatchごとにまとめたものとなる

{
  'input_ids': tensor(batch_size, embedding_len),
  'labels': tensor(batch_size, embedding_len),
  'attention_mask': tensor(batch_size, embedding_len)
}

DataLoaderに引数として渡したdata_collatorが1 loopごとに呼び出される。
このdata_collatorはtransformers.Trainerに渡したものが使われる。
例えば、data_collatorは以下のようになる。

data_collator=DataCollatorForSeq2Seq(
            tokenizer, 
            pad_to_multiple_of=None,
            return_tensors="pt",
            padding=True, 
            label_pad_token_id=tokenizer.pad_token_id,
        ),

DataLoaderの実装

どのようにbatch_sizeごとのデータをとりだしているか確認する

DataLoader Classの実装は以下

iterator部分の実装は以下

    def __iter__(self) -> '_BaseDataLoaderIter':
        # When using a single worker the returned iterator should be
        # created everytime to avoid reseting its state
        # However, in the case of a multiple workers iterator
        # the iterator is only created once in the lifetime of the
        # DataLoader object so that workers can be reused
        if self.persistent_workers and self.num_workers > 0:
            if self._iterator is None:
                self._iterator = self._get_iterator()
            else:
                self._iterator._reset(self)
            return self._iterator
        else:
            return self._get_iterator()

  いくつかの分岐があるが、よくあるパターンでは最後の_get_iterator()が呼び出される

return self._get_iterator()

self._get_iterator()により、_SingleProcessDataLoaderIterがiteratorの実装として返される。
_SingleProcessDataLoaderIter_BaseDataLoaderIterを継承したもの

 def _get_iterator(self) -> '_BaseDataLoaderIter':
     ...
     return _SingleProcessDataLoaderIter(self)

_SingleProcessDataLoaderIterの実装は以下を参照

class _SingleProcessDataLoaderIter(_BaseDataLoaderIter):
    def __init__(self, loader):

iteratorの1要素の作成は、_SingleProcessDataLoaderIterself._dataset_fetcherによって作成される。

self._dataset_fetcher = _DatasetKind.create_fetcher(
        self._dataset_kind, 
        self._dataset, 
        self._auto_collation, 
        self._collate_fn, 
        self._drop_last)

fetcherを使ってindexをもとにデータを取り出す。

def _next_data(self):
    index = self._next_index()  # may raise StopIteration
    data = self._dataset_fetcher.fetch(index)  # may raise StopIteration

ここのindexについては後述

self._dataset_fetcherは、create_fetcherにより_MapDatasetFetcherクラスをインスタンス化したもの

@staticmethod
def create_fetcher(kind, dataset, auto_collation, collate_fn, drop_last):
...
return _utils.fetch._MapDatasetFetcher(dataset, auto_collation, collate_fn, drop_last)

_MapDatasetFetcherのfetch methodでDataCollatorが呼び出される

class _MapDatasetFetcher(_BaseDatasetFetcher):
    def fetch(self, possibly_batched_index):
        ...
        return self.collate_fn(data)

index

next_indexの実装は、_BaseDataLoaderIterが持つ。
_BaseDataLoaderIterは、_SingleProcessDataLoaderIterの継承元。

indexの中身は、長さがbatch_sizeのarrayでindexが要素である。
e.g. [4, 128, 20, 10....]

def _next_index(self):
    return next(self._sampler_iter)  # may raise StopIteration

ここのindexは、DataLoaderのbatch_samplerが作り出す

@property
def _index_sampler(self):
    # The actual sampler used for generating indices for `_DatasetFetcher`
    # (see _utils/fetch.py) to read data at each time. This would be
    # `.batch_sampler` if in auto-collation mode, and `.sampler` otherwise.
    # We can't change `.sampler` and `.batch_sampler` attributes for BC
    # reasons.
    if self._auto_collation:
        return self.batch_sampler
    else:
        return self.sampler

batch_samplerは、DataLoaderのインスタンスを作ったときに渡したものが使われる。
ここでは、sampler=train_sampler,

return DataLoader(
    train_dataset,
    batch_size=self._train_batch_size,
    sampler=train_sampler,
    collate_fn=data_collator,
    drop_last=self.args.dataloader_drop_last,
    num_workers=self.args.dataloader_num_workers,
    pin_memory=self.args.dataloader_pin_memory,
    worker_init_fn=seed_worker,
)

transformersでは、シンプルなパターンの場合train_samplerとしてRandomSamplerが用いられる

return RandomSampler(self.train_dataset, generator=generator)

RandomSamplerClassの実装はpytorchが持つ

DataCollator

DataCollatorの実装は以下

DataLoaderがiteratorとして呼び出された場合は、DataCollator__call__が呼び出される。

引数のfeaturesは、batche_sizeごとのinput_idsとlabels、attention_maskとなる。
batch_sizeが2の場合は以下のようになる。

[
  {
    'input_ids': [...],
    'labels': [...],
    'attention_mask': [...]
  },
  {
    'input_ids': [...],
    'labels': [...],
    'attention_mask': [...]
  }
]

feturesに対してpaddingが行われる

  features = self.tokenizer.pad(
            features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors=return_tensors,
        )

paddingしたことによって、以下のような形になる

{
  'input_ids': tensor(batch_size, embedding_len),
  'labels': tensor(batch_size, embedding_len),
  'attention_mask': tensor(batch_size, embedding_len)
}

このデータがinputsで受け取れるものとなる。

for step, inputs in enumerate(epoch_iterator):
  ...

stepごとにDataCollator.__call__が呼び出されることになる。

まとめ

DataLoaderがiteratorの実装を持っており、batch_size単位でループが回る。 ループの1要素である以下のobjectは、ループごとにDataCollatorが整形する

{
  'input_ids': tensor(batch_size, embedding_len),
  'labels': tensor(batch_size, embedding_len),
  'attention_mask': tensor(batch_size, embedding_len)
}

追記

transformers.Trainerのtrain_datasetとして配列を渡すと、2epoch目からdata_setがおかしくなる。例えば、DataCollatorForSeq2Seqはbatchごとにpaddingを追加する処理を行うが、1epoch目で追加したpaddingに対し更にpaddingを行うため、input_idsとlabelsの長さが異なってしまう。

これは、epochのループの中でepoch_iteratorつまりDataLoaderを呼び出しループしているため。datasetが参照渡しになり、前回の変更が残ってしまう。

for epoch in range(epochs_trained, num_train_epochs):
  ...
  for step, inputs in enumerate(epoch_iterator):
     ...

transformers.Trainerのtrain_datasetに渡すのは、DataSet Classを使うのが良さそう。

data = load_dataset("json", data_files=data_path)  
train_val = data["train"].train_test_split(test_size=val_set_size, shuffle=True, seed=42)
train_data = (tokenizerなどの処理)
## listからDatasetに変換
train_data = Dataset.from_list(train_data)

or 

data = load_dataset("json", data_files=data_path)  
train_val = data["train"].train_test_split(test_size=val_set_size, shuffle=True, seed=42)

## mapを使う
train_data = train_val["train"].shuffle().map(generate_and_tokenize_prompt)

確認用サンプル

from torch.utils.data.dataloader import DataLoader
from torch.utils.data.sampler import RandomSampler
import torch
import transformers
from datasets import load_dataset

data = load_dataset('json', data_files="./sample.json")
train_val = data["train"].train_test_split(
            test_size=1, shuffle=True, seed=42
)
train_data = train_val["train"].shuffle().map(any_function)
# mapを使うとDataSet型のまま
# train_dataが配列の場合はDataSet型にしておく
# train_data = Dataset.from_list(train_data)

sampler = RandomSampler(train_data, generator=torch.Generator())

data_collator=transformers.DataCollatorForSeq2Seq(
    tokenizer, pad_to_multiple_of=8, return_tensors="pt", padding=True
)
data_loader=DataLoader(
    train_data,
    batch_size=4,
    collate_fn=data_collator,
    sampler=sampler,    
)
for steps, input in enumerate(data_loader):    
    print(steps, input['input_ids'].shape, input['labels'].shape)
print('end')

See Also