画像の前処理に時間がかかるケースってありますよね
最近の画像の前処理は複雑になってきているから特にそう感じるかもしれないな
CPUとGPUは異なるメモリを使用しているからそのデータ移動にも時間がかかるので前処理からGPUで処理ができると良いな
目次
DALIとは
画像の前処理をCPUで処理している場合、その処理が複雑になればなるほど時間がかかるので、その処理をGPUにして高速化しようとする試みです。
前処理の高速化とCPUとGP間のデータ転送の時間も削減できます。下記が公式のリンク
https://docs.nvidia.com/deeplearning/sdk/dali-developer-guide/docs/index.html
検証環境
Google Colabの環境で構築しました。Google Colabの環境構築方法は下記リンクを参照ください。
DALIによる画像の前処理の高速化
下記のコードを元に修正して検証しました。
Google Colabのための環境設定。DALIのライブラリをインストールします。
!pip install --extra-index-url https://developer.download.nvidia.com/compute/redist nvidia-dali-cuda100
cudaのGPUデバイス設定をします。複数GPUがある場合は0から順に番号が振られています。今回は1つのGPUしか使用しないので0を設定します。
import os
os.environ["CUDA_VISIBLE_DEVICES"] = '0'
DALIのgithubレポジトリに検証用のサンプルデータがあるので取得します。
! git clone https://github.com/NVIDIA/DALI.git
サンプルデータの中身を確認してみます。下記のように確認できます。
import cv2
from google.colab.patches import cv2_imshow
from pathlib import Path
jpg_file_path = "/content/DALI/docs/examples/data/images/"
path_list = Path(jpg_file_path + '/dog').glob('**/*.jpg')
for path_jpg in path_list:
image = cv2.imread(str(path_jpg))
imS = cv2.resize(image, (125, 100))
cv2_imshow(imS)

ここから本格的にDALIの実装に入るぞ
ExternalInputIterator
外部ファイルの読み込用Iteratorを設定します。ここではjpgファイルの読み込みを行うIteratorを設定しています。
- __init__関数:jpgファイルのパスを設定、バッチサイズの設定、データサイズの設定、jpegファイルのリストを作成
- __iter__関数:ファイルをシャフルしてオブジェクトを返す
- __next__関数:インデックスがファイルサイズを超えたらイテレーション処理を止める。バッチサイズ分(画像、ラベル)を取得して返す
- size関数:データのサイズを取得
class ExternalInputIterator(object):
def __init__(self, batch_size, device_id, num_gpus, jpg_file_path=jpg_file_path):
self.images_dir = jpg_file_path
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", 'r') as f:
self.files = [line.rstrip() for line in f if line is not '']
# whole data set size
self.data_set_len = len(self.files)
# based on the device_id and total number of GPUs - world size
# get proper shard
self.files = self.files[self.data_set_len * device_id // num_gpus:
self.data_set_len * (device_id + 1) // num_gpus]
self.n = len(self.files)
def __iter__(self):
self.i = 0
shuffle(self.files)
return self
def __next__(self):
batch = []
labels = []
if self.i >= self.n:
raise StopIteration
for _ in range(self.batch_size):
jpeg_filename, label = self.files[self.i].split(' ')
f = open(self.images_dir + jpeg_filename, 'rb')
batch.append(np.frombuffer(f.read(), dtype = np.uint8))
labels.append(np.array([label], dtype = np.uint8))
self.i = (self.i + 1) % self.n
return (batch, labels)
@property
def size(self,):
return self.data_set_len
next = __next__
ExternalSourcePipeline
データの入力から前処理を設定したパイプラインを作成
- __init__関数:入力データ、ラベル、前処理の設定
- define_graph関数:入力->前処理->出力のパイプラインを設定
- iter_setup関数:self.iteratorで設定した外部データを読み込むための設定。
class ExternalSourcePipeline(Pipeline):
def __init__(self, batch_size, num_threads, device_id, external_data, decode_device='mixed', device='gpu'):
super(ExternalSourcePipeline, self).__init__(batch_size,
num_threads,
device_id,
seed=12)
self.input = ops.ExternalSource()
self.input_label = ops.ExternalSource()
self.decode = ops.ImageDecoder(device = decode_device, output_type = types.RGB)
self.res = ops.Resize(device=device, resize_x=240, resize_y=240)
self.cast = ops.Cast(device = device,
dtype = types.UINT8)
self.external_data = external_data
self.iterator = iter(self.external_data)
def define_graph(self):
self.jpegs = self.input()
self.labels = self.input_label()
images = self.decode(self.jpegs)
images = self.res(images)
output = self.cast(images)
return (output, self.labels)
def iter_setup(self):
try:
(images, labels) = self.iterator.next()
self.feed_input(self.jpegs, images)
self.feed_input(self.labels, labels)
except StopIteration:
self.iterator = iter(self.external_data)
raise StopIteration
DALIによる学習の高速化が可能を検証
ここからDALIの検証を行うぞ
どの程度、高速化できるのだろう
前処理、前処理から学習までの高速化が可能化をPyTorchで検証しました。
検証条件
- GPU、CPUは初回実行で初期化などで時間がかかるのでウォームアップを行う
- 単純な前処理の比較
- 前処理から学習までの比較
- Google Colab
前処理(CPU)
ウォームアップ処理
from nvidia.dali.plugin.pytorch import DALIClassificationIterator as PyTorchIterator
eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
external_data = eii, decode_device='cpu', device='cpu')
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)
epochs = 3
for e in range(epochs):
for i, data in enumerate(pii):
print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
pii.reset()
速度を計測(total: 272 ms)
%%time
for e in range(epochs):
for i, data in enumerate(pii):
print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
pii.reset()
前処理(GPU)
ウォームアップ処理
from nvidia.dali.plugin.pytorch import DALIClassificationIterator as PyTorchIterator
eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
external_data = eii)
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)
for e in range(epochs):
for i, data in enumerate(pii):
print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
pii.reset()
速度を計測(total: 75.3 ms)。約3.6倍程度、高速化しました。
%%time
for e in range(epochs):
for i, data in enumerate(pii):
print("epoch: {}, iter {}, real batch size: {}".format(e, i, len(data[0]["data"])))
pii.reset()
前処理から学習(CPU)
まずは学習のためのモデルを設定します。モデルは畳み込みを2層からの全結合処理が入ったかんたんな構造です。モデルの処理はGPUで行います。
from torch import nn
import torch
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
# 1 input image channel, 6 output channels, 3x3 square convolution
# kernel
self.conv1 = nn.Conv2d(3, 6, 3)
self.conv2 = nn.Conv2d(6, 16, 3)
# an affine operation: y = Wx + b
self.fc1 = nn.Linear(53824, 128) # 6*6 from image dimension
self.fc2 = nn.Linear(128, 84)
self.fc3 = nn.Linear(84, 2)
def forward(self, x):
# Max pooling over a (2, 2) window
x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
# If the size is a square you can only specify a single number
x = F.max_pool2d(F.relu(self.conv2(x)), 2)
x = x.reshape(-1, self.num_flat_features(x))
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def num_flat_features(self, x):
size = x.size()[1:] # all dimensions except the batch dimension
num_features = 1
for s in size:
num_features *= s
return num_features
model = Net()
model.to(device)
ロス関数とOptimizerを設定します。
from torch import optim
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
CPU用のDaliパイプラインを作成します。
from tqdm import tqdm
eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
external_data = eii, decode_device='cpu', device='cpu')
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)
前処理から学習までのウォームアップ処理をします。
epochs = 100
for e in tqdm(range(epochs)):
for i, data in enumerate(pii):
each_data, targets = data[0]['data'], data[0]['label']
each_data = each_data.transpose(1, 3).transpose(2, 3)
each_data = torch.tensor(each_data, dtype=torch.float).cuda()
targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()
optimizer.zero_grad()
outputs = model(each_data)
loss = loss_fn(outputs, targets)
loss.backward()
optimizer.step()
pii.reset()
速度を計測(total: 13.1 s)
%%time
epochs = 100
for e in tqdm(range(epochs)):
for i, data in enumerate(pii):
each_data, targets = data[0]['data'], data[0]['label']
each_data = each_data.transpose(1, 3).transpose(2, 3)
each_data = torch.tensor(each_data, dtype=torch.float).cuda()
targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()
optimizer.zero_grad()
outputs = model(each_data)
loss = loss_fn(outputs, targets)
loss.backward()
optimizer.step()
pii.reset()
前処理から学習(GPU)
GPU用のDaliパイプラインを作成します
eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0,
external_data = eii, decode_device='cpu', device='cpu')
pii = PyTorchIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)
前処理から学習までのウォームアップ処理をします。
epochs = 100
for e in tqdm(range(epochs)):
for i, data in enumerate(pii):
each_data, targets = data[0]['data'], data[0]['label']
each_data = each_data.transpose(1, 3).transpose(2, 3)
each_data = torch.tensor(each_data, dtype=torch.float).cuda()
targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()
optimizer.zero_grad()
outputs = model(each_data)
loss = loss_fn(outputs, targets)
loss.backward()
optimizer.step()
pii.reset()
速度を計測(total: 8.2 s)。約1.6倍程度、高速化しました。
%%time
epochs = 100
for e in tqdm(range(epochs)):
for i, data in enumerate(pii):
each_data, targets = data[0]['data'], data[0]['label']
each_data = each_data.transpose(1, 3).transpose(2, 3)
each_data = torch.tensor(each_data, dtype=torch.float).cuda()
targets = torch.tensor(targets, dtype=torch.long).squeeze().cuda()
optimizer.zero_grad()
outputs = model(each_data)
loss = loss_fn(outputs, targets)
loss.backward()
optimizer.step()
pii.reset()
前処理だけでは3.6倍程度、前処理から学習までだと1.6倍程度、高速化!!
前処理がシンプルな分、効果が薄いがより複雑な処理になるともっと効果がでるぞ