Google Colabの環境で画像の前処理を高速化するDALIで学習が高速化出来るか試してみた

画像の前処理に時間がかかるケースってありますよね

最近の画像の前処理は複雑になってきているから特にそう感じるかもしれないな

CPUとGPUは異なるメモリを使用しているからそのデータ移動にも時間がかかるので前処理からGPUで処理ができると良いな

DALIとは

画像の前処理をCPUで処理している場合、その処理が複雑になればなるほど時間がかかるので、その処理をGPUにして高速化しようとする試みです。

前処理の高速化とCPUとGP間のデータ転送の時間も削減できます。下記が公式のリンク

https://docs.nvidia.com/deeplearning/sdk/dali-developer-guide/docs/index.html

検証環境

Google Colabの環境で構築しました。Google Colabの環境構築方法は下記リンクを参照ください。

DALIによる画像の前処理の高速化

下記のコードを元に修正して検証しました。

https://github.com/NVIDIA/DALI/blob/master/docs/examples/frameworks/pytorch/pytorch-external_input.ipynb

Google Colabのための環境設定。DALIのライブラリをインストールします。

!pip install --extra-index-url https://developer.download.nvidia.com/compute/redist nvidia-dali-cuda100

cudaのGPUデバイス設定をします。複数GPUがある場合は0から順に番号が振られています。今回は1つのGPUしか使用しないので0を設定します。

import os 

os.environ["CUDA_VISIBLE_DEVICES"] = '0'

DALIのgithubレポジトリに検証用のサンプルデータがあるので取得します。

! git clone https://github.com/NVIDIA/DALI.git

サンプルデータの中身を確認してみます。下記のように確認できます。

import cv2
from google.colab.patches import cv2_imshow
from pathlib import Path

jpg_file_path = "/content/DALI/docs/examples/data/images/"
path_list = Path(jpg_file_path + '/dog').glob('**/*.jpg')

for path_jpg in path_list:
    image = cv2.imread(str(path_jpg))
    imS = cv2.resize(image, (125, 100))
    cv2_imshow(imS)

ここから本格的にDALIの実装に入るぞ

ExternalInputIterator

外部ファイルの読み込用Iteratorを設定します。ここではjpgファイルの読み込みを行うIteratorを設定しています。

  • __init__関数:jpgファイルのパスを設定、バッチサイズの設定、データサイズの設定、jpegファイルのリストを作成
  • __iter__関数:ファイルをシャフルしてオブジェクトを返す
  • __next__関数:インデックスがファイルサイズを超えたらイテレーション処理を止める。バッチサイズ分(画像、ラベル)を取得して返す
  • size関数:データのサイズを取得
class ExternalInputIterator(object):
    def __init__(self, batch_size, device_id, num_gpus, jpg_file_path=jpg_file_path):
        self.images_dir = jpg_file_path
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line is not '']
        # whole data set size
        self.data_set_len = len(self.files) 
        # based on the device_id and total number of GPUs - world size
        # get proper shard
        self.files = self.files[self.data_set_len * device_id // num_gpus:
                                self.data_set_len * (device_id + 1) // num_gpus]
        self.n = len(self.files)

    def __iter__(self):
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []

        if self.i >= self.n:
            raise StopIteration

        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(' ')
            f = open(self.images_dir + jpeg_filename, 'rb')
            batch.append(np.frombuffer(f.read(), dtype = np.uint8))
            labels.append(np.array([label], dtype = np.uint8))
            self.i = (self.i + 1) % self.n
        return (batch, labels)

    @property
    def size(self,):
        return self.data_set_len

    next = __next__

ExternalSourcePipeline

データの入力から前処理を設定したパイプラインを作成

  • __init__関数:入力データ、ラベル、前処理の設定
  • define_graph関数:入力->前処理->出力のパイプラインを設定
  • iter_setup関数:self.iteratorで設定した外部データを読み込むための設定。
class ExternalSourcePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, external_data, decode_device='mixed', device='gpu'):
        super(ExternalSourcePipeline, self).__init__(batch_size,
                                      num_threads,
                                      device_id,
                                      seed=12)
        self.input = ops.ExternalSource()
        self.input_label = ops.ExternalSource()
        self.decode = ops.ImageDecoder(device = decode_device, output_type = types.RGB)
        self.res = ops.Resize(device=device, resize_x=240, resize_y=240)
        self.cast = ops.Cast(device = device,
                             dtype = types.UINT8)
        self.external_data = external_data
        self.iterator = iter(self.external_data)

    def define_graph(self):
        self.jpegs = self.input()
        self.labels = self.input_label()
        images = self.decode(self.jpegs)
        images = self.res(images)
        output = self.cast(images)
        return (output, self.labels)

    def iter_setup(self):
        try:
            (images, labels) = self.iterator.next()
            self.feed_input(self.jpegs, images)
            self.feed_input(self.labels, labels)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise StopIteration

DALIによる学習の高速化が可能を検証

ここからDALIの検証を行うぞ

どの程度、高速化できるのだろう

前処理、前処理から学習までの高速化が可能化をPyTorchで検証しました。

検証条件

  • GPU、CPUは初回実行で初期化などで時間がかかるのでウォームアップを行う
  • 単純な前処理の比較
  • 前処理から学習までの比較
  • Google Colab

前処理(CPU)

ウォームアップ処理

from nvidia.dali.plugin.pytorch import DALIClassificationIterator as PyTorchIterator

eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
                              external_data = eii, decode_device='cpu', device='cpu')
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)

epochs = 3
for e in range(epochs):
    for i, data in enumerate(pii):
        print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
    pii.reset()

速度を計測(total: 272 ms)

%%time
for e in range(epochs):
    for i, data in enumerate(pii):
        print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
    pii.reset()

前処理(GPU)

ウォームアップ処理

from nvidia.dali.plugin.pytorch import DALIClassificationIterator as PyTorchIterator

eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
                              external_data = eii)
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)

for e in range(epochs):
    for i, data in enumerate(pii):
        print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
    pii.reset()

速度を計測(total: 75.3 ms)。約3.6倍程度、高速化しました。

%%time

for e in range(epochs):
    for i, data in enumerate(pii):
        print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
    pii.reset()

前処理から学習(CPU)

まずは学習のためのモデルを設定します。モデルは畳み込みを2層からの全結合処理が入ったかんたんな構造です。モデルの処理はGPUで行います。

from torch import nn
import torch
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        # 1 input image channel, 6 output channels, 3x3 square convolution
        # kernel
        self.conv1 = nn.Conv2d(3, 6, 3)
        self.conv2 = nn.Conv2d(6, 16, 3)
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(53824, 128)  # 6*6 from image dimension
        self.fc2 = nn.Linear(128, 84)
        self.fc3 = nn.Linear(84, 2)

    def forward(self, x):
        # Max pooling over a (2, 2) window
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        # If the size is a square you can only specify a single number
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)
        x = x.reshape(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def num_flat_features(self, x):
        size = x.size()[1:]  # all dimensions except the batch dimension
        num_features = 1
        for s in size:
            num_features *= s
        return num_features


model = Net()
model.to(device)

ロス関数とOptimizerを設定します。

from torch import optim

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

CPU用のDaliパイプラインを作成します。

from tqdm import tqdm

eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
                              external_data = eii, decode_device='cpu', device='cpu')
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)

前処理から学習までのウォームアップ処理をします。

epochs = 100

for e in tqdm(range(epochs)):
    for i, data in enumerate(pii):
      
        each_data, targets = data[0]['data'], data[0]['label']
        each_data = each_data.transpose(1, 3).transpose(2, 3)
        each_data = torch.tensor(each_data, dtype=torch.float).cuda()
        targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()

        optimizer.zero_grad()
        outputs = model(each_data)
        loss = loss_fn(outputs, targets)

        loss.backward()
        optimizer.step()
    pii.reset()

速度を計測(total: 13.1 s)

%%time

epochs = 100
for e in tqdm(range(epochs)):
    for i, data in enumerate(pii):
        
        each_data, targets = data[0]['data'], data[0]['label']
        each_data = each_data.transpose(1, 3).transpose(2, 3)
        each_data = torch.tensor(each_data, dtype=torch.float).cuda()
        targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()

        optimizer.zero_grad()
        outputs = model(each_data)
        loss = loss_fn(outputs, targets)

        loss.backward()
        optimizer.step()
    pii.reset()

前処理から学習(GPU)

GPU用のDaliパイプラインを作成します

eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
                              external_data = eii, decode_device='cpu', device='cpu')
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)

前処理から学習までのウォームアップ処理をします。

epochs = 100

for e in tqdm(range(epochs)):
    for i, data in enumerate(pii):
        
        each_data, targets = data[0]['data'], data[0]['label']
        each_data = each_data.transpose(1, 3).transpose(2, 3)
        each_data = torch.tensor(each_data, dtype=torch.float).cuda()
        targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()

        optimizer.zero_grad()
        outputs = model(each_data)
        loss = loss_fn(outputs, targets)

        loss.backward()
        optimizer.step()
    pii.reset()

速度を計測(total: 8.2 s)。約1.6倍程度、高速化しました。

%%time
epochs = 100

for e in tqdm(range(epochs)):
    for i, data in enumerate(pii):
        
        each_data, targets = data[0]['data'], data[0]['label']
        each_data = each_data.transpose(1, 3).transpose(2, 3)
        each_data = torch.tensor(each_data, dtype=torch.float).cuda()
        targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()

        optimizer.zero_grad()
        outputs = model(each_data)
        loss = loss_fn(outputs, targets)

        loss.backward()
        optimizer.step()
    pii.reset()

前処理だけでは3.6倍程度、前処理から学習までだと1.6倍程度、高速化!!
前処理がシンプルな分、効果が薄いがより複雑な処理になるともっと効果がでるぞ

Close Bitnami banner
Bitnami