エラーでハマったので、hugging faceのdatasetからbatche_sizeごとのinput_idsやlabelsにするあたりの実装、特にDataLoaderとDataCollatorあたりをちゃんと確認しておく
train loopは以下から始まる
def train(
self,
resume_from_checkpoint: Optional[Union[str, bool]] = None,
trial: Union["optuna.Trial", Dict[str, Any]] = None,
ignore_keys_for_eval: Optional[List[str]] = None,
**kwargs,
):train loopの中で主な部分は_inner_training_loop関数となる
def _inner_training_loop(
self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None
):
self.accelerator.free_memory()
self._train_batch_size = batch_size
logger.debug(f"Currently training with a batch size of: {self._train_batch_size}")
# Data loader and number of training steps
train_dataloader = self.get_train_dataloader()train_dataloaderがdatasetをbatch sizeに揃えたり、paddingを行う
https://github.com/huggingface/transformers/blob/v4.30.2/src/transformers/trainer.py#L1659
DataLoader
dataloaderは、以下のようにインスタンス化される
return DataLoader(
train_dataset,
batch_size=self._train_batch_size,
sampler=train_sampler,
collate_fn=data_collator,
drop_last=self.args.dataloader_drop_last,
num_workers=self.args.dataloader_num_workers,
pin_memory=self.args.dataloader_pin_memory,
worker_init_fn=seed_worker,
)例として、以下のような配列をtransfomers.Trainerの引数として渡すとfrom torch.utils.data import DataLoaderがreturnされる
[
{
'input_ids': [0, 0, 0, 0, 0...],
'labels': [0, 0, 0, 0, 0...],
'attention_mask': [0, 0, 0, 0, 0...]
},
{
'input_ids': [0, 0, 0, 0, 0...],
'labels': [0, 0, 0, 0, 0...],
'attention_mask': [0, 0, 0, 0, 0...]
}
]※ 配列のまま使うのではなく、DataSet Classとして扱う方が都合良いです。
train_data = Dataset.from_list(配列データ)のような感じ
DataLoaderはイテレーターとして、train.pyでは以下のように使われる。
epoch_iteratorはdataloaderから作られるDataLoader型
for step, inputs in enumerate(epoch_iterator):このときのループ変数であるinputsは、input_idsとlabelsとattention_maskでbatchごとにまとめたものとなる
{
'input_ids': tensor(batch_size, embedding_len),
'labels': tensor(batch_size, embedding_len),
'attention_mask': tensor(batch_size, embedding_len)
}DataLoaderに引数として渡したdata_collatorが1 loopごとに呼び出される。
このdata_collatorはtransformers.Trainerに渡したものが使われる。
例えば、data_collatorは以下のようになる。
data_collator=DataCollatorForSeq2Seq(
tokenizer,
pad_to_multiple_of=None,
return_tensors="pt",
padding=True,
label_pad_token_id=tokenizer.pad_token_id,
),DataLoaderの実装
どのようにbatch_sizeごとのデータをとりだしているか確認する
DataLoader Classの実装は以下
class DataLoader(Generic[T_co]):iterator部分の実装は以下
def __iter__(self) -> '_BaseDataLoaderIter':
# When using a single worker the returned iterator should be
# created everytime to avoid reseting its state
# However, in the case of a multiple workers iterator
# the iterator is only created once in the lifetime of the
# DataLoader object so that workers can be reused
if self.persistent_workers and self.num_workers > 0:
if self._iterator is None:
self._iterator = self._get_iterator()
else:
self._iterator._reset(self)
return self._iterator
else:
return self._get_iterator()いくつかの分岐があるが、よくあるパターンでは最後の_get_iterator()が呼び出される
return self._get_iterator()self._get_iterator()により、_SingleProcessDataLoaderIterがiteratorの実装として返される。
_SingleProcessDataLoaderIterは_BaseDataLoaderIterを継承したもの
def _get_iterator(self) -> '_BaseDataLoaderIter':
...
return _SingleProcessDataLoaderIter(self)_SingleProcessDataLoaderIterの実装は以下を参照
class _SingleProcessDataLoaderIter(_BaseDataLoaderIter):
def __init__(self, loader):iteratorの1要素の作成は、_SingleProcessDataLoaderIterのself._dataset_fetcherによって作成される。
self._dataset_fetcher = _DatasetKind.create_fetcher(
self._dataset_kind,
self._dataset,
self._auto_collation,
self._collate_fn,
self._drop_last)fetcherを使ってindexをもとにデータを取り出す。
def _next_data(self):
index = self._next_index() # may raise StopIteration
data = self._dataset_fetcher.fetch(index) # may raise StopIterationここのindexについては後述
self._dataset_fetcherは、create_fetcherにより_MapDatasetFetcherクラスをインスタンス化したもの
@staticmethod
def create_fetcher(kind, dataset, auto_collation, collate_fn, drop_last):
...
return _utils.fetch._MapDatasetFetcher(dataset, auto_collation, collate_fn, drop_last)_MapDatasetFetcherのfetch methodでDataCollatorが呼び出される
class _MapDatasetFetcher(_BaseDatasetFetcher):
def fetch(self, possibly_batched_index):
...
return self.collate_fn(data)index
next_indexの実装は、_BaseDataLoaderIterが持つ。
_BaseDataLoaderIterは、_SingleProcessDataLoaderIterの継承元。
indexの中身は、長さがbatch_sizeのarrayでindexが要素である。
e.g. [4, 128, 20, 10....]
def _next_index(self):
return next(self._sampler_iter) # may raise StopIterationここのindexは、DataLoaderのbatch_samplerが作り出す
@property
def _index_sampler(self):
# The actual sampler used for generating indices for `_DatasetFetcher`
# (see _utils/fetch.py) to read data at each time. This would be
# `.batch_sampler` if in auto-collation mode, and `.sampler` otherwise.
# We can't change `.sampler` and `.batch_sampler` attributes for BC
# reasons.
if self._auto_collation:
return self.batch_sampler
else:
return self.samplerbatch_samplerは、DataLoaderのインスタンスを作ったときに渡したものが使われる。
ここでは、sampler=train_sampler,
return DataLoader(
train_dataset,
batch_size=self._train_batch_size,
sampler=train_sampler,
collate_fn=data_collator,
drop_last=self.args.dataloader_drop_last,
num_workers=self.args.dataloader_num_workers,
pin_memory=self.args.dataloader_pin_memory,
worker_init_fn=seed_worker,
)transformersでは、シンプルなパターンの場合train_samplerとしてRandomSamplerが用いられる
return RandomSampler(self.train_dataset, generator=generator)RandomSamplerClassの実装はpytorchが持つ
class RandomSampler(Sampler[int]):DataCollator
DataCollatorの実装は以下
@dataclass
class DataCollatorForSeq2Seq:DataLoaderがiteratorとして呼び出された場合は、DataCollatorの__call__が呼び出される。
def __call__(self, features, return_tensors=None):引数のfeaturesは、batche_sizeごとのinput_idsとlabels、attention_maskとなる。
batch_sizeが2の場合は以下のようになる。
[
{
'input_ids': [...],
'labels': [...],
'attention_mask': [...]
},
{
'input_ids': [...],
'labels': [...],
'attention_mask': [...]
}
]feturesに対してpaddingが行われる
features = self.tokenizer.pad(
features,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors=return_tensors,
)paddingしたことによって、以下のような形になる
{
'input_ids': tensor(batch_size, embedding_len),
'labels': tensor(batch_size, embedding_len),
'attention_mask': tensor(batch_size, embedding_len)
}このデータがinputsで受け取れるものとなる。
for step, inputs in enumerate(epoch_iterator):
...stepごとにDataCollator.__call__が呼び出されることになる。
まとめ
DataLoaderがiteratorの実装を持っており、batch_size単位でループが回る。 ループの1要素である以下のobjectは、ループごとにDataCollatorが整形する
{
'input_ids': tensor(batch_size, embedding_len),
'labels': tensor(batch_size, embedding_len),
'attention_mask': tensor(batch_size, embedding_len)
}追記
transformers.Trainerのtrain_datasetとして配列を渡すと、2epoch目からdata_setがおかしくなる。例えば、DataCollatorForSeq2Seqはbatchごとにpaddingを追加する処理を行うが、1epoch目で追加したpaddingに対し更にpaddingを行うため、input_idsとlabelsの長さが異なってしまう。
これは、epochのループの中でepoch_iteratorつまりDataLoaderを呼び出しループしているため。datasetが参照渡しになり、前回の変更が残ってしまう。
for epoch in range(epochs_trained, num_train_epochs):
...
for step, inputs in enumerate(epoch_iterator):
...transformers.Trainerのtrain_datasetに渡すのは、DataSet Classを使うのが良さそう。
data = load_dataset("json", data_files=data_path)
train_val = data["train"].train_test_split(test_size=val_set_size, shuffle=True, seed=42)
train_data = (tokenizerなどの処理)
## listからDatasetに変換
train_data = Dataset.from_list(train_data)
or
data = load_dataset("json", data_files=data_path)
train_val = data["train"].train_test_split(test_size=val_set_size, shuffle=True, seed=42)
## mapを使う
train_data = train_val["train"].shuffle().map(generate_and_tokenize_prompt)確認用サンプル
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.sampler import RandomSampler
import torch
import transformers
from datasets import load_dataset
data = load_dataset('json', data_files="./sample.json")
train_val = data["train"].train_test_split(
test_size=1, shuffle=True, seed=42
)
train_data = train_val["train"].shuffle().map(any_function)
# mapを使うとDataSet型のまま
# train_dataが配列の場合はDataSet型にしておく
# train_data = Dataset.from_list(train_data)
sampler = RandomSampler(train_data, generator=torch.Generator())
data_collator=transformers.DataCollatorForSeq2Seq(
tokenizer, pad_to_multiple_of=8, return_tensors="pt", padding=True
)
data_loader=DataLoader(
train_data,
batch_size=4,
collate_fn=data_collator,
sampler=sampler,
)
for steps, input in enumerate(data_loader):
print(steps, input['input_ids'].shape, input['labels'].shape)
print('end')