一般的なデータ処理ではpandasが用いられるが今回はGPU上から同様の処理を行えるcudfの紹介をするぞ
pandasと操作が変わるとコードの書き換えが面倒そうですね
その点は互換性を持った記述が可能なので心配しなくてよい
目次
高速にデータ処理を行いたい
pandasをデータ処理で用いることが多いですが、データサイズが大きくなると遅くなり、待ち時間が長くなってしまいます。そこで今回はGPUを使用して高速に処理が可能なcudfの紹介をします。
環境構築
検証環境
- Ubuntu 18.04
- メモリ:64GB
- GPU: Geforce 1080
- CPU : Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz
NVIDIA GPU CLOUDにすでに環境構築されたDocker環境が存在します。今回はDockerを使用して環境構築をできるだけスキップして行います。
NVIDIA GPU CLOUDとは
Dockerコンテナ、学習済みモデル、学習用スクリプトなどを提供しているサイトです。ここにあるリソースを使用すればGPUを用いた処理を始めることが容易になります。
https://www.nvidia.com/ja-jp/gpu-cloud/
NVIDIA GPU CLOUDを用いた環境構築
NVIDIA GPU CLOUDにアクセスします。下記のような画面がでます。”Create an Account”をクリックします。

名前とメールアドレスの登録画面に遷移するので登録したい情報を入力します。

メールアドレスの確認する旨の画面に遷移します。

メールアドレスに下記のような内容が届きます。”SIGN IN TO NVIDIA GPU CLOUD”をクリックしましょう。

パスワード設定画面になるのでパスワードを設定します。パスワードは条件があるので条件通りになるように入力しましょう。条件を満たすと満たした部分の色が変わります。

契約条件などの合意ができるかの画面がでます。

上記のプロセスを終えると下記のような画面にでます。

NVIDIA GPU CLOUDからDockerを取得
NVIDIA GPU CLOUDからDockerを取得するには認証が必要になります。認証に必要な処理を行います。先程の画面の右上のSetupを選択します。

Generate API KeyのGet API Keyを選択します。

下記のような画面に遷移するのでGenerate API Keyを選択します。

現在使用しているAPIキーが無効になる警告がでますが初回は関係ないのでConfirmを押します。

先程取得したAPI Keyをパスワードにして下記コマンドをterminalを起動して入力します。
docker login nvcr.io
Username: $oauthtoken
Password: <Your Key>
GPU周りの設定が必要なので、まだ設定していない方はこちらを参照ください。下記コマンドでcudfが動作可能なDockerコンテナを取得します。
docker pull rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04
下記コマンドで先程取得したDockerコンテナに入ります。
docker run --gpus all --rm -it -p 8889:8889 -p 8787:8787 -p 8786:8786 rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04
コンテナ内で下記コマンドを実行してください。
jupyter-lab --allow-root --ip=0.0.0.0 --NotebookApp.token=""
ブラウザーで”http://localhost:8889“にアクセスするもしくは自動起動するまで待つと下記のような画面になります。

cudfとpandasを比較する
実際にコードを書いて動作を確認してみよう!
cudfには標準で2つのnotebookが具体例として用意されています。一つは単純な処理の比較が可能な`notebooks_Apply_Operations_in_cuDF.ipynb`。もう一つはデータだけなく内部処理の関数もGPUから処理できるnumbaを使用することでより高速に処理することが可能な`notebooks_numba_cuDF_integration.ipynb`があります。
まずは`notebooks_Apply_Operations_in_cuDF.ipynb`を確認します。必要なライブラリのimportとテストデータを作成しています。テストデータは1000のデータがin1に設定されているシンプルなものです。データはGPUで処理可能なcudfに設定されています。
import cudf
import numpy as np
from numba import cuda
df = cudf.DataFrame()
df['in1'] = np.arange(1000, dtype=np.float64)

下記で関数kernelを設定しています。
これはcudfで行ごとに処理を行うapply_rows関数に設定するためです。
関数kernelをincolsで指定している列”in1″に適用しています。関数kernelを行ごとに適用しています。
出力結果はoutcolsに出力されます。データのフォーマットはout=np.float64で指定しています。
他に指定したい場合はkwargsに値を指定しますが今回はないので空のdictを指定しています。詳細は下記リンクになります。
https://rapidsai.github.io/projects/cudf/en/0.8.0/api.html#cudf.dataframe.DataFrame.apply_rows
def kernel(in1, out):
for i, x in enumerate(in1):
print('tid:', cuda.threadIdx.x, 'bid:', cuda.blockIdx.x,
'array size:', in1.size, 'block threads:', cuda.blockDim.x)
out[i] = x * 2.0
outdf = df.apply_rows(kernel,
incols=['in1'],
outcols=dict(out=np.float64),
kwargs=dict())
print(outdf['in1'].sum()*2.0)
print(outdf['out'].sum())
出力結果は各行の値を2倍しただけのものになります。

ここで次の処理に進む前にGPUの処理構造を理解しておきましょう。下記の図はホストデバイスからGPUを呼ぶ際の仕組みです。用語を理解しましょう
https://www.nvidia.co.jp/docs/IO/51174/NVIDIA_CUDA_Programming_Guide_1.1_JPN.pdf
- Host: GPU動作の呼び出し側
- Device: GPU
- Kernel: Device上で実行される処理のこと。Host上のコードで定義される
- Thread: Kernelを実行するための最小単位のプロセスのこど
- Block: Threadの塊
- Grid: Blockの塊

下記のコードを見てみます。先程との違いはchunkとtpbを指定している部分が異なります。また関数はapply_chunksを使用しており、chunk単位で処理を適用しています。
import cudf
import numpy as np
from numba import cuda
df = cudf.DataFrame()
df['in1'] = np.arange(100, dtype=np.float64)
def kernel(in1, out):
print('tid:', cuda.threadIdx.x, 'bid:', cuda.blockIdx.x,
'array size:', in1.size, 'block threads:', cuda.blockDim.x)
for i in range(cuda.threadIdx.x, in1.size, cuda.blockDim.x):
out[i] = in1[i] * 2.0
outdf = df.apply_chunks(kernel,
incols=['in1'],
outcols=dict(out=np.float64),
kwargs=dict(),
chunks=16,
tpb=8)
print(outdf['in1'].sum()*2.0)
print(outdf['out'].sum())
chunkはデータを収める単位になります。例えば100のデータを16のサイズのchunkに入れる場合は7chunk生成されます。この7chunkは7Blockに処理するデータとしてアサインされます。
tpbはブロックあたりのスレッド数になります。下記のようなイメージになります。

実際にcuDFとpandasでどの程度速度が異なるかを把握してみます。データ・セットのサイズは先程より大きくないと差が十分にでないのでデータ・セットのサイズを大きくしています。またassert関数で導出した値の正しさもチェックしています。
import cudf
import pandas as pd
import numpy as np
import time
data_length = 1e6
df = cudf.DataFrame()
df['in1'] = np.arange(data_length, dtype=np.float64)
def kernel(in1, out):
for i, x in enumerate(in1):
out[i] = x * 2.0
start = time.time()
df = df.apply_rows(kernel,
incols=['in1'],
outcols=dict(out=np.float64),
kwargs=dict())
end = time.time()
print('cuDF time', end-start)
assert(np.isclose(df['in1'].sum()*2.0, df['out'].sum()))
df = pd.DataFrame()
df['in1'] = np.arange(data_length, dtype=np.float64)
start = time.time()
df['out'] = df.in1.apply(lambda x: x*2)
end = time.time()
print('pandas time', end-start)
assert(np.isclose(df['in1'].sum()*2.0, df['out'].sum()))
Numbaを用いたNotebookの確認
関数もGPUから呼び出すようにしてみるぞ
”notebooks_numba_cuDF_integration.ipynb”で関数の処理がCPUで行われているので関数の処理もGPU化します。
その際にnumbaを使用します。numbaはPythonの仮想マシンコードを取得し、LLVM IRにコンパイル、LLVMを使用してネイティブコードに変換するので環境に応じたコード生成が可能です。今回はGPUに適したコードに変換するように処理を行います。下記のように関数に@cuda.jitを加えるだけで動作します。
@cuda.jit
def double_kernel(result, array_len):
"""
double each element of the array
"""
i = cuda.grid(1)
if i < array_len:
result[i] = result[i] * 2.0
下記で動作確認用のサンプルデータを作成します。
array_len = 1000
df = cudf.DataFrame()
df['in'] = np.arange(array_len, dtype=np.float64)
スレッド数とデータサイズから導出されるブロック数を出します。関数を呼び出す際はブロック数とスレッド数を渡してからデータを関数に渡します。
number_of_threads = 128
number_of_blocks = (array_len + (number_of_threads - 1)) // number_of_threads
before = df['in'].sum()
gpu_array = df['in'].values
print(type(gpu_array))
double_kernel[(number_of_blocks,), (number_of_threads,)](gpu_array, array_len)
after = df['in'].sum()
assert(np.isclose(before * 2.0, after))
続いて移動平均に対してchunkごとに処理をした場合にCPUとGPUでは差が出るのかをみてみます。移動平均とは下記の図のようなイメージになります。全体で平均を取らずに的幅を決めて、その窓幅の範囲で平均を計算する処理です。

先程同様にサンプルデータを作成します。ただし先程よりは大きなサンプルデータにします。元のNotebookではarray_lenは5e8でしたが私の環境ではMemory Out of Errorが発生したので5e7に減らしています。
array_len = int(5e7)
average_window = 3000
number_of_threads = 128
number_of_blocks = (array_len + (number_of_threads - 1)) // number_of_threads
df = cudf.DataFrame()
df['in'] = np.arange(array_len, dtype=np.float64)
df['out'] = np.arange(array_len, dtype=np.float64)
移動平均を導出する関数を設定します。内部処理ですが
- 移動平均を計算するためのnumba専用のローカル変数sを指定します。
- ローカル変数sを初期化します。
- グリッド情報iを取得します。
- グリッド情報iからデータサイズ外にアクセスしていないかを確認します。
- 移動平均を計算する幅の範囲内かを確認します。
- 移動平均を計算します。ここでi-jになっている点ですがiが移動平均より大きい場合に処理をするようになっているためこのような処理になっています。
@cuda.jit
def kernel1(in_arr, out_arr, average_length, arr_len):
s = numba.cuda.local.array(1, numba.float64)
s[0] = 0.0
i = cuda.grid(1)
if i < arr_len:
if i < average_length-1:
out_arr[i] = np.inf
else:
for j in range(0, average_length):
s[0] += in_arr[i-j]
out_arr[i] = s[0] / np.float64(average_length)
一度目の計算はコンパイル処理が入ります。そのため計算速度が遅くなります。`cuda.synchronize()` でcudaのすべてのthread処理が終わってから次の処理が行えるように同期処理をおこないます。
gpu_in = df['in'].values
gpu_out = df['out'].values
start = time.time()
kernel1[(number_of_blocks,), (number_of_threads,)](gpu_in, gpu_out,
average_window, array_len)
cuda.synchronize()
end = time.time()
print('Numba with compile time', end-start)
pandasで同様に処理を行う場合は下記のようなコードになります。
pdf = pd.DataFrame()
pdf['in'] = np.arange(array_len, dtype=np.float64)
start = time.time()
pdf['out'] = pdf.rolling(average_window).mean()
end = time.time()
print('pandas time', end-start)
結果ですがなぜかnumbaで処理した方が遅くなりました。これはデータのサイズが小さかったからなのか不明です。
Numba with compile time 1.3360743522644043
Numba without compile time 1.169957160949707
pandas time 0.5781576633453369
次に同様の処理をshared Memoryを使用して行います。これを使用することで同一のブロック内のスレッドなら値を共有できるためグローバルメモリにアクセスするよりも高速に動作できます。
- shared Memoryのサイズを指定。スレッド数と移動平均幅分確保します。
- ブロックサイズを取得します。
- shared Memoryを確保します。
- グリッド情報を取得します。
- スレッドIDを取得します。
- ブロックIDを取得します。
- ブロックサイズとブロックIDから現在のデータの位置情報を取得します。
- shared Memoryにグリッド上のデータを代入します。
- 処理を同期させます。
- shared Memoryに窓幅分のデータを代入します
- あとは先程同様に移動平均を計算します。違う点はデータをshared Memoryから取得している点になります。
shared_buffer_size = number_of_threads + average_window - 1
@cuda.jit
def kernel1(in_arr, out_arr, average_length, arr_len):
block_size = cuda.blockDim.x
shared = cuda.shared.array(shape=(shared_buffer_size),
dtype=numba.float64)
i = cuda.grid(1)
tx = cuda.threadIdx.x
# Block id in a 1D grid
bid = cuda.blockIdx.x
starting_id = bid * block_size
shared[tx + average_length - 1] = in_arr[i]
cuda.syncthreads()
for j in range(0, average_length - 1, block_size):
if (tx + j) < average_length - 1:
shared[tx + j] = in_arr[starting_id -
average_length + 1 +
tx + j]
cuda.syncthreads()
s = numba.cuda.local.array(1, numba.float64)
s[0] = 0.0
if i < arr_len:
if i < average_length-1:
out_arr[i] = np.inf
else:
for j in range(0, average_length):
s[0] += shared[tx + average_length - 1 - j]
out_arr[i] = s[0] / np.float64(average_length)
こちらもpandasより遅い結果になりました。shared memoryを使用しないより遅くなっています。
Numba with compile time 1.5754859447479248
Numba without compile time 1.2757561206817627
pandas time 0.5760288238525391
通常Numbaを使用した方が早くなると思うのですが以外に結果が期待したものにはなりませんでした。ただしGPUが少し古いので最新のGPUにする、GPUメモリのサイズを大きくすると改善するかもしれません。
GPUを使える環境であればpandasからcudfに変えて高速に動作できるか試してみても良いと思います!